443 lines
19 KiB
Python
443 lines
19 KiB
Python
import json
|
||
import logging
|
||
import os
|
||
import tempfile
|
||
import time
|
||
from collections import defaultdict
|
||
from time import sleep
|
||
|
||
import cv2
|
||
import jieba
|
||
import requests
|
||
from rapidfuzz import process, fuzz
|
||
from sqlalchemy import update
|
||
|
||
from db import MysqlSession
|
||
from db.mysql import BdYljg, BdYlks, ZxIeResult, ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxPhrec
|
||
from doc_dewarp import dewarp
|
||
from paddle_detection import detector
|
||
from photo_review import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
|
||
PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR, \
|
||
ADMISSION_ID, SETTLEMENT_ID, AGE, OCR, SETTLEMENT_IE, DISCHARGE_IE, COST_IE, PHHD_BATCH_SIZE, SLEEP_MINUTES, \
|
||
UPPERCASE_MEDICAL_EXPENSES, HOSTNAME, HOSPITAL_ALIAS, HOSPITAL_FILTER, DEPARTMENT_ALIAS, DEPARTMENT_FILTER
|
||
from ucloud import ufile
|
||
from util import image_util, util
|
||
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, \
|
||
handle_insurance_type, handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, \
|
||
parse_hospital
|
||
|
||
|
||
def merge_result(result1, result2):
    """Merge two information-extraction results in place.

    For every field in ``result2``, its entry list is appended after the
    entries already stored under the same field in ``result1`` (a new list is
    built, so ``result2``'s lists are not aliased).

    Args:
        result1: accumulator dict of field name -> list of entries; mutated.
        result2: dict of field name -> list of entries to append.

    Returns:
        ``result1`` itself, after merging.
    """
    for field, extra in result2.items():
        existing = result1.get(field, [])
        result1[field] = existing + extra
    return result1
