import json
import logging
import os
import shutil
import time
import uuid
from collections import defaultdict
from contextlib import contextmanager
from time import sleep

import cv2
import fitz
import jieba
import numpy as np
import zxingcpp
from rapidfuzz import process, fuzz
from sqlalchemy import update

from db import MysqlSession
from db.mysql import BdYljg, BdYlks, ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxPhrec, ZxIeReview, ZxIeResult
from log import HOSTNAME
from photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES, HOSPITAL_ALIAS, HOSPITAL_FILTER, DEPARTMENT_ALIAS, \
    DEPARTMENT_FILTER, DISCHARGE_KEY, modify_batch_id, BATCH_ID
from services.paddle_services import IE_KEY
from ucloud import ufile
from util import image_util, common_util, html_util, model_util
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, handle_insurance_type, \
    handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, parse_hospital, \
    parse_page_num, handle_tiny_int


@contextmanager
def _session_scope():
    """
    Yield a new MysqlSession and always close it, even when the body raises.

    Committing stays the caller's responsibility.
    """
    session = MysqlSession()
    try:
        yield session
    finally:
        session.close()


# Try to obtain a high-resolution image through a QR code printed on the document
def get_better_image_from_qrcode(img_path, image_id, dpi=150):
    """
    Scan the image for QR codes and, for recognised invoice URLs, download the
    linked PDF and render its first page as a replacement image.

    :param img_path: path of the local image to scan
    :param image_id: identifier used only in the log line for unknown QR contents
    :param dpi: resolution used when rendering the PDF page
    :return: (path of the rendered image, text of the PDF page), or (None, None)
        when no QR code yields a usable PDF
    """

    def _parse_pdf_url(pdf_url_to_parse):
        """Download the PDF, render page 0 to an image file and return (image path, page text)."""
        pdf_file = None
        local_pdf_path = None
        try:
            local_pdf_path = html_util.download_pdf(pdf_url_to_parse)
            pdf_file = fitz.open(local_pdf_path)
            # Only the first page is used
            page = pdf_file[0]
            # PDF user space is 72 units per inch, so the zoom factor is dpi / 72
            default_dpi = 72
            zoom = dpi / default_dpi
            mat = fitz.Matrix(zoom, zoom)
            pix = page.get_pixmap(matrix=mat)
            # Convert the rendered samples into an OpenCV-compatible BGR array
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape((pix.height, pix.width, -1))
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            img_name, img_ext = common_util.parse_save_path(img_path)
            better_img_path = common_util.get_processed_img_path(f'{img_name}.better.{img_ext}')
            cv2.imwrite(better_img_path, img)
            return better_img_path, page.get_text()
        except Exception as ex:
            logging.getLogger('error').error('解析pdf失败!', exc_info=ex)
            return None, None
        finally:
            # Compare against None explicitly so an opened document is closed
            # regardless of how the document object evaluates in a boolean context.
            if pdf_file is not None:
                pdf_file.close()
            if local_pdf_path:
                common_util.delete_temp_file(local_pdf_path)

    jsczt_base_url = 'http://einvoice.jsczt.cn'
    try:
        img = cv2.imread(img_path)
        results = zxingcpp.read_barcodes(img, text_mode=zxingcpp.TextMode.HRI)
    except Exception as e:
        logging.getLogger('error').info('二维码识别失败', exc_info=e)
        results = []

    for result in results:
        try:
            url = result.text
            if url.startswith(jsczt_base_url):
                id_base = html_util.get_jsczt_id_base(url)
                pdf_url = f'{jsczt_base_url}/download?idBase={id_base}' if id_base else None
            elif '/yldzpjqr/invoice/query/issueinfo' in url:
                # Wuxi hospitals
                pdf_url = html_util.get_wx_pdf_url(url)
            elif '/dzfp/tz3y' in url:
                # Taizhou Third People's Hospital
                pdf_url = html_util.get_tz3y_pdf_url(url)
            elif url.startswith('http://weixin.qq.com'):
                # Not a usable address
                continue
            else:
                logging.getLogger('qr').info(f'[{image_id}]中有未知二维码内容:{url}')
                continue

            if not pdf_url:
                continue
            better_img_path, text = _parse_pdf_url(pdf_url)
            # A failed download/render must not stop the search: try the next QR code.
            if better_img_path:
                return better_img_path, text
        except Exception as e:
            logging.getLogger('error').error('从二维码中获取高清图片时出错', exc_info=e)
            continue

    return None, None


