import json
import logging
import os
import tempfile
import time
from collections import defaultdict
from contextlib import contextmanager
from time import sleep

import cv2
import requests
from sqlalchemy import update, or_

from db import MysqlSession
from db.mysql import BdYljg, BdYlks, ZxIeResult, ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxPhrec
from photo_review import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
    PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR, \
    ADMISSION_ID, SETTLEMENT_ID, AGE, OCR, SETTLEMENT_IE, DISCHARGE_IE, COST_IE, PHHD_BATCH_SIZE, SLEEP_MINUTES, \
    UPPERCASE_MEDICAL_EXPENSES, HOSTNAME, HOSPITAL_ALIAS
from ucloud import ufile
from util import image_util, util
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, \
    handle_insurance_type, handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, \
    parse_hospital

# Maximum number of characters of serialized IE output stored in ZxIeResult.content.
_MAX_IE_CONTENT_LENGTH = 5000


@contextmanager
def _mysql_session():
    """Yield a new ``MysqlSession`` and always close it, even if the body raises."""
    session = MysqlSession()
    try:
        yield session
    finally:
        session.close()


def merge_result(result1, result2):
    """Merge information-extraction result ``result2`` into ``result1`` and return ``result1``.

    For every key of ``result2`` its value list is appended to ``result1``'s list for the
    same key (a missing key starts as an empty list). ``result1`` is modified in place.
    """
    for key in result2:
        result1[key] = result1.get(key, []) + result2[key]
    return result1


