import json
import logging
import os
import sys
from time import sleep

import paddle
from sqlalchemy import update

from config.keys import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
    PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR
from config.mysql import MysqlSession
from config.photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES
from photo_review import settlement_ie, discharge_ie, cost_ie
from photo_review.entity.bd_yljg import BdYljg
from photo_review.entity.bd_ylks import BdYlks
from photo_review.entity.zx_ie_cost import ZxIeCost
from photo_review.entity.zx_ie_discharge import ZxIeDischarge
from photo_review.entity.zx_ie_settlement import ZxIeSettlement
from photo_review.entity.zx_ocr import ZxOcr
from photo_review.entity.zx_phhd import ZxPhhd
from photo_review.entity.zx_phrec import ZxPhrec
from photo_review.util.data_util import handle_date, handle_decimal, handle_department, handle_name, \
    handle_insurance_type, handle_original_data
from photo_review.util.util import get_default_datetime
from ucloud import ucloud

# NOTE(review): this runs after the project imports above, so it cannot influence how they resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Maximum number of characters of serialized IE output stored per picture.
# NOTE(review): presumably matches the width of the ZxOcr.content column - confirm against the schema.
_OCR_CONTENT_MAX_LEN = 5000


# Key information extraction
def information_extraction(ie, phrecs):
    """Run an information-extraction model over the pictures of ``phrecs`` and merge the results.

    For every record whose ``cfjaddress`` resolves to a private URL, the picture is
    sent to ``ie``. Each per-picture result is stored as a ``ZxOcr`` row (serialized
    as JSON and capped at ``_OCR_CONTENT_MAX_LEN`` characters) and then merged into
    one dict with ``dict.update``, so a key produced by a later picture overwrites
    the same key from an earlier one.

    Args:
        ie: Callable invoked once as ``ie(docs)``, where ``docs`` is a list of
            ``{"doc": url}`` dicts. Its results are paired with the input records
            by position, so it must return them in input order. Each result must
            be JSON-serializable and usable as an argument to ``dict.update``.
        phrecs: Iterable of records exposing ``pk_phhd``, ``pk_phrec`` and
            ``cfjaddress``.

    Returns:
        The merged extraction result; an empty dict when no picture URL could be
        resolved.
    """
    result = {}
    docs = []
    doc_phrecs = []
    for phrec in phrecs:
        pic_path = ucloud.get_private_url(phrec.cfjaddress)
        # Records without a resolvable URL are skipped, so docs and doc_phrecs stay aligned.
        if pic_path:
            docs.append({"doc": pic_path})
            doc_phrecs.append(phrec)
    if not docs:
        return result

    ie_results = ie(docs)
    now = get_default_datetime()
    for ie_result, phrec in zip(ie_results, doc_phrecs):
        # Slicing is a no-op for shorter strings, so no explicit length check is needed.
        result_json = json.dumps(ie_result, ensure_ascii=False)[:_OCR_CONTENT_MAX_LEN]

        session = MysqlSession()
        try:
            session.add(ZxOcr(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, cfjaddress=phrec.cfjaddress,
                              content=result_json, create_time=now, update_time=now))
            session.commit()
        finally:
            # Always release the connection, even when the insert or commit fails.
            session.close()

        result.update(ie_result)
    return result
# Get the highest-probability value found under any of the given keys
def get_best_value_in_keys(source, keys):
    """Return the ``text`` with the highest ``probability`` among all candidates under ``keys``.

    Args:
        source: Mapping from key to a list of candidate dicts. Each candidate is
            read through ``.get("text")`` and ``.get("probability")``.
        keys: Iterable of keys to look up in ``source``.

    Returns:
        The best text, or ``None`` when no candidate has both a non-empty text and
        a probability greater than 0.
    """
    best_text = None
    best_probability = 0
    for key in keys:
        for candidate in source.get(key) or ():
            text = candidate.get("text")
            probability = candidate.get("probability")
            # A candidate without text or probability cannot be ranked; skip it
            # rather than failing on the comparison below.
            if not text or probability is None:
                continue
            if probability > best_probability:
                best_text = text
                # Remember the winning score so a weaker later candidate cannot replace it.
                best_probability = probability
    return best_text


# Collect the values of all given keys into a list
def get_values_of_keys(source, keys):
    """Return the first candidate's non-empty ``text`` for each key in ``keys``.

    Args:
        source: Mapping from key to a list of candidate dicts.
        keys: Iterable of keys to look up in ``source``.

    Returns:
        A list of texts in ``keys`` order. Keys that are missing, have no
        candidates, or whose first candidate has an empty text are left out.
    """
    texts = []
    for key in keys:
        candidates = source.get(key)
        if not candidates:
            continue
        text = candidates[0].get("text")
        if text:
            texts.append(text)
    return texts