|
||
|
||
|
||
def ie_temp_image(ie, ocr, image):
    """Run information extraction on an in-memory image through a temporary JPEG.

    The image is written to a temporary ``.jpg`` file. That file is OCR'd with
    ``util.get_ocr_layout`` and, if a layout is found, passed to ``ie``
    together with the layout. The temporary file is always removed afterwards.

    Args:
        ie: extraction callable, invoked as ``ie({"doc": path, "layout": layout})``;
            the first element of its return value is used.
        ocr: OCR engine handed to ``util.get_ocr_layout``.
        image: image data written with ``cv2.imwrite``.

    Returns:
        The first element of the ``ie`` output, or ``[]`` when no layout was
        found or when extraction raised (the error is logged).
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        cv2.imwrite(tmp.name, image)
    tmp_path = tmp.name

    extracted = []
    try:
        ocr_layout = util.get_ocr_layout(ocr, tmp_path)
        # An empty layout means nothing was recognised; keep the empty result.
        if ocr_layout:
            extracted = ie({"doc": tmp_path, "layout": ocr_layout})[0]
    except Exception as e:
        logging.error("信息抽取时出错", exc_info=e)
    finally:
        # delete=False above, so the file must be removed by hand.
        try:
            os.remove(tmp_path)
        except Exception as e:
            logging.info(f"删除临时文件 {tmp_path} 时出错", exc_info=e)
    return extracted
|
||
|
||
|
||
def request_ie_result(task_enum, phrecs):
    """Ask a remote extraction service to process the images of ``phrecs``.

    Args:
        task_enum: provides ``request_url()`` (endpoint) and ``schema()``
            (extraction schema sent in the payload).
        phrecs: non-empty sequence of records with ``cfjaddress``, ``pk_phrec``
            and ``pk_phhd``; the ``pk_phhd`` of the first record is sent.

    Returns:
        The ``"data"`` member of the JSON response.

    Raises:
        Exception: when the service answers with a status other than 200.
        IndexError: when ``phrecs`` is empty.
    """
    url = task_enum.request_url()
    # Second-resolution timestamp used as the batch identity.
    identity = int(time.time())
    images = [{"name": rec.cfjaddress, "pk": rec.pk_phrec} for rec in phrecs]
    payload = {
        "images": images,
        "schema": task_enum.schema(),
        "pk_phhd": phrecs[0].pk_phhd,
        "identity": identity,
    }

    response = requests.post(url, json=payload)
    if response.status_code != 200:
        raise Exception(f"请求信息抽取结果失败,状态码:{response.status_code}")
    return response.json()["data"]
|
||
|
||
|
||
def information_extraction(ie, phrecs, identity):
    """Run key-information extraction over every image of ``phrecs`` and merge the results.

    Steps for each record's image:

    1. Download the image.
    2. Crop detected document areas, falling back to the whole image.
    3. Dewarp each crop, split it, and run ``ie`` on each split at the first
       candidate rotation angle.
    4. If that yields nothing, or fewer fields than the schema, also run
       ``ie`` at the second candidate angle.
    5. Store every non-empty extraction as a ``ZxIeResult`` row and merge it
       into the returned dict.

    The most frequent per-split best angle (ties favour ``"0"``) is taken as
    the image's angle. If it is non-zero, the original image is rotated and
    re-uploaded, and the stored rotation angles are shifted by that angle.

    Args:
        ie: extraction callable; ``len(ie.kwargs.get("schema"))`` is used as the
            expected field count.
        phrecs: records providing ``cfjaddress``, ``pk_phhd`` and ``pk_phrec``.
        identity: batch id written to ``ZxIeResult.id``.

    Returns:
        dict of field name -> list of extraction entries, merged over all images.
    """
    result = {}
    for phrec in phrecs:
        img_path = ufile.get_private_url(phrec.cfjaddress)
        if not img_path:
            continue

        image = image_util.read(img_path)
        det_time = time.time()
        target_images = list(detector.get_book_areas(image))  # crop detected document regions
        logging.info(f"det耗时:{time.time() - det_time}")
        if not target_images:
            target_images.append(image)  # detection failed: process the whole image

        # How often each angle was the best one for a split (keys are angle strings).
        angle_count = defaultdict(int, {"0": 0})
        # Collected across ALL crops of this image; previously it was reset per crop,
        # which dropped the rows of every crop except the last.
        zx_ie_results = []
        for target_image in target_images:
            dewarp_time = time.time()
            dewarped_image = dewarp.dewarp_image(target_image)  # undo page distortion
            logging.info(f"dewarp耗时:{time.time() - dewarp_time}")
            angles = image_util.parse_rotation_angles(dewarped_image)

            for split_result in image_util.split(dewarped_image):
                rotated_img = image_util.rotate(split_result["img"], int(angles[0]))
                ie_results = [{"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[0]}]
                # Retry at the second candidate angle when the first one looks incomplete.
                if not ie_results[0]["result"] or len(ie_results[0]["result"]) < len(ie.kwargs.get("schema")):
                    rotated_img = image_util.rotate(split_result["img"], int(angles[1]))
                    ie_results.append({"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[1]})

                now = util.get_default_datetime()
                best_angle = ["0", 0]  # [angle, number of extracted fields]
                for ie_result in ie_results:
                    if not ie_result["result"]:
                        continue

                    result_json = json.dumps(ie_result["result"], ensure_ascii=False)
                    if len(result_json) > 5000:
                        # Truncate to 5000 characters before storing in ZxIeResult.content.
                        result_json = result_json[:5000]
                    zx_ie_results.append(ZxIeResult(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=identity,
                                                    cfjaddress=phrec.cfjaddress, content=result_json,
                                                    rotation_angle=int(ie_result["angle"]),
                                                    x_offset=split_result["x_offset"],
                                                    y_offset=split_result["y_offset"], create_time=now,
                                                    creator=HOSTNAME, update_time=now, updater=HOSTNAME))

                    result = merge_result(result, ie_result["result"])

                    if len(ie_result["result"]) > best_angle[1]:
                        best_angle = [ie_result["angle"], len(ie_result["result"])]

                angle_count[best_angle[0]] += 1

        img_angle = max(angle_count, key=angle_count.get)
        if img_angle != "0":
            image = image_util.rotate(image, int(img_angle))
            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
                cv2.imwrite(temp_file.name, image)
            try:
                ufile.upload_file(phrec.cfjaddress, temp_file.name)
                # The stored image is now rotated, so shift the recorded angles to match.
                for zx_ie_result in zx_ie_results:
                    zx_ie_result.rotation_angle -= int(img_angle)
            except Exception as e:
                logging.error(f"上传图片({phrec.cfjaddress})失败", exc_info=e)
            finally:
                util.delete_temp_file(temp_file.name)

        if zx_ie_results:
            session = MysqlSession()
            try:
                session.add_all(zx_ie_results)
                session.commit()
            finally:
                session.close()

    return result
|
||
|
||
|
||
def get_best_value_in_keys(source, keys):
    """Return the extracted text with the highest probability across ``keys``.

    Args:
        source: dict of field name -> list of entries, each a dict with
            ``"text"`` and ``"probability"``.
        keys: field names to consider, in priority order.

    Returns:
        The ``"text"`` of the entry with the strictly highest probability
        (the first one wins on ties), skipping entries with empty text;
        ``None`` when no entry qualifies.
    """
    best_text, best_prob = None, 0
    for key in keys:
        for item in source.get(key) or ():
            text = item.get("text")
            if not text:
                continue
            prob = item.get("probability")
            if prob > best_prob:
                best_text, best_prob = text, prob
    return best_text
|
||
|
||
|
||
def get_values_of_keys(source, keys):
    """Collect the distinct non-empty ``"text"`` values stored under ``keys``.

    Args:
        source: dict of field name -> list of entries (dicts with ``"text"``).
        keys: field names to read, in order.

    Returns:
        List of unique texts in first-seen order (by ``keys`` order, then entry
        order). The order is deterministic, unlike the previous ``set``-based
        dedupe; callers join these values and try them in order.
    """
    texts = []
    for key in keys:
        for entry in source.get(key) or []:
            text = entry.get("text")
            if text:
                texts.append(text)
    # dict.fromkeys removes duplicates while preserving insertion order.
    return list(dict.fromkeys(texts))
|
||
|
||
|
||
def save_or_update_ie(table, pk_phhd, data):
    """Insert or update the row of ``table`` identified by ``pk_phhd``.

    ``None`` and empty-string values are dropped first, so an update never
    blanks out a column that already holds a value.

    Args:
        table: ORM model class with a ``pk_phhd`` column and the audit columns
            ``create_time``/``creator``/``update_time``/``updater``.
        pk_phhd: key used to look up an existing row.
        data: column name -> value mapping to store.
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    # Constructed up front for both paths, as before; presumably this also rejects
    # unknown column names early — confirm against the model base class.
    obj = table(**data)
    now = util.get_default_datetime()

    session = MysqlSession()
    try:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        if db_data:
            # Update: record who updated the row; the creator must stay untouched.
            db_data.update_time = now
            db_data.updater = HOSTNAME
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert: stamp all audit columns.
            obj.create_time = now
            obj.creator = HOSTNAME
            obj.update_time = now
            obj.updater = HOSTNAME
            session.add(obj)
        session.commit()
    finally:
        session.close()