def ie_temp_image(ie, ocr, image):
    """Run information extraction ``ie`` on an in-memory image.

    The image is written to a temporary ``.jpg`` because the OCR layout helper and ``ie``
    both take a file path. Extraction errors are logged and yield an empty list; removal of
    the temporary file is best-effort (failures are only logged).

    :return: ``ie(...)[0]`` for the image, or ``[]`` if nothing was recognized or an error occurred.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        cv2.imwrite(temp_file.name, image)

    ie_result = []
    try:
        layout = util.get_ocr_layout(ocr, temp_file.name)
        if layout:
            ie_result = ie({"doc": temp_file.name, "layout": layout})[0]
        # else: OCR found nothing -> keep the empty result
    except Exception as e:
        logging.error("信息抽取时出错", exc_info=e)
    finally:
        try:
            os.remove(temp_file.name)
        except Exception as e:
            logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
    return ie_result


def request_ie_result(task_enum, phrecs, timeout=None):
    """Request key-information extraction for ``phrecs`` from a remote IE service.

    :param task_enum: provides ``request_url()`` and ``schema()`` for the request.
    :param phrecs: photo records of one case; ``phrecs[0].pk_phhd`` is sent as the case key.
    :param timeout: optional ``requests`` timeout in seconds. ``None`` (default) waits
        indefinitely, which matches the previous behavior.
    :return: the ``"data"`` field of the JSON response.
    :raises Exception: when the service responds with a status other than 200.
    """
    url = task_enum.request_url()
    identity = int(time.time())
    images = [{"name": phrec.cfjaddress, "pk": phrec.pk_phrec} for phrec in phrecs]
    payload = {"images": images, "schema": task_enum.schema(), "pk_phhd": phrecs[0].pk_phhd, "identity": identity}
    response = requests.post(url, json=payload, timeout=timeout)
    if response.status_code == 200:
        return response.json()["data"]
    raise Exception(f"请求信息抽取结果失败,状态码:{response.status_code}")


def _extract_split(ie, split_img, angles):
    """Run IE on one split image, trying a second rotation if the first looks incomplete.

    The image is first rotated by ``angles[0]``. If that yields no result, or fewer keys
    than ``ie.kwargs["schema"]`` has entries, it is retried rotated by ``angles[1]``.

    :return: list of ``{"result": ..., "angle": ...}`` attempts in the order tried.
    """
    rotated_img = image_util.rotate(split_img, int(angles[0]))
    attempts = [{"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[0]}]
    first_result = attempts[0]["result"]
    if not first_result or len(first_result) < len(ie.kwargs.get("schema")):
        rotated_img = image_util.rotate(split_img, int(angles[1]))
        attempts.append({"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[1]})
    return attempts


def _upload_rotated_image(phrec, image, img_angle, zx_ie_results):
    """Rotate ``image`` by ``img_angle`` and upload it over ``phrec.cfjaddress``.

    After a successful upload, each pending ``ZxIeResult.rotation_angle`` is reduced by
    ``img_angle`` to correct for the rotation now baked into the stored image. Upload
    failures are logged, not raised; the temporary file is always deleted.
    """
    rotated = image_util.rotate(image, int(img_angle))
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        cv2.imwrite(temp_file.name, rotated)
    try:
        ufile.upload_file(phrec.cfjaddress, temp_file.name)
        # Correct the recorded rotation angles.
        for zx_ie_result in zx_ie_results:
            zx_ie_result.rotation_angle -= int(img_angle)
    except Exception as e:
        logging.error(f"上传图片({phrec.cfjaddress})失败", exc_info=e)
    finally:
        util.delete_temp_file(temp_file.name)


def information_extraction(ie, phrecs, identity):
    """Extract key information from every photo in ``phrecs`` and return the merged result.

    For each photo, this function:
    - splits the image;
    - runs IE on every split (see ``_extract_split``);
    - stores each non-empty attempt as a ``ZxIeResult`` row tagged with ``identity``;
    - votes on the best rotation angle across splits.

    If the winning angle is not ``"0"``, the photo is re-uploaded rotated. Photos without
    a private URL are skipped.

    :return: dict merging all non-empty IE results (see ``merge_result``).
    """
    result = {}
    for phrec in phrecs:
        img_path = ufile.get_private_url(phrec.cfjaddress)
        if not img_path:
            continue

        image = image_util.read(img_path)
        angles = image_util.parse_rotation_angles(image)
        # Votes per angle for the best rotation of each split. Keys are normalized to str
        # so they compare correctly with the "0" seed; the seed makes "no rotation" win
        # ties and guarantees max() below always has a key.
        angle_count = defaultdict(int, {"0": 0})
        zx_ie_results = []
        for split_result in image_util.split(image):
            attempts = _extract_split(ie, split_result["img"], angles)
            now = util.get_default_datetime()
            best_angle = ["0", 0]
            for ie_result in attempts:
                if not ie_result["result"]:
                    continue
                result_json = json.dumps(ie_result["result"], ensure_ascii=False)[:_MAX_IE_CONTENT_LENGTH]
                zx_ie_results.append(ZxIeResult(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=identity,
                                                cfjaddress=phrec.cfjaddress, content=result_json,
                                                rotation_angle=int(ie_result["angle"]),
                                                x_offset=split_result["x_offset"], y_offset=split_result["y_offset"],
                                                create_time=now, creator=HOSTNAME, update_time=now,
                                                updater=HOSTNAME))
                result = merge_result(result, ie_result["result"])
                if len(ie_result["result"]) > best_angle[1]:
                    best_angle = [str(ie_result["angle"]), len(ie_result["result"])]
            angle_count[best_angle[0]] += 1

        img_angle = max(angle_count, key=angle_count.get)
        if img_angle != "0":
            _upload_rotated_image(phrec, image, img_angle, zx_ie_results)

        with _mysql_session() as session:
            session.add_all(zx_ie_results)
            session.commit()
    return result


def get_best_value_in_keys(source, keys):
    """Return the non-empty ``text`` with the highest ``probability`` among ``source[key]`` for ``keys``.

    Items without a probability count as probability 0. Ties keep the first candidate seen.
    Returns ``None`` if no item has a non-empty text and a positive probability.
    """
    result = None
    best_probability = 0
    for key in keys:
        for value in source.get(key) or []:
            text = value.get("text")
            probability = value.get("probability") or 0
            if text and probability > best_probability:
                result = text
                best_probability = probability
    return result


def get_values_of_keys(source, keys):
    """Return all distinct non-empty ``text`` values under ``keys``, in first-seen order.

    The order is deterministic (unlike a ``set``) so strings joined from the result are
    stable between runs.
    """
    texts = []
    for key in keys:
        for value in source.get(key) or []:
            text = value.get("text")
            if text:
                texts.append(text)
    return list(dict.fromkeys(texts))


def save_or_update_ie(table, pk_phhd, data):
    """Insert or update the single ``table`` row for ``pk_phhd`` from ``data``.

    ``None`` and empty-string values are dropped first, so they never overwrite stored
    values. On update, ``update_time``/``updater`` are refreshed and ``creator`` is left
    untouched. On insert, all four audit columns are set.
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    with _mysql_session() as session:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        now = util.get_default_datetime()
        if db_data:
            # Update
            db_data.update_time = now
            db_data.updater = HOSTNAME
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert
            obj = table(**data)
            obj.create_time = now
            obj.creator = HOSTNAME
            obj.update_time = now
            obj.updater = HOSTNAME
            session.add(obj)
        session.commit()