# Key information extraction
def information_extraction(phrec, pk_phhd):
    """
    Process a single image: classify it and extract its key information.

    :param phrec: image record (reads ``cfjaddress``, ``cRectype`` and ``pk_phrec``)
    :param pk_phhd: primary key of the reimbursement case
    :return: (record type, extraction result, OCR text); (None, None, None) when
        the image cannot be located
    """
    original_img_path = common_util.get_img_path(phrec.cfjaddress)
    if not original_img_path:
        img_url = ufile.get_private_url(phrec.cfjaddress)
        if not img_url:
            return None, None, None
        original_img_path = common_util.save_to_local(img_url)
    img_path = common_util.get_processed_img_path(phrec.cfjaddress)
    shutil.copy2(original_img_path, img_path)

    # Try to obtain a high-resolution image through the QR code
    better_img_path, text = get_better_image_from_qrcode(img_path, phrec.cfjaddress)
    if phrec.cRectype != '1':
        # Images not marked as record type '1' are not replaced for now
        better_img_path = None

    if better_img_path is not None:
        rec_type = '基本医保结算单'
        if text:
            info_extract = model_util.ie_settlement_text(text)
        else:
            info_extract = model_util.ie_settlement(
                better_img_path, common_util.ocr_result_to_layout(model_util.ocr(better_img_path))
            )
        # This branch never yields a discharge record, so the OCR text is not needed later
        ocr_text = None
    else:
        if image_util.is_photo(img_path):
            book_img_path = model_util.det_book(img_path)  # detect and crop the document area
            dewarped_img_path = model_util.dewarp(book_img_path)  # remove warping
        else:
            # todo: this may still be a photo; add finer-grained logic later
            dewarped_img_path = img_path

        # Keep the orientation whose OCR produced the most results
        angles = model_util.clas_orientation(dewarped_img_path)
        ocr_result = []
        rotated_img = None
        for angle in angles:
            tmp_ocr_result = []
            tmp_rotated_img = image_util.rotate(dewarped_img_path, int(angle))
            for split_result in image_util.split(tmp_rotated_img):
                if split_result['img'] is None:
                    continue
                a4_img = image_util.expand_to_a4_size(split_result['img'])
                tmp_ocr_result += model_util.ocr(a4_img)
            if len(tmp_ocr_result) > len(ocr_result):
                ocr_result = tmp_ocr_result
                rotated_img = tmp_rotated_img

        ocr_text = common_util.ocr_result_to_text(ocr_result)
        rec_type = model_util.clas_text(ocr_text) if ocr_text else None

        extractors = {
            '基本医保结算单': model_util.ie_settlement,
            '出院记录': model_util.ie_discharge,
            '费用清单': model_util.ie_cost,
        }
        extractor = extractors.get(rec_type)
        if extractor is not None:
            info_extract = extractor(rotated_img, common_util.ocr_result_to_layout(ocr_result))
        else:
            info_extract = None

    if info_extract:
        result_json = json.dumps(info_extract, ensure_ascii=False)
        # Stored content is capped at 5000 characters
        # (presumably the column size -- TODO confirm against the table definition)
        if len(result_json) > 5000:
            result_json = result_json[:5000]

        now = common_util.get_default_datetime()
        # NOTE(review): BATCH_ID is bound by ``from photo_review import BATCH_ID`` at
        # import time; if modify_batch_id() rebinds the module global, the value used
        # here stays the initial one -- confirm against photo_review's implementation.
        with _session_scope() as session:
            session.add(ZxIeResult(pk_phhd=pk_phhd, pk_phrec=phrec.pk_phrec, id=BATCH_ID,
                                   cfjaddress=phrec.cfjaddress, content=result_json,
                                   create_time=now, creator=HOSTNAME,
                                   update_time=now, updater=HOSTNAME))
            session.commit()

    return rec_type, info_extract, ocr_text