|
||
|
||
|
||
def search_hospital(hospital):
    """Fuzzy-match a hospital name against the ``BdYljg`` table.

    First looks for names containing all jieba tokens of ``hospital`` in order.
    If nothing is found, it retries with up to two single fallback keywords:
    the last two usable tokens that precede the first token containing
    "医院".

    Args:
        hospital: hospital name as extracted from the document.

    Returns:
        The rapidfuzz ``extractOne`` result ``(name, score, pk_yljg)`` over the
        candidate rows, or ``None`` when there are no candidates.
    """
    def _filter_search_keywords(keywords):
        # Usable tokens: not in HOSPITAL_FILTER and longer than one character.
        usable = [kw for kw in keywords if kw not in HOSPITAL_FILTER and len(kw) > 1]
        last, previous = "", ""
        for kw in usable:
            if "医院" in kw:
                break
            previous, last = last, kw
        # NOTE(review): when no usable token precedes a "医院" token this returns [""],
        # and LIKE '%%' then matches every row of BdYljg.
        return [last, previous] if previous else [last]

    words = jieba.lcut(hospital, HMM=False)
    columns = (BdYljg.pk_yljg, BdYljg.name)
    session = MysqlSession()
    rows = session.query(*columns).filter(BdYljg.name.like(f"%{'%'.join(words)}%")).all()
    if not rows:
        for keyword in _filter_search_keywords(words):
            rows = session.query(*columns).filter(BdYljg.name.like(f"%{keyword}%")).all()
            if rows:
                break
    session.close()

    candidates = {row.pk_yljg: row.name for row in rows}
    return process.extractOne(hospital, candidates, scorer=fuzz.partial_token_set_ratio)