def save_or_update_ie(table, pk_phhd, data):
    """Insert or update the row of ``table`` identified by ``pk_phhd``.

    Entries of ``data`` whose value is ``None`` or ``""`` are dropped first, so
    they never overwrite stored values.

    Args:
        table: ORM entity class; it is queried with ``filter_by(pk_phhd=...)`` and
            instantiated with the cleaned ``data`` as keyword arguments on insert.
        pk_phhd: Case key used to find the existing row.
        data: Mapping of attribute name to value.
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    session = MysqlSession()
    try:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        now = get_default_datetime()
        if db_data:
            # Update the existing row
            db_data.update_time = now
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert a new row
            obj = table(**data)
            obj.create_time = now
            obj.update_time = now
            session.add(obj)
        session.commit()
    finally:
        # Always release the connection, even when the query or commit fails.
        session.close()


def _first_match_by_name(pk_column, name_column, names):
    """Return one ``(pk, name)`` row whose name is in ``names``, or ``None`` if there is none."""
    session = MysqlSession()
    try:
        return session.query(pk_column, name_column) \
            .filter(name_column.in_(names)).limit(1).one_or_none()
    finally:
        session.close()


def _review_settlement(pk_phhd, phrecs):
    """Extract the settlement-list fields from ``phrecs`` and persist them to ``ZxIeSettlement``."""
    ie_result = information_extraction(settlement_ie, phrecs)
    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(ie_result, DISCHARGE_DATE)),
        "medical_expenses_str": handle_original_data(get_best_value_in_keys(ie_result, MEDICAL_EXPENSES)),
        "personal_cash_payment_str": handle_original_data(
            get_best_value_in_keys(ie_result, PERSONAL_CASH_PAYMENT)),
        "personal_account_payment_str": handle_original_data(
            get_best_value_in_keys(ie_result, PERSONAL_ACCOUNT_PAYMENT)),
        "personal_funded_amount_str": handle_original_data(
            get_best_value_in_keys(ie_result, PERSONAL_FUNDED_AMOUNT)),
        "medical_insurance_type": handle_insurance_type(
            get_best_value_in_keys(ie_result, MEDICAL_INSURANCE_TYPE)),
    }
    # Typed values are derived from the raw "*_str" values kept alongside them.
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["medical_expenses"] = handle_decimal(settlement_data["medical_expenses_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)


def _review_discharge(pk_phhd, phrecs):
    """Extract the discharge-record fields from ``phrecs`` and persist them to ``ZxIeDischarge``."""
    ie_result = information_extraction(discharge_ie, phrecs)
    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": get_best_value_in_keys(ie_result, HOSPITAL),
        "department": get_best_value_in_keys(ie_result, DEPARTMENT),
        "name": handle_name(get_best_value_in_keys(ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(ie_result, DISCHARGE_DATE)),
        "doctor": handle_name(get_best_value_in_keys(ie_result, DOCTOR)),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    # When an extracted hospital name matches a BdYljg row, use that row's key and name.
    hospital_names = get_values_of_keys(ie_result, HOSPITAL)
    if hospital_names:
        yljg = _first_match_by_name(BdYljg.pk_yljg, BdYljg.name, hospital_names)
        if yljg:
            discharge_data["pk_yljg"] = yljg.pk_yljg
            discharge_data["hospital"] = yljg.name

    # Same for departments, after expanding each extracted name through handle_department.
    department_names = get_values_of_keys(ie_result, DEPARTMENT)
    if department_names:
        candidates = set()
        for dept in department_names:
            candidates.update(handle_department(dept))
        if candidates:
            ylks = _first_match_by_name(BdYlks.pk_ylks, BdYlks.name, list(candidates))
            if ylks:
                discharge_data["pk_ylks"] = ylks.pk_ylks
                discharge_data["department"] = ylks.name

    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)


def _review_cost(pk_phhd, phrecs):
    """Extract the cost-list fields from ``phrecs`` and persist them to ``ZxIeCost``."""
    ie_result = information_extraction(cost_ie, phrecs)
    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(ie_result, DISCHARGE_DATE)),
        "medical_expenses_str": handle_original_data(get_best_value_in_keys(ie_result, MEDICAL_EXPENSES)),
    }
    cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
    cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
    cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])
    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)


def photo_review(pk_phhd):
    """Run information extraction for every picture of one case and store the results.

    The case's ``ZxPhrec`` rows are grouped by ``cRectype``: ``"1"`` goes to the
    settlement model, ``"3"`` to the discharge model and ``"4"`` to the cost
    model. Other types are ignored. Each group is processed and saved even when
    it is empty.

    Args:
        pk_phhd: Key of the case to review.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress) \
            .filter(ZxPhrec.pk_phhd == pk_phhd) \
            .all()
    finally:
        session.close()

    settlement_list = []
    discharge_record = []
    cost_list = []
    for phrec in phrecs:
        if phrec.cRectype == "1":
            settlement_list.append(phrec)
        elif phrec.cRectype == "3":
            discharge_record.append(phrec)
        elif phrec.cRectype == "4":
            cost_list.append(phrec)

    _review_settlement(pk_phhd, settlement_list)
    _review_discharge(pk_phhd, discharge_record)
    _review_cost(pk_phhd, cost_list)


def main():
    """Poll for new cases forever, review each one and mark it as processed.

    Each round loads up to ``PHHD_BATCH_SIZE`` cases whose ``exsuccess_flag`` is
    ``'1'``. Every case is reviewed and then its flag is set to ``8``. When no
    case is found, the loop sleeps for ``SLEEP_MINUTES`` minutes before polling
    again.
    """
    log = logging.getLogger()
    # Keep checking for new cases
    while True:
        session = MysqlSession()
        try:
            phhds = session.query(ZxPhhd.pk_phhd) \
                .filter(ZxPhhd.exsuccess_flag == '1') \
                .limit(PHHD_BATCH_SIZE) \
                .all()
        finally:
            session.close()

        if phhds:
            for phhd in phhds:
                pk_phhd = phhd.pk_phhd
                photo_review(pk_phhd)

                # Recognition finished: update the flag
                session = MysqlSession()
                try:
                    stmt = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(exsuccess_flag=8))
                    session.execute(stmt)
                    session.commit()
                finally:
                    session.close()
                # NOTE(review): the original indentation was lost; the cache release is assumed
                # to run after every case rather than once per batch - confirm.
                paddle.device.cuda.empty_cache()
        else:
            # No new case found; wait a while before polling again
            sleep_minutes = SLEEP_MINUTES
            log.info(f"暂未查询到新案子,等待{sleep_minutes}分钟...")
            sleep(sleep_minutes * 60)