def settlement_task(pk_phhd, settlement_list, identity):
    """Extract settlement-list fields for case ``pk_phhd`` and persist them to ``ZxIeSettlement``."""
    ie_result = information_extraction(SETTLEMENT_IE, settlement_list, identity)

    def best(keys):
        return get_best_value_in_keys(ie_result, keys)

    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(best(PATIENT_NAME)),
        "admission_date_str": handle_original_data(best(ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(best(DISCHARGE_DATE)),
        "personal_cash_payment_str": handle_original_data(best(PERSONAL_CASH_PAYMENT)),
        "personal_account_payment_str": handle_original_data(best(PERSONAL_ACCOUNT_PAYMENT)),
        "personal_funded_amount_str": handle_original_data(best(PERSONAL_FUNDED_AMOUNT)),
        "medical_insurance_type_str": handle_original_data(best(MEDICAL_INSURANCE_TYPE)),
        "admission_id": handle_id(best(ADMISSION_ID)),
        "settlement_id": handle_id(best(SETTLEMENT_ID)),
    }
    # Parsed counterparts of the raw *_str fields.
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    settlement_data["medical_insurance_type"] = handle_insurance_type(settlement_data["medical_insurance_type_str"])
    # parse_money receives the uppercase-expenses text and the plain expenses text and
    # returns (raw string, parsed amount).
    parse_money_result = parse_money(best(UPPERCASE_MEDICAL_EXPENSES), best(MEDICAL_EXPENSES))
    settlement_data["medical_expenses_str"] = handle_original_data(parse_money_result[0])
    settlement_data["medical_expenses"] = parse_money_result[1]
    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)


def _find_pk_yljg(hospitals):
    """Return ``pk_yljg`` of one ``BdYljg`` whose name LIKE-matches a parsed hospital name, else ``None``.

    Parsed names that appear in ``HOSPITAL_ALIAS`` are matched via their aliases instead.
    """
    hospital_like_conditions = []
    for hospital in hospitals:
        for parsed_hospital in parse_hospital(hospital):
            if parsed_hospital in HOSPITAL_ALIAS:
                for hospital_alias in HOSPITAL_ALIAS[parsed_hospital]:
                    hospital_like_conditions.append(BdYljg.name.like(f'%{hospital_alias}%'))
            else:
                hospital_like_conditions.append(BdYljg.name.like(f'%{parsed_hospital}%'))
    if not hospital_like_conditions:
        # An argument-less or_() adds no WHERE clause and would match an arbitrary hospital.
        return None
    with _mysql_session() as session:
        yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(or_(*hospital_like_conditions)) \
            .limit(1).one_or_none()
    return yljg.pk_yljg if yljg else None


def _find_pk_ylks(departments):
    """Return ``pk_ylks`` of one ``BdYlks`` whose name equals a parsed department name, else ``None``."""
    department_values = []
    for dept in departments:
        department_values += parse_department(dept)
    department_values = list(dict.fromkeys(department_values))
    if not department_values:
        return None
    with _mysql_session() as session:
        ylks = session.query(BdYlks.pk_ylks, BdYlks.name) \
            .filter(BdYlks.name.in_(department_values)).limit(1).one_or_none()
    return ylks.pk_ylks if ylks else None