|
||
|
||
|
||
def search_department(department):
    """Fuzzy-match a department name against the ``BdYlks`` table.

    The name is tokenised with jieba and tokens listed in ``DEPARTMENT_FILTER``
    are dropped. Rows containing all remaining tokens in order are looked up
    first; failing that, each token is tried on its own until one yields rows.

    Args:
        department: department name as extracted from the document.

    Returns:
        The rapidfuzz ``extractOne`` result ``(name, score, pk_ylks)``, or
        ``None`` when no tokens remain or no candidate row exists. When the
        best match is "内科" or "外科", a list with the score lowered by 100
        is returned, so those generic departments rank below specific ones.
    """
    def _filter_search_keywords(keywords):
        return [x for x in keywords if x not in DEPARTMENT_FILTER]

    cut_list = _filter_search_keywords(jieba.lcut(department, HMM=False))
    if not cut_list:
        # Return before opening a session; the old code leaked the session here.
        return None

    session = MysqlSession()
    try:
        ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(BdYlks.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not ylks:
            for filter_keyword in cut_list:
                ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
                    BdYlks.name.like(f"%{filter_keyword}%")).all()
                if ylks:
                    break
    finally:
        session.close()

    ylks = {row.pk_ylks: row.name for row in ylks}
    best_match = process.extractOne(department, ylks, scorer=fuzz.token_ratio)
    if best_match and best_match[0] in ["内科", "外科"]:
        # Lower the priority of the generic internal/surgical departments.
        best_match = list(best_match)
        best_match[1] -= 100
    return best_match
|
||
|
||
|
||
def settlement_task(pk_phhd, settlement_list, identity):
    """Extract settlement-sheet fields for one case and save them to ``ZxIeSettlement``.

    Raw strings (``*_str``) are stored next to their parsed values (dates,
    decimals, insurance type, medical expenses).

    Args:
        pk_phhd: key of the case being processed.
        settlement_list: image records classified as settlement sheets.
        identity: batch id passed through to ``information_extraction``.
    """
    ie_result = information_extraction(SETTLEMENT_IE, settlement_list, identity)

    def best(keys):
        # Highest-probability extracted text among the given field names.
        return get_best_value_in_keys(ie_result, keys)

    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(best(PATIENT_NAME)),
        "admission_date_str": handle_original_data(best(ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(best(DISCHARGE_DATE)),
        "personal_cash_payment_str": handle_original_data(best(PERSONAL_CASH_PAYMENT)),
        "personal_account_payment_str": handle_original_data(best(PERSONAL_ACCOUNT_PAYMENT)),
        "personal_funded_amount_str": handle_original_data(best(PERSONAL_FUNDED_AMOUNT)),
        "medical_insurance_type_str": handle_original_data(best(MEDICAL_INSURANCE_TYPE)),
        "admission_id": handle_id(best(ADMISSION_ID)),
        "settlement_id": handle_id(best(SETTLEMENT_ID)),
    }
    # Each parsed field is derived exactly once (admission_date was computed twice before).
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    settlement_data["medical_insurance_type"] = handle_insurance_type(settlement_data["medical_insurance_type_str"])

    # parse_money reconciles the uppercase (Chinese numeral) and numeric expense
    # readings; element 0 is the raw string, element 1 the parsed amount.
    parse_money_result = parse_money(best(UPPERCASE_MEDICAL_EXPENSES), best(MEDICAL_EXPENSES))
    settlement_data["medical_expenses_str"] = handle_original_data(parse_money_result[0])
    settlement_data["medical_expenses"] = parse_money_result[1]

    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)
