Files
fcb_photo_review/photo_review/photo_review.py

223 lines
9.3 KiB
Python

import json
import logging
from time import sleep
from sqlalchemy import update
from config.keys import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR
from config.mysql import MysqlSession
from config.photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES
from photo_review import settlement_ie, discharge_ie, cost_ie
from photo_review.entity.bd_yljg import BdYljg
from photo_review.entity.bd_ylks import BdYlks
from photo_review.entity.zx_ie_cost import ZxIeCost
from photo_review.entity.zx_ie_discharge import ZxIeDischarge
from photo_review.entity.zx_ie_settlement import ZxIeSettlement
from photo_review.entity.zx_ocr import ZxOcr
from photo_review.entity.zx_phhd import ZxPhhd
from photo_review.entity.zx_phrec import ZxPhrec
from photo_review.util.data_util import handle_date, handle_decimal, handle_department, handle_name, \
handle_insurance_type
from photo_review.util.ucloud import get_private_url
from photo_review.util.util import get_default_datetime
def information_extraction(ie, phrecs):
    """Run key-information extraction over a set of photos and log the raw output.

    Every record in ``phrecs`` is resolved to a URL with ``get_private_url``;
    records for which no URL is returned are skipped. The remaining documents
    are handed to ``ie`` in one call, and each per-document result is stored
    as a ``ZxOcr`` row before being merged into the returned dict.

    Args:
        ie: Callable that accepts a list of ``{"doc": url}`` dicts and returns
            one result dict per document. Results are paired with the input
            documents by position.
        phrecs: Iterable of records exposing ``pk_phhd``, ``pk_phrec`` and
            ``cfjaddress``.

    Returns:
        dict: The per-document results merged with ``dict.update``, so when
        two documents yield the same key the later document wins. Empty when
        no document could be resolved to a URL.
    """
    docs = []
    doc_phrecs = []
    for phrec in phrecs:
        pic_path = get_private_url(phrec.cfjaddress)
        if pic_path:
            docs.append({"doc": pic_path})
            doc_phrecs.append(phrec)

    result = {}
    if not docs:
        # Nothing to recognise: do not invoke the model with an empty batch.
        return result

    ie_results = ie(docs)
    now = get_default_datetime()
    for ie_result, phrec in zip(ie_results, doc_phrecs):
        # Stored text is capped at 5000 characters.
        # NOTE(review): a cut-off string is no longer valid JSON; presumably the
        # cap mirrors the column size -- confirm against the ZxOcr table.
        result_json = json.dumps(ie_result, ensure_ascii=False)[:5000]
        session = MysqlSession()
        try:
            session.add(ZxOcr(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, cfjaddress=phrec.cfjaddress,
                              content=result_json, create_time=now, update_time=now))
            session.commit()
        finally:
            # Release the connection even when the insert fails.
            session.close()
        result.update(ie_result)
    return result
def get_best_value_in_keys(source, keys):
    """Return the text with the highest probability found under any of ``keys``.

    Args:
        source: Mapping from key to a list of candidate dicts, each carrying a
            ``"text"`` and a ``"probability"`` entry.
        keys: Keys of ``source`` to search, in order.

    Returns:
        The ``"text"`` of the candidate with the greatest probability, or
        ``None`` when no candidate has a non-empty text and a probability
        above zero. On equal probabilities the first candidate seen is kept.
    """
    best_text = None
    best_probability = 0
    for key in keys:
        # A missing key or an empty candidate list contributes nothing.
        for value in source.get(key) or ():
            text = value.get("text")
            # A missing probability counts as 0 and therefore can never win.
            probability = value.get("probability") or 0
            if text and probability > best_probability:
                best_text = text
                # Remember the score to beat; without this the last candidate
                # would always replace a better earlier one.
                best_probability = probability
    return best_text