# Pick the value with the highest probability for a key
def get_best_value_of_key(source, key):
    """
    Return the text with the highest probability among all candidates of ``key``.

    :param source: mapping of key -> list of candidate lists; each candidate is a
        dict with ``text`` and ``probability``
    :param key: key to look up
    :return: best text, or None when there is no non-empty candidate with a
        probability above 0
    """
    best_text = None
    best_probability = 0
    for group in source.get(key) or []:
        for candidate in group:
            text = candidate.get("text")
            # A missing probability is treated as 0 instead of raising on comparison
            probability = candidate.get("probability") or 0
            if text and probability > best_probability:
                best_text = text
                best_probability = probability
    return best_text


# Collect every value of a key into a list
def get_values_of_key(source, key):
    """
    Return all distinct non-empty texts found for ``key``.

    :param source: mapping of key -> list of candidate lists (dicts with ``text``)
    :param key: key to look up
    :return: de-duplicated texts in first-seen order
    """
    texts = []
    for group in source.get(key) or []:
        for candidate in group:
            text = candidate.get("text")
            if text:
                texts.append(text)
    # dict.fromkeys de-duplicates while keeping a deterministic (first-seen) order
    return list(dict.fromkeys(texts))


def save_or_update_ie(table, pk_phhd, data):
    """
    Insert or update the row of ``table`` that belongs to ``pk_phhd``.

    Entries whose value is None or an empty string are ignored, so they never
    overwrite existing column values.

    :param table: ORM class to write to
    :param pk_phhd: primary key of the reimbursement case
    :param data: column name -> value
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    now = common_util.get_default_datetime()
    with _session_scope() as session:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        if db_data:
            # Update: record who modified the row; the original creator is kept
            db_data.update_time = now
            db_data.updater = HOSTNAME
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert
            obj = table(**data)
            obj.create_time = now
            obj.creator = HOSTNAME
            obj.update_time = now
            obj.updater = HOSTNAME
            session.add(obj)
        session.commit()


def search_hospital(hospital):
    """
    Find the medical institution whose name best matches ``hospital``.

    :param hospital: hospital name to look up
    :return: rapidfuzz ``extractOne`` result ``(name, score, pk_yljg)``, or None
        when no candidate rows were found
    """

    def _filter_search_keywords(keywords):
        """Return the last one or two usable keywords that precede the first one containing "医院"."""
        keywords = [x for x in keywords if x not in HOSPITAL_FILTER and len(x) > 1]
        result1 = ""
        result2 = ""
        for keyword in keywords:
            if "医院" in keyword:
                break
            result2 = result1
            result1 = keyword
        result = [result1]
        if result2:
            result.append(result2)
        return result

    cut_list = jieba.lcut(hospital, HMM=False)
    with _session_scope() as session:
        yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(
            BdYljg.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not yljg:
            # NOTE(review): when no keyword survives the filter the pattern is '%%',
            # which matches every row -- confirm this fallback is intended.
            for filter_keyword in _filter_search_keywords(cut_list):
                yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(
                    BdYljg.name.like(f"%{filter_keyword}%")).all()
                if yljg:
                    break

    yljg = {row.pk_yljg: row.name for row in yljg}
    return process.extractOne(hospital, yljg, scorer=fuzz.partial_token_set_ratio)


def search_department(department):
    """
    Find the department whose name best matches ``department``.

    :param department: department name to look up
    :return: ``(name, score, pk_ylks)`` (a list when the score was adjusted), or
        None when nothing can be searched or matched
    """
    cut_list = [x for x in jieba.lcut(department, HMM=False) if x not in DEPARTMENT_FILTER]
    if not cut_list:
        # Nothing left to search for; return before opening a session
        return None

    with _session_scope() as session:
        ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
            BdYlks.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not ylks:
            for filter_keyword in cut_list:
                ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
                    BdYlks.name.like(f"%{filter_keyword}%")).all()
                if ylks:
                    break

    ylks = {row.pk_ylks: row.name for row in ylks}
    best_match = process.extractOne(department, ylks, scorer=fuzz.token_ratio)
    if best_match and best_match[0] in ["内科", "外科"]:
        # Lower the priority of the generic departments "内科" and "外科"
        best_match = list(best_match)
        best_match[1] -= 100
    return best_match


def settlement_task(pk_phhd, settlement_list_ie_result):
    """
    Normalise the settlement-sheet extraction result and persist it.

    :param pk_phhd: primary key of the reimbursement case
    :param settlement_list_ie_result: merged extraction result of all settlement images
    :return: the normalised settlement data
    """

    def _best(ie_key):
        return get_best_value_of_key(settlement_list_ie_result, IE_KEY[ie_key])

    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(_best('name')),
        "admission_date_str": handle_original_data(_best('admission_date')),
        "discharge_date_str": handle_original_data(_best('discharge_date')),
        "personal_cash_payment_str": handle_original_data(_best('personal_cash_payment')),
        "personal_account_payment_str": handle_original_data(_best('personal_account_payment')),
        "personal_funded_amount_str": handle_original_data(_best('personal_funded_amount')),
        "medical_insurance_type_str": handle_original_data(_best('medical_insurance_type')),
        "admission_id": handle_id(_best('admission_id')),
        "settlement_id": handle_id(_best('settlement_id')),
    }
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    settlement_data["medical_insurance_type"] = handle_insurance_type(settlement_data["medical_insurance_type_str"])

    parse_money_result = parse_money(_best('uppercase_medical_expenses'), _best('medical_expenses'))
    settlement_data["medical_expenses_str"] = handle_original_data(parse_money_result[0])
    settlement_data["medical_expenses"] = parse_money_result[1]

    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)
    return settlement_data


def _find_best_match(raw_values, parse, search, aliases, min_score):
    """
    Search every parsed form (and alias substitution) of ``raw_values`` and
    return the search result with the highest score above ``min_score``.

    :param raw_values: extracted names to resolve
    :param parse: callable turning one raw name into an iterable of candidate names
    :param search: callable returning ``(name, score, key)`` or None for a candidate
    :param aliases: mapping of substring -> iterable of replacement substrings
    :param min_score: a match must score strictly above this to be selected
    :return: best search result, or None
    """
    matches = []
    for raw_value in raw_values:
        for parsed in parse(raw_value):
            search_result = search(parsed)
            matches.append(search_result)
            if search_result and search_result[1] == 100:
                break
            # Retry with the substitutions of the first alias key found in the name
            for alias_key in aliases.keys():
                if alias_key in parsed:
                    for alias in aliases[alias_key]:
                        matches.append(search(parsed.replace(alias_key, alias)))
                    break

    best_match = None
    best_score = min_score
    for match in matches:
        if match and match[1] > best_score:
            best_match = match
            best_score = match[1]
            if best_score == 100:
                break
    return best_match


def discharge_task(pk_phhd, discharge_record_ie_result):
    """
    Normalise the discharge-record extraction result, resolve hospital and
    department to their primary keys, and persist it.

    :param pk_phhd: primary key of the reimbursement case
    :param discharge_record_ie_result: merged extraction result of all discharge images
    :return: the normalised discharge data
    """

    def _best(ie_key):
        return get_best_value_of_key(discharge_record_ie_result, IE_KEY[ie_key])

    hospitals = get_values_of_key(discharge_record_ie_result, IE_KEY['hospital'])
    departments = get_values_of_key(discharge_record_ie_result, IE_KEY['department'])
    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": handle_hospital(",".join(hospitals)),
        "department": handle_department(",".join(departments)),
        "name": handle_name(_best('name')),
        "admission_date_str": handle_original_data(_best('admission_date')),
        "discharge_date_str": handle_original_data(_best('discharge_date')),
        "doctor": handle_name(_best('doctor')),
        "admission_id": handle_id(_best('admission_id')),
        "age": handle_age(_best('age')),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    if hospitals:
        best_match = _find_best_match(hospitals, parse_hospital, search_hospital, HOSPITAL_ALIAS, 0)
        if best_match:
            discharge_data["pk_yljg"] = best_match[2]

    if departments:
        # The floor is -1000 because search_department may push scores below zero
        best_match = _find_best_match(departments, parse_department, search_department, DEPARTMENT_ALIAS, -1000)
        if best_match:
            discharge_data["pk_ylks"] = best_match[2]

    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)
    return discharge_data


def cost_task(pk_phhd, cost_list_ie_result):
    """
    Normalise the cost-list extraction result and persist it.

    :param pk_phhd: primary key of the reimbursement case
    :param cost_list_ie_result: merged extraction result of all cost-list images
    :return: the normalised cost data
    """

    def _best(ie_key):
        return get_best_value_of_key(cost_list_ie_result, IE_KEY[ie_key])

    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(_best('name')),
        "admission_date_str": handle_original_data(_best('admission_date')),
        "discharge_date_str": handle_original_data(_best('discharge_date')),
        "medical_expenses_str": handle_original_data(_best('medical_expenses')),
    }
    cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
    cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
    cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])

    if cost_list_ie_result.get(IE_KEY['page']):
        page_nums, page_count = parse_page_num(cost_list_ie_result[IE_KEY['page']])
        cost_data['page_nums'] = handle_original_data(','.join(map(str, page_nums)))
        cost_data['page_count'] = handle_tiny_int(page_count)

    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)
    return cost_data


def _three_way_match(settlement_value, discharge_value, cost_value):
    """
    Compare one field across the three documents.

    :return: '1' all equal; '2' only the settlement value differs; '3' only the
        discharge value differs; '4' only the cost value differs; '0' when two or
        more values are empty or all three differ
    """
    values = [settlement_value, discharge_value, cost_value]
    if sum(not bool(v) for v in values) > 1:
        # Two or more empty values are treated as a complete mismatch
        return '0'
    distinct_count = len(set(values))
    if distinct_count == 1:
        return '1'
    if distinct_count == 2:
        if settlement_value != discharge_value and settlement_value != cost_value:
            return '2'
        if discharge_value != settlement_value and discharge_value != cost_value:
            return '3'
        return '4'
    return '0'


def photo_review(pk_phhd, name):
    """
    Process a single reimbursement case: extract information from all of its
    images, then store completeness and consistency findings.

    :param pk_phhd: primary key of the reimbursement case
    :param name: name of the claimant
    """
    settlement_result = defaultdict(list)
    discharge_result = defaultdict(list)
    cost_result = defaultdict(list)
    results_by_type = {
        '基本医保结算单': settlement_result,
        '出院记录': discharge_result,
        '费用清单': cost_result,
    }

    with _session_scope() as session:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.cRectype, ZxPhrec.cfjaddress).filter(
            ZxPhrec.pk_phhd == pk_phhd
        ).all()

    # Identifier shared by all images of this batch
    modify_batch_id(uuid.uuid4().hex)

    discharge_text = ''
    for phrec in phrecs:
        processed_img_dir = common_util.get_processed_img_path('')
        os.makedirs(processed_img_dir, exist_ok=True)
        try:
            rec_type, ie_result, ocr_text = information_extraction(phrec, pk_phhd)
            if rec_type == '出院记录':
                discharge_text += ocr_text or ''
            rec_result = results_by_type.get(rec_type)
            # The extraction may be empty even for a recognised record type
            if rec_result is not None and ie_result:
                for key, value in ie_result.items():
                    rec_result[key].append(value)
        finally:
            # Remove intermediate images even when extraction fails
            if os.path.isdir(processed_img_dir):
                shutil.rmtree(processed_img_dir)

    settlement_data = settlement_task(pk_phhd, settlement_result)
    discharge_data = discharge_task(pk_phhd, discharge_result)
    cost_data = cost_task(pk_phhd, cost_result)

    # --- Completeness of the three documents ---
    # Missing-document check
    review_result = {
        'pk_phhd': pk_phhd,
        'has_settlement': bool(settlement_result),
        'has_discharge': bool(discharge_result),
        'has_cost': bool(cost_result),
    }
    if (review_result['has_settlement'] and settlement_data.get('personal_account_payment')
            and settlement_data.get('personal_cash_payment') and settlement_data.get('medical_expenses')):
        # The settlement only counts when account + cash payments are below the total expenses
        review_result['has_settlement'] &= (
                float(settlement_data['personal_account_payment']) + float(settlement_data['personal_cash_payment'])
                < float(settlement_data['medical_expenses'])
        )

    # Missing-page check
    page_description = []
    if review_result['has_discharge']:
        for discharge_item in DISCHARGE_KEY:
            if not any(key in discharge_text for key in DISCHARGE_KEY[discharge_item]):
                page_description.append("《出院记录》缺页")
                break

    if review_result['has_cost']:
        page_count = cost_data.get('page_count')
        if cost_data.get('page_nums') and page_count:
            # Compare as integers: page_nums is stored as comma-separated text
            found_pages = {int(p) for p in cost_data['page_nums'].split(',') if p.strip().isdigit()}
            missing_pages = sorted(set(range(1, int(page_count) + 1)) - found_pages)
            if missing_pages:
                page_description.append(f"《住院费用清单》,缺第{','.join(map(str, missing_pages))}页")

    if page_description:
        review_result['full_page'] = False
        review_result['page_description'] = ';'.join(page_description)
    else:
        review_result['full_page'] = True

    review_result['integrity'] = (review_result['has_settlement'] and review_result['has_discharge']
                                  and review_result['has_cost'] and review_result['full_page'])

    # --- Consistency of the three documents ---
    # Name consistency; '5' means the documents agree with each other but not with the claimant
    name_match = _three_way_match(settlement_data['name'], discharge_data['name'], cost_data['name'])
    if name_match == '1' and name != settlement_data['name']:
        name_match = '5'
    review_result['name_match'] = name_match

    # Admission-date consistency between settlement sheet and discharge record
    # NOTE(review): this check also requires the discharge dates to be equal --
    # confirm that is intended for the admission-date flag.
    if (settlement_data['admission_date'] and discharge_data['admission_date']
            and settlement_data['discharge_date'] and discharge_data['discharge_date']
            and settlement_data['admission_date'] == discharge_data['admission_date']
            and settlement_data['discharge_date'] == discharge_data['discharge_date']):
        review_result['admission_date_match'] = '1'
    else:
        review_result['admission_date_match'] = '0'

    # Discharge-date consistency
    review_result['discharge_date_match'] = _three_way_match(
        settlement_data['discharge_date'], discharge_data['discharge_date'], cost_data['discharge_date'])

    review_result['consistency'] = (
            review_result['name_match'] == '1' and review_result['admission_date_match'] == '1'
            and review_result['discharge_date_match'] == '1')

    review_result['success'] = review_result['integrity'] and review_result['consistency']

    save_or_update_ie(ZxIeReview, pk_phhd, review_result)


def main():
    """
    Batch controller for photo review: repeatedly fetch pending cases, process
    them one by one, and sleep when there is nothing to do.
    """
    while 1:
        with _session_scope() as session:
            phhds = (session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.exsuccess_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .order_by(ZxPhhd.priority_num.desc())
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            # Mark the fetched cases as "recognition in progress"
            pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
            if pk_phhd_values:
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values))
                               .values(exsuccess_flag="2"))
                session.execute(update_flag)
                session.commit()

        if phhds:
            for phhd in phhds:
                pk_phhd = phhd.pk_phhd
                logging.info(f"开始识别:{pk_phhd}")
                start_time = time.time()
                photo_review(pk_phhd, phhd.cXm)

                # Recognition finished: update the flag and record the elapsed seconds
                with _session_scope() as session:
                    update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                        exsuccess_flag="8",
                        ref_id1=HOSTNAME,
                        checktime=common_util.get_default_datetime(),
                        fFSYLFY=time.time() - start_time))
                    session.execute(update_flag)
                    session.commit()
        else:
            # No new cases found; wait a while before querying again
            logging.info(f"暂未查询到需要识别的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)