|
||
|
||
|
||
def _collect_matches(raw_values, parse, search, aliases):
    """Search every parsed candidate of ``raw_values`` (plus alias rewrites) and collect the results.

    For each raw value, ``parse`` yields candidate names, and each candidate is
    passed to ``search``. A perfect score (100) stops processing the remaining
    candidates of that raw value. Otherwise the first alias key contained in
    the candidate is expanded: every alias replacement is searched as well.

    Args:
        raw_values: extracted names (hospitals or departments).
        parse: str -> iterable of candidate names.
        search: str -> ``(name, score, pk)``-like result or ``None``.
        aliases: dict of substring -> list of replacement substrings.

    Returns:
        All search results in the order produced (entries may be ``None``).
    """
    matches = []
    for raw_value in raw_values:
        for candidate in parse(raw_value):
            search_result = search(candidate)
            matches.append(search_result)
            if search_result and search_result[1] == 100:
                break
            for alias_key in aliases.keys():
                if alias_key in candidate:
                    for alias in aliases[alias_key]:
                        matches.append(search(candidate.replace(alias_key, alias)))
                    # Only the first alias key found in the candidate is expanded.
                    break
    return matches


def _pick_best_match(matches, best_score):
    """Return the first match with the highest score strictly above ``best_score`` (stops at 100)."""
    best_match = None
    for match in matches:
        if match and match[1] > best_score:
            best_match = match
            best_score = match[1]
            if best_score == 100:
                break
    return best_match


def discharge_task(pk_phhd, discharge_record, identity):
    """Extract discharge-record fields for one case and save them to ``ZxIeDischarge``.

    Besides the raw fields, the extracted hospital and department names are
    fuzzy-matched against the ``BdYljg``/``BdYlks`` tables, and the best keys
    are stored as ``pk_yljg``/``pk_ylks``.

    Args:
        pk_phhd: key of the case being processed.
        discharge_record: image records classified as discharge records.
        identity: batch id passed through to ``information_extraction``.
    """
    ie_result = information_extraction(DISCHARGE_IE, discharge_record, identity)
    hospitals = get_values_of_keys(ie_result, HOSPITAL)
    departments = get_values_of_keys(ie_result, DEPARTMENT)

    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": handle_hospital(",".join(hospitals)),
        "department": handle_department(",".join(departments)),
        "name": handle_name(get_best_value_in_keys(ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(ie_result, DISCHARGE_DATE)),
        "doctor": handle_name(get_best_value_in_keys(ie_result, DOCTOR)),
        "admission_id": handle_id(get_best_value_in_keys(ie_result, ADMISSION_ID)),
        "age": handle_age(get_best_value_in_keys(ie_result, AGE)),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    if hospitals:
        best_match = _pick_best_match(
            _collect_matches(hospitals, parse_hospital, search_hospital, HOSPITAL_ALIAS), 0)
        if best_match:
            discharge_data["pk_yljg"] = best_match[2]

    if departments:
        # Start well below 0: search_department can return scores lowered by 100.
        best_match = _pick_best_match(
            _collect_matches(departments, parse_department, search_department, DEPARTMENT_ALIAS), -1000)
        if best_match:
            discharge_data["pk_ylks"] = best_match[2]

    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)
