From 66c0d5e950808a0b869ab211491b85fbfbc1459d Mon Sep 17 00:00:00 2001 From: liuyebo <1515783401@qq.com> Date: Mon, 1 Jul 2024 12:42:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E4=BD=BF=E7=94=A8=E5=A4=9A?= =?UTF-8?q?=E8=BF=9B=E7=A8=8B=E6=9D=A5=E4=BC=98=E5=8C=96=E6=95=B4=E4=BD=93?= =?UTF-8?q?=E8=AF=86=E5=88=AB=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- photo_review/photo_review.py | 70 ++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/photo_review/photo_review.py b/photo_review/photo_review.py index bec0b21..aef5ef9 100644 --- a/photo_review/photo_review.py +++ b/photo_review/photo_review.py @@ -1,3 +1,4 @@ +import concurrent.futures import json import logging import math @@ -290,25 +291,10 @@ def save_or_update_ie(table, pk_phhd, data): session.close() -def photo_review(pk_phhd, task_flows): - settlement_list = [] - discharge_record = [] - cost_list = [] - - session = MysqlSession() - phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress) \ - .filter(ZxPhrec.pk_phhd == pk_phhd) \ - .all() - session.close() - for phrec in phrecs: - if phrec.cRectype == "1": - settlement_list.append(phrec) - elif phrec.cRectype == "3": - discharge_record.append(phrec) - elif phrec.cRectype == "4": - cost_list.append(phrec) - - settlement_list_ie_result = information_extraction(task_flows[0], settlement_list) +def settlement_task(pk_phhd, settlement_list): + settlement_list_ie_result = information_extraction( + Taskflow("information_extraction", schema=SETTLEMENT_LIST_SCHEMA, model="uie-x-base", + task_path="config/model/settlement_list_model", layout_analysis=LAYOUT_ANALYSIS), settlement_list) settlement_data = { "pk_phhd": pk_phhd, "name": handle_name(get_best_value_in_keys(settlement_list_ie_result, PATIENT_NAME)), @@ -334,7 +320,11 @@ def photo_review(pk_phhd, task_flows): settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"]) save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data) - discharge_record_ie_result = information_extraction(task_flows[1], discharge_record) + +def discharge_task(pk_phhd, discharge_record): + discharge_record_ie_result = information_extraction( + Taskflow("information_extraction", schema=DISCHARGE_RECORD_SCHEMA, model="uie-x-base", + task_path="config/model/discharge_record_model", layout_analysis=LAYOUT_ANALYSIS), discharge_record) discharge_data = { "pk_phhd": pk_phhd, "hospital": handle_hospital(get_best_value_in_keys(discharge_record_ie_result, HOSPITAL)), @@ -371,7 +361,11 @@ def photo_review(pk_phhd, task_flows): discharge_data["department"] = ylks.name save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data) - cost_list_ie_result = information_extraction(task_flows[2], cost_list) + +def cost_task(pk_phhd, cost_list): + cost_list_ie_result = information_extraction( + Taskflow("information_extraction", schema=COST_LIST_SCHEMA, model="uie-x-base", + task_path="config/model/cost_list_model", layout_analysis=LAYOUT_ANALYSIS), cost_list) cost_data = { "pk_phhd": pk_phhd, "name": handle_name(get_best_value_in_keys(cost_list_ie_result, PATIENT_NAME)), @@ -385,6 +379,30 @@ def photo_review(pk_phhd, task_flows): save_or_update_ie(ZxIeCost, pk_phhd, cost_data) +def photo_review(pk_phhd): + settlement_list = [] + discharge_record = [] + cost_list = [] + + session = MysqlSession() + phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress) \ + .filter(ZxPhrec.pk_phhd == pk_phhd) \ + .all() + session.close() + for phrec in phrecs: + if phrec.cRectype == "1": + settlement_list.append(phrec) + elif phrec.cRectype == "3": + discharge_record.append(phrec) + elif phrec.cRectype == "4": + cost_list.append(phrec) + + with concurrent.futures.ProcessPoolExecutor() as executor: + executor.submit(settlement_task, pk_phhd, settlement_list) + executor.submit(discharge_task, pk_phhd, discharge_record) + executor.submit(cost_task, pk_phhd, cost_list) + + def main(): # 持续检测新案子 while 1: @@ -393,18 +411,10 @@ def main(): phhds = session.query(ZxPhhd.pk_phhd).filter(ZxPhhd.exsuccess_flag == '1').limit(PHHD_BATCH_SIZE).all() session.close() if phhds: - ie_task_flows = [ - Taskflow("information_extraction", schema=SETTLEMENT_LIST_SCHEMA, model="uie-x-base", - task_path="config/model/settlement_list_model", layout_analysis=LAYOUT_ANALYSIS), - Taskflow("information_extraction", schema=DISCHARGE_RECORD_SCHEMA, model="uie-x-base", - task_path="config/model/discharge_record_model", layout_analysis=LAYOUT_ANALYSIS), - Taskflow("information_extraction", schema=COST_LIST_SCHEMA, model="uie-x-base", - task_path="config/model/cost_list_model", layout_analysis=LAYOUT_ANALYSIS), - ] for phhd in phhds: pk_phhd = phhd.pk_phhd logging.info(f"开始识别:{pk_phhd}") - photo_review(pk_phhd, ie_task_flows) + photo_review(pk_phhd) # 识别完成更新标识 session = MysqlSession()