def get_values_of_keys(source, keys):
    """Collect the first candidate's text for each of ``keys``.

    Args:
        source: Mapping from key to a list of candidate dicts with a
            ``"text"`` entry.
        keys: Keys of ``source`` to read, in order.

    Returns:
        list: One text per key whose first candidate has a non-empty text,
        in the order of ``keys``. Keys that are missing, have no candidates,
        or whose first candidate has no text are skipped.
    """
    texts = []
    for key in keys:
        candidates = source.get(key)
        if not candidates:
            continue
        first_text = candidates[0].get("text")
        if first_text:
            texts.append(first_text)
    return texts
def save_or_update_ie(table, pk_phhd, data):
    """Insert or update the row of ``table`` that belongs to ``pk_phhd``.

    Entries of ``data`` whose value is ``None`` or ``""`` are dropped first,
    so on update an existing column is never overwritten with an empty value.

    Args:
        table: Mapped entity class with ``pk_phhd``, ``create_time`` and
            ``update_time`` attributes.
        pk_phhd: Key used to look up the existing row.
        data: Column name to value mapping to persist.

    Raises:
        Whatever ``one_or_none`` raises when more than one row matches
        ``pk_phhd``, and any error raised while committing.
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    now = get_default_datetime()
    session = MysqlSession()
    try:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        if db_data is not None:
            # Update: refresh the timestamp and overwrite the supplied columns.
            db_data.update_time = now
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert: the entity is only built when it is actually needed.
            obj = table(**data)
            obj.create_time = now
            obj.update_time = now
            session.add(obj)
        session.commit()
    finally:
        # Release the connection even when the lookup or commit fails.
        session.close()
def photo_review(pk_phhd):
    """Extract the key fields from all photos of one case and persist them.

    The photo records of ``pk_phhd`` are split by ``cRectype`` into
    settlement lists (``"1"``), discharge records (``"3"``) and cost lists
    (``"4"``); other types are ignored. Each group is run through its own
    extraction model, the best candidate per field is normalised, and the
    result is written with ``save_or_update_ie`` to ``ZxIeSettlement``,
    ``ZxIeDischarge`` and ``ZxIeCost`` respectively.

    Args:
        pk_phhd: Key of the case whose photos are processed.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress) \
            .filter(ZxPhrec.pk_phhd == pk_phhd) \
            .all()
    finally:
        session.close()

    settlement_list = []
    discharge_record = []
    cost_list = []
    for phrec in phrecs:
        if phrec.cRectype == "1":
            settlement_list.append(phrec)
        elif phrec.cRectype == "3":
            discharge_record.append(phrec)
        elif phrec.cRectype == "4":
            cost_list.append(phrec)

    # --- Settlement list -------------------------------------------------
    settlement_list_ie_result = information_extraction(settlement_ie, settlement_list)
    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(settlement_list_ie_result, PATIENT_NAME)),
        "admission_date_str": get_best_value_in_keys(settlement_list_ie_result, ADMISSION_DATE),
        "discharge_date_str": get_best_value_in_keys(settlement_list_ie_result, DISCHARGE_DATE),
        "medical_expenses_str": get_best_value_in_keys(settlement_list_ie_result, MEDICAL_EXPENSES),
        "personal_cash_payment_str": get_best_value_in_keys(settlement_list_ie_result, PERSONAL_CASH_PAYMENT),
        "personal_account_payment_str": get_best_value_in_keys(settlement_list_ie_result, PERSONAL_ACCOUNT_PAYMENT),
        "personal_funded_amount_str": get_best_value_in_keys(settlement_list_ie_result, PERSONAL_FUNDED_AMOUNT),
        "medical_insurance_type": handle_insurance_type(
            get_best_value_in_keys(settlement_list_ie_result, MEDICAL_INSURANCE_TYPE)),
    }
    # The raw strings are kept next to their parsed counterparts.
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["medical_expenses"] = handle_decimal(settlement_data["medical_expenses_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)

    # --- Discharge record ------------------------------------------------
    discharge_record_ie_result = information_extraction(discharge_ie, discharge_record)
    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": get_best_value_in_keys(discharge_record_ie_result, HOSPITAL),
        "department": get_best_value_in_keys(discharge_record_ie_result, DEPARTMENT),
        "name": handle_name(get_best_value_in_keys(discharge_record_ie_result, PATIENT_NAME)),
        "admission_date_str": get_best_value_in_keys(discharge_record_ie_result, ADMISSION_DATE),
        "discharge_date_str": get_best_value_in_keys(discharge_record_ie_result, DISCHARGE_DATE),
        "doctor": handle_name(get_best_value_in_keys(discharge_record_ie_result, DOCTOR)),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    # When a recognised hospital name exists in BdYljg, store its key and
    # replace the recognised text with the stored name.
    hospital_value = get_values_of_keys(discharge_record_ie_result, HOSPITAL)
    if hospital_value:
        yljg = _query_first_by_name(BdYljg.pk_yljg, BdYljg.name, hospital_value)
        if yljg:
            discharge_data["pk_yljg"] = yljg.pk_yljg
            discharge_data["hospital"] = yljg.name

    # Same for the department, after expanding each recognised text with
    # handle_department and removing duplicates.
    department_value = get_values_of_keys(discharge_record_ie_result, DEPARTMENT)
    if department_value:
        department_values = []
        for dept in department_value:
            department_values += handle_department(dept)
        department_values = list(set(department_values))
        if department_values:
            ylks = _query_first_by_name(BdYlks.pk_ylks, BdYlks.name, department_values)
            if ylks:
                discharge_data["pk_ylks"] = ylks.pk_ylks
                discharge_data["department"] = ylks.name
    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)

    # --- Cost list -------------------------------------------------------
    cost_list_ie_result = information_extraction(cost_ie, cost_list)
    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(cost_list_ie_result, PATIENT_NAME)),
        "admission_date_str": get_best_value_in_keys(cost_list_ie_result, ADMISSION_DATE),
        "discharge_date_str": get_best_value_in_keys(cost_list_ie_result, DISCHARGE_DATE),
        "medical_expenses_str": get_best_value_in_keys(cost_list_ie_result, MEDICAL_EXPENSES),
    }
    cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
    cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
    cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])
    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)