def discharge_task(pk_phhd, discharge_record, identity):
    """Extract discharge-record fields for case ``pk_phhd`` and persist them to ``ZxIeDischarge``.

    Also tries to resolve the hospital (``pk_yljg``) and department (``pk_ylks``) keys.
    """
    ie_result = information_extraction(DISCHARGE_IE, discharge_record, identity)

    def best(keys):
        return get_best_value_in_keys(ie_result, keys)

    hospitals = get_values_of_keys(ie_result, HOSPITAL)
    departments = get_values_of_keys(ie_result, DEPARTMENT)
    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": handle_hospital(",".join(hospitals)),
        "department": handle_department(",".join(departments)),
        "name": handle_name(best(PATIENT_NAME)),
        "admission_date_str": handle_original_data(best(ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(best(DISCHARGE_DATE)),
        "doctor": handle_name(best(DOCTOR)),
        "admission_id": handle_id(best(ADMISSION_ID)),
        "age": handle_age(best(AGE)),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    if hospitals:
        pk_yljg = _find_pk_yljg(hospitals)
        if pk_yljg is not None:
            discharge_data["pk_yljg"] = pk_yljg

    if departments:
        pk_ylks = _find_pk_ylks(departments)
        if pk_ylks is not None:
            discharge_data["pk_ylks"] = pk_ylks

    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)


def cost_task(pk_phhd, cost_list, identity):
    """Extract cost-list fields for case ``pk_phhd`` and persist them to ``ZxIeCost``."""
    ie_result = information_extraction(COST_IE, cost_list, identity)

    def best(keys):
        return get_best_value_in_keys(ie_result, keys)

    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(best(PATIENT_NAME)),
        "admission_date_str": handle_original_data(best(ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(best(DISCHARGE_DATE)),
        "medical_expenses_str": handle_original_data(best(MEDICAL_EXPENSES)),
    }
    cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
    cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
    cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])
    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)


def photo_review(pk_phhd):
    """Run settlement, discharge and cost extraction for all photos of case ``pk_phhd``.

    Photos are grouped by ``cRectype``: ``"1"`` settlement list, ``"3"`` discharge record,
    ``"4"`` cost list. Other types are ignored. All three tasks share one ``identity``.
    """
    with _mysql_session() as session:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress).filter(
            ZxPhrec.pk_phhd == pk_phhd
        ).all()

    phrecs_by_type = defaultdict(list)
    for phrec in phrecs:
        phrecs_by_type[phrec.cRectype].append(phrec)

    # Marker shared by all results of this batch of images.
    identity = int(time.time())
    settlement_task(pk_phhd, phrecs_by_type["1"], identity)
    discharge_task(pk_phhd, phrecs_by_type["3"], identity)
    cost_task(pk_phhd, phrecs_by_type["4"], identity)


def main():
    """Poll forever for cases to recognize and process them.

    Each loop claims up to ``PHHD_BATCH_SIZE`` cases with ``exsuccess_flag == "1"`` that
    have at least one photo. It marks them ``"2"`` (recognition in progress), then runs
    ``photo_review`` on each. A finished case is marked ``"8"``, with host, check time
    and elapsed seconds. A case whose review raises is logged and left at ``"2"`` so the
    rest of the batch still runs. When nothing is pending, the loop sleeps
    ``SLEEP_MINUTES`` minutes.
    """
    while 1:
        with _mysql_session() as session:
            phhds = (session.query(ZxPhhd.pk_phhd)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.exsuccess_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
            if pk_phhd_values:
                # Mark the batch as "recognition in progress".
                session.execute(update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values))
                                .values(exsuccess_flag="2"))
                session.commit()

        if not pk_phhd_values:
            # No new cases found; wait before polling again.
            logging.info(f"暂未查询到需要识别的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for pk_phhd in pk_phhd_values:
            logging.info(f"开始识别:{pk_phhd}")
            start_time = time.time()
            try:
                photo_review(pk_phhd)
            except Exception as e:
                # Keep the worker alive; the case stays at flag "2" for inspection.
                logging.error(f"识别出错:{pk_phhd}", exc_info=e)
                continue

            # Recognition finished: update the status flag.
            with _mysql_session() as session:
                session.execute(update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                    exsuccess_flag="8", ref_id1=HOSTNAME, checktime=util.get_default_datetime(),
                    fFSYLFY=time.time() - start_time))
                session.commit()