|
||
|
||
|
||
def cost_task(pk_phhd, cost_list, identity):
    """Extract cost-list fields for one case and save them to ``ZxIeCost``.

    Args:
        pk_phhd: key of the case being processed.
        cost_list: image records classified as cost lists.
        identity: batch id passed through to ``information_extraction``.
    """
    extracted = information_extraction(COST_IE, cost_list, identity)

    def pick(keys):
        return get_best_value_in_keys(extracted, keys)

    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(pick(PATIENT_NAME)),
        "admission_date_str": handle_original_data(pick(ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(pick(DISCHARGE_DATE)),
        "medical_expenses_str": handle_original_data(pick(MEDICAL_EXPENSES)),
    }
    # Derive each parsed column from its raw string counterpart.
    for raw_key, parsed_key, parser in (
            ("admission_date_str", "admission_date", handle_date),
            ("discharge_date_str", "discharge_date", handle_date),
            ("medical_expenses_str", "medical_expenses", handle_decimal),
    ):
        cost_data[parsed_key] = parser(cost_data[raw_key])

    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)
|
||
|
||
|
||
def photo_review(pk_phhd):
    """Run all extraction tasks for one case.

    Loads the case's image records and groups them by ``cRectype``:

    - "1": settlement sheets
    - "3": discharge records
    - "4": cost lists

    Other types are ignored. The settlement, discharge and cost tasks then
    run in that order, sharing one batch identity.

    Args:
        pk_phhd: key of the case to process.
    """
    session = MysqlSession()
    rows = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress).filter(
        ZxPhrec.pk_phhd == pk_phhd
    ).all()
    session.close()

    buckets = {"1": [], "3": [], "4": []}
    for row in rows:
        bucket = buckets.get(row.cRectype)
        if bucket is not None:
            bucket.append(row)

    # Identifier shared by all images of this run (second resolution).
    identity = int(time.time())
    settlement_task(pk_phhd, buckets["1"], identity)
    discharge_task(pk_phhd, buckets["3"], identity)
    cost_task(pk_phhd, buckets["4"], identity)
|
||
|
||
|
||
def _claim_pending_phhds():
    """Select up to ``PHHD_BATCH_SIZE`` pending cases and mark them as in progress.

    A case is pending when ``exsuccess_flag == "1"`` and it has at least one
    ``ZxPhrec`` row; claimed cases get ``exsuccess_flag = "2"``.
    NOTE(review): select and update are separate statements, so two workers
    could claim the same case concurrently.

    Returns:
        List of claimed ``pk_phhd`` values (possibly empty).
    """
    session = MysqlSession()
    try:
        phhds = (session.query(ZxPhhd.pk_phhd)
                 .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                 .filter(ZxPhhd.exsuccess_flag == "1")
                 .filter(ZxPhrec.pk_phrec.isnot(None))
                 .distinct().limit(PHHD_BATCH_SIZE).all())
        pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
        if pk_phhd_values:
            # Mark as "being recognised".
            session.execute(update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(exsuccess_flag="2"))
            session.commit()
        return pk_phhd_values
    finally:
        session.close()


def _mark_phhd_done(pk_phhd, elapsed_seconds):
    """Set ``exsuccess_flag = "8"`` for a finished case.

    Also records this host, the check time and the elapsed seconds (stored in
    ``fFSYLFY``).
    """
    session = MysqlSession()
    try:
        session.execute(update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
            exsuccess_flag="8",
            ref_id1=HOSTNAME,
            checktime=util.get_default_datetime(),
            fFSYLFY=elapsed_seconds))
        session.commit()
    finally:
        session.close()


def main():
    """Poll for pending cases forever and run ``photo_review`` on each.

    When no case is pending, sleeps ``SLEEP_MINUTES`` minutes before polling
    again. If a case raises, the error is logged and the rest of the claimed
    batch is still processed. The failed case keeps ``exsuccess_flag = "2"``.
    """
    while True:
        pk_phhd_values = _claim_pending_phhds()
        if not pk_phhd_values:
            # No new cases: wait a while before querying again.
            logging.info(f"暂未查询到需要识别的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for pk_phhd in pk_phhd_values:
            logging.info(f"开始识别:{pk_phhd}")
            start_time = time.time()
            try:
                photo_review(pk_phhd)
            except Exception as e:
                # Top-level boundary: one bad case must not strand the rest of the batch at "2".
                logging.error(f"识别失败:{pk_phhd}", exc_info=e)
                continue
            _mark_phhd_done(pk_phhd, time.time() - start_time)
|