def _query_first_by_name(pk_column, name_column, names):
    """Return one ``(pk, name)`` row whose name is in ``names``, or ``None``."""
    session = MysqlSession()
    try:
        return session.query(pk_column, name_column) \
            .filter(name_column.in_(names)).limit(1).one_or_none()
    finally:
        # Release the connection even when the query fails.
        session.close()
def main():
    """Poll for pending cases forever and run ``photo_review`` on each one.

    Each round loads up to ``PHHD_BATCH_SIZE`` cases whose ``exsuccess_flag``
    is ``'1'``. Every case is processed and its flag is then set to ``8``.
    When a round finds nothing, the loop sleeps for ``SLEEP_MINUTES`` minutes
    before querying again. The function never returns; an exception raised
    while processing a case propagates to the caller.
    """
    log = logging.getLogger()
    # Keep watching for new cases.
    while True:
        session = MysqlSession()
        try:
            phhds = session.query(ZxPhhd.pk_phhd) \
                .filter(ZxPhhd.exsuccess_flag == '1') \
                .limit(PHHD_BATCH_SIZE) \
                .all()
        finally:
            session.close()

        if phhds:
            for phhd in phhds:
                pk_phhd = phhd.pk_phhd
                photo_review(pk_phhd)
                # Recognition finished: update the flag so the case is not
                # selected again by the query above.
                session = MysqlSession()
                try:
                    stmt = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(exsuccess_flag=8))
                    session.execute(stmt)
                    session.commit()
                finally:
                    # Release the connection even when the update fails.
                    session.close()
        else:
            # No new case found; wait a while before querying again.
            sleep_minutes = SLEEP_MINUTES
            log.info(f"暂未查询到新案子,等待{sleep_minutes}分钟...")
            sleep(sleep_minutes * 60)