# Source file: fcb_photo_review/photo_review/auto_photo_review.py
# Snapshot: 2024-08-28 17:14:43 +08:00 (443 lines, 19 KiB, Python)

import json
import logging
import os
import tempfile
import time
from collections import defaultdict
from time import sleep
import cv2
import jieba
import requests
from rapidfuzz import process, fuzz
from sqlalchemy import update
from db import MysqlSession
from db.mysql import BdYljg, BdYlks, ZxIeResult, ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxPhrec
from doc_dewarp import dewarp
from paddle_detection import detector
from photo_review import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR, \
ADMISSION_ID, SETTLEMENT_ID, AGE, OCR, SETTLEMENT_IE, DISCHARGE_IE, COST_IE, PHHD_BATCH_SIZE, SLEEP_MINUTES, \
UPPERCASE_MEDICAL_EXPENSES, HOSTNAME, HOSPITAL_ALIAS, HOSPITAL_FILTER, DEPARTMENT_ALIAS, DEPARTMENT_FILTER
from ucloud import ufile
from util import image_util, util
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, \
handle_insurance_type, handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, \
parse_hospital
def merge_result(result1, result2):
    """Merge one information-extraction result into another.

    For every key of ``result2`` its list is concatenated after the list
    ``result1`` already holds for that key (a missing key counts as an
    empty list). A new list is stored, the previous list object is not
    extended in place.

    :param result1: accumulated result mapping key -> list; updated in place
    :param result2: result to merge in, same shape
    :return: ``result1``
    """
    for field in result2:
        accumulated = result1.get(field, [])
        result1[field] = accumulated + result2[field]
    return result1
def ie_temp_image(ie, ocr, image):
    """Run information extraction on an in-memory image via a temporary JPEG.

    The image is written to a temporary ``.jpg`` file, an OCR layout is
    obtained with ``util.get_ocr_layout`` and, when the layout is non-empty,
    ``ie`` is called with the file path and the layout.

    :param ie: callable taking ``{"doc": path, "layout": layout}`` and
        returning a sequence whose first element is the extraction result
    :param ocr: OCR engine handed to ``util.get_ocr_layout``
    :param image: image accepted by ``cv2.imwrite``
    :return: the extraction result, or ``[]`` when OCR found nothing or the
        extraction step raised (the error is logged, not propagated)
    """
    # Only reserve a path here; the handle is closed before OpenCV writes.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_path = temp_file.name

    ie_result = []
    try:
        # The write sits inside the try/finally so that the delete=False
        # temp file is removed even when writing the image fails. A write
        # error still propagates to the caller, as before.
        cv2.imwrite(temp_path, image)
        try:
            layout = util.get_ocr_layout(ocr, temp_path)
            if layout:
                ie_result = ie({"doc": temp_path, "layout": layout})[0]
        except Exception as e:
            # Best effort: a failed extraction yields an empty result.
            logging.error("信息抽取时出错", exc_info=e)
    finally:
        try:
            os.remove(temp_path)
        except Exception as e:
            logging.info(f"删除临时文件 {temp_path} 时出错", exc_info=e)
    return ie_result
def request_ie_result(task_enum, phrecs):
    """Request key-information extraction for a batch of images over HTTP.

    Posts the image list (address and primary key of every record), the
    task schema, the ``pk_phhd`` of the first record and a timestamp-based
    batch identity to the URL supplied by ``task_enum``.

    :param task_enum: object providing ``request_url()`` and ``schema()``
    :param phrecs: non-empty sequence of records exposing ``cfjaddress``,
        ``pk_phrec`` and ``pk_phhd``
    :return: the ``"data"`` member of the JSON response
    :raises Exception: when the response status code is not 200
    """
    url = task_enum.request_url()
    identity = int(time.time())
    images = [{"name": phrec.cfjaddress, "pk": phrec.pk_phrec} for phrec in phrecs]
    payload = {
        "images": images,
        "schema": task_enum.schema(),
        "pk_phhd": phrecs[0].pk_phhd,
        "identity": identity,
    }
    response = requests.post(url, json=payload)
    if response.status_code != 200:
        raise Exception(f"请求信息抽取结果失败,状态码:{response.status_code}")
    return response.json()["data"]
# Key information extraction.
# For every record in `phrecs`: fetch the image, crop detected document areas,
# dewarp and split each area, run `ie` on the pieces at candidate rotation
# angles, store one ZxIeResult row per non-empty extraction, and merge all
# extractions into a single dict that is returned.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of the `zx_ie_results` reset, the rotate/upload step and the DB write
# relative to the `target_image` / `phrec` loops cannot be confirmed from here
# - verify against the repository before restructuring.
def information_extraction(ie, phrecs, identity):
result = {}
for phrec in phrecs:
# Records whose private URL cannot be resolved are skipped.
img_path = ufile.get_private_url(phrec.cfjaddress)
if not img_path:
continue
image = image_util.read(img_path)
target_images = []
det_time = time.time()
target_images += detector.request_book_areas(image) # detect document areas and crop them
logging.info(f"检测目标耗时{time.time() - det_time}")
if not target_images:
target_images.append(image) # detection failed: fall back to the whole image
angle_count = defaultdict(int, {"0": 0}) # tally of the best angle of the split images
for target_image in target_images:
dewarped_image = dewarp.dewarp_image(target_image) # dewarp
# Candidate rotation angles; indices 0 and 1 are used below.
# NOTE(review): assumes at least two candidates are returned - confirm.
angles = image_util.parse_rotation_angles(dewarped_image)
zx_ie_results = []
split_results = image_util.split(dewarped_image)
for split_result in split_results:
# Skip missing or empty pieces.
if split_result["img"] is None or split_result["img"].size == 0:
continue
# Try the first candidate angle.
rotated_img = image_util.rotate(split_result["img"], int(angles[0]))
ie_results = [{"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[0]}]
# Empty result, or fewer entries than the schema has: also try the second angle.
if not ie_results[0]["result"] or len(ie_results[0]["result"]) < len(ie.kwargs.get("schema")):
rotated_img = image_util.rotate(split_result["img"], int(angles[1]))
ie_results.append({"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[1]})
now = util.get_default_datetime()
# [angle, number of entries] of the richest extraction for this piece.
best_angle = ["0", 0]
for ie_result in ie_results:
if not ie_result["result"]:
continue
result_json = json.dumps(ie_result["result"], ensure_ascii=False)
# Stored content is cut to 5000 characters (the cut may leave invalid JSON).
if len(result_json) > 5000:
result_json = result_json[:5000]
zx_ie_results.append(ZxIeResult(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=identity,
cfjaddress=phrec.cfjaddress, content=result_json,
rotation_angle=int(ie_result["angle"]),
x_offset=split_result["x_offset"],
y_offset=split_result["y_offset"], create_time=now,
creator=HOSTNAME, update_time=now, updater=HOSTNAME))
result = merge_result(result, ie_result["result"])
if len(ie_result["result"]) > best_angle[1]:
best_angle = [ie_result["angle"], len(ie_result["result"])]
angle_count[best_angle[0]] += 1
# Angle with the most votes; ties go to the first key in the tally ("0" is seeded first).
img_angle = max(angle_count, key=angle_count.get)
if img_angle != "0":
# Rotate the image and upload it back under the same address.
image = image_util.rotate(image, int(img_angle))
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
cv2.imwrite(temp_file.name, image)
try:
ufile.upload_file(phrec.cfjaddress, temp_file.name)
# Correct the recorded rotation angles (only reached when the upload did not raise).
for zx_ie_result in zx_ie_results:
zx_ie_result.rotation_angle -= int(img_angle)
except Exception as e:
logging.error(f"上传图片({phrec.cfjaddress})失败", exc_info=e)
finally:
util.delete_temp_file(temp_file.name)
# Persist the collected extraction rows.
session = MysqlSession()
session.add_all(zx_ie_results)
session.commit()
session.close()
return result
def get_best_value_in_keys(source, keys):
    """Return the text of the most probable candidate found under ``keys``.

    Every candidate list stored in ``source`` under one of ``keys`` is
    scanned; the non-empty ``"text"`` with the strictly highest
    ``"probability"`` wins, and on a tie the first one seen is kept.

    :param source: mapping key -> list of ``{"text": ..., "probability": ...}``
    :param keys: iterable of keys to look up in ``source``
    :return: the winning text, or ``None`` when no candidate has a non-empty
        text and a probability greater than 0
    """
    best_text = None
    best_probability = 0
    for key in keys:
        for candidate in source.get(key) or ():
            text = candidate.get("text")
            # A candidate with no (or a None) probability used to raise
            # TypeError in the comparison below; rank it as 0 so it can
            # never beat a scored candidate.
            probability = candidate.get("probability") or 0
            if text and probability > best_probability:
                best_text = text
                best_probability = probability
    return best_text
def get_values_of_keys(source, keys):
    """Collect the distinct non-empty texts stored under ``keys``.

    :param source: mapping key -> list of ``{"text": ...}`` candidates
    :param keys: iterable of keys to look up in ``source``
    :return: list of unique texts in first-seen order
    """
    texts = []
    for key in keys:
        for candidate in source.get(key) or ():
            text = candidate.get("text")
            if text:
                texts.append(text)
    # De-duplicate while keeping first-seen order. list(set(...)) produced a
    # hash-dependent order, so callers joining or iterating the values got
    # results that could differ from run to run.
    return list(dict.fromkeys(texts))
def save_or_update_ie(table, pk_phhd, data):
    """Insert or update the extraction row of ``table`` for one case.

    Entries of ``data`` whose value is ``None`` or ``""`` are dropped. If a
    row with the given ``pk_phhd`` exists, the remaining entries are written
    onto it; otherwise a new row is created from them.

    :param table: ORM class with a ``pk_phhd`` column and the audit fields
        ``create_time`` / ``creator`` / ``update_time`` / ``updater``
    :param pk_phhd: key of the case the row belongs to
    :param data: mapping column name -> value
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    now = util.get_default_datetime()
    session = MysqlSession()
    try:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        if db_data is not None:
            # Update: stamp the *updater*. The previous code overwrote
            # `creator` here, losing the record of who created the row.
            db_data.update_time = now
            db_data.updater = HOSTNAME
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert: the ORM object is only built when it is needed.
            obj = table(**data)
            obj.create_time = now
            obj.creator = HOSTNAME
            obj.update_time = now
            obj.updater = HOSTNAME
            session.add(obj)
        session.commit()
    finally:
        # Always release the session, also when the query or commit raises.
        session.close()
def search_hospital(hospital):
    """Find the medical institution whose name best matches ``hospital``.

    The name is segmented with jieba and the segments are used as an ordered
    ``LIKE`` pattern on ``BdYljg.name``. When that finds nothing, up to two
    fallback keywords are tried one at a time. The candidates are then
    ranked with ``fuzz.partial_token_set_ratio``.

    :param hospital: hospital name to look up
    :return: the result of ``process.extractOne`` over ``{pk_yljg: name}``
        (``None`` when there are no candidates)
    """

    def _filter_search_keywords(keywords):
        # Drop filtered words and single-character segments, then keep the
        # (up to) two keywords that come right before the first keyword
        # containing "医院" - or the last two when none contains it -
        # nearest one first.
        keywords = [x for x in keywords if x not in HOSPITAL_FILTER and len(x) > 1]
        result1 = ""
        result2 = ""
        for keyword in keywords:
            if "医院" in keyword:
                break
            result2 = result1
            result1 = keyword
        # NOTE(review): when nothing precedes "医院", result1 is "" and the
        # fallback pattern becomes "%%", which matches every row. Kept as is
        # because changing it would alter matching results - confirm intent.
        result = [result1]
        if result2:
            result.append(result2)
        return result

    cut_list = jieba.lcut(hospital, HMM=False)
    session = MysqlSession()
    try:
        yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(
            BdYljg.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not yljg:
            for filter_keyword in _filter_search_keywords(cut_list):
                yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(
                    BdYljg.name.like(f"%{filter_keyword}%")).all()
                if yljg:
                    break
    finally:
        # Release the session even when a query raises.
        session.close()

    yljg = {row.pk_yljg: row.name for row in yljg}
    return process.extractOne(hospital, yljg, scorer=fuzz.partial_token_set_ratio)
def search_department(department):
    """Find the department whose name best matches ``department``.

    The name is segmented with jieba and segments listed in
    ``DEPARTMENT_FILTER`` are dropped. The remaining segments are used as an
    ordered ``LIKE`` pattern on ``BdYlks.name``; when that finds nothing,
    each segment is tried on its own. Candidates are ranked with
    ``fuzz.token_ratio``.

    :param department: department name to look up
    :return: ``None`` when no segment survives filtering; otherwise the
        result of ``process.extractOne`` over ``{pk_ylks: name}``. When the
        matched name is "内科" or "外科" the result is returned as a list
        with its score lowered by 100.
    """
    cut_list = [x for x in jieba.lcut(department, HMM=False) if x not in DEPARTMENT_FILTER]
    if not cut_list:
        # Checked before opening a session: the previous code returned here
        # with a session already open and never closed it.
        return None

    session = MysqlSession()
    try:
        ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
            BdYlks.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not ylks:
            for filter_keyword in cut_list:
                ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
                    BdYlks.name.like(f"%{filter_keyword}%")).all()
                if ylks:
                    break
    finally:
        # Release the session even when a query raises.
        session.close()

    ylks = {row.pk_ylks: row.name for row in ylks}
    best_match = process.extractOne(department, ylks, scorer=fuzz.token_ratio)
    if best_match and best_match[0] in ("内科", "外科"):
        # Lower the priority of the generic departments 内科 / 外科.
        best_match = list(best_match)
        best_match[1] -= 100
    return best_match
def settlement_task(pk_phhd, settlement_list, identity):
    """Extract the settlement-sheet fields of a case and persist them.

    Runs information extraction with ``SETTLEMENT_IE`` over the settlement
    images, keeps the raw text of each field (``*_str``), derives the parsed
    values from it and saves everything through ``save_or_update_ie``.

    :param pk_phhd: key of the case
    :param settlement_list: image records of the settlement sheet
    :param identity: identifier shared by all images of this batch
    """
    ie_result = information_extraction(SETTLEMENT_IE, settlement_list, identity)

    def best(keys):
        # Most probable extracted text for a group of schema keys.
        return get_best_value_in_keys(ie_result, keys)

    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(best(PATIENT_NAME)),
        "admission_date_str": handle_original_data(best(ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(best(DISCHARGE_DATE)),
        "personal_cash_payment_str": handle_original_data(best(PERSONAL_CASH_PAYMENT)),
        "personal_account_payment_str": handle_original_data(best(PERSONAL_ACCOUNT_PAYMENT)),
        "personal_funded_amount_str": handle_original_data(best(PERSONAL_FUNDED_AMOUNT)),
        "medical_insurance_type_str": handle_original_data(best(MEDICAL_INSURANCE_TYPE)),
        "admission_id": handle_id(best(ADMISSION_ID)),
        "settlement_id": handle_id(best(SETTLEMENT_ID)),
    }
    # Parsed values derived from the raw strings above (the admission date
    # used to be computed twice; the duplicate statement is removed).
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    settlement_data["medical_insurance_type"] = handle_insurance_type(settlement_data["medical_insurance_type_str"])

    # parse_money receives the uppercase-amount text and the numeric-amount
    # text; element 0 of its result is stored as the raw string, element 1
    # as the parsed amount.
    parse_money_result = parse_money(best(UPPERCASE_MEDICAL_EXPENSES), best(MEDICAL_EXPENSES))
    settlement_data["medical_expenses_str"] = handle_original_data(parse_money_result[0])
    settlement_data["medical_expenses"] = parse_money_result[1]

    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)
def _collect_matches(names, parse, search, aliases):
    """Search every parsed variant of ``names`` and return all search results.

    For each parsed name the plain search result is recorded first. A score
    of 100 stops processing the remaining parsed variants of that name.
    Otherwise the first alias key contained in the parsed name is expanded:
    one extra search per alias, with the key replaced by the alias.
    """
    matches = []
    for name in names:
        for parsed_name in parse(name):
            found = search(parsed_name)
            matches.append(found)
            if found and found[1] == 100:
                break
            for alias_key in aliases:
                if alias_key not in parsed_name:
                    continue
                for alias in aliases[alias_key]:
                    matches.append(search(parsed_name.replace(alias_key, alias)))
                break
    return matches


def _pick_best_match(matches, floor):
    """Return the match scoring strictly above ``floor`` with the highest score.

    Scanning stops as soon as a score of 100 has been taken; ``None`` is
    returned when no match beats ``floor``.
    """
    winner = None
    top_score = floor
    for candidate in matches:
        if candidate and candidate[1] > top_score:
            winner, top_score = candidate, candidate[1]
        if top_score == 100:
            break
    return winner


def discharge_task(pk_phhd, discharge_record, identity):
    """Extract the discharge-record fields of a case and persist them.

    Runs information extraction with ``DISCHARGE_IE`` over the discharge
    images, stores the extracted fields, and resolves the extracted hospital
    and department names to ``pk_yljg`` / ``pk_ylks`` (element 2 of the best
    search result) when a match is found.

    :param pk_phhd: key of the case
    :param discharge_record: image records of the discharge record
    :param identity: identifier shared by all images of this batch
    """
    extracted = information_extraction(DISCHARGE_IE, discharge_record, identity)
    hospitals = get_values_of_keys(extracted, HOSPITAL)
    departments = get_values_of_keys(extracted, DEPARTMENT)

    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": handle_hospital(",".join(hospitals)),
        "department": handle_department(",".join(departments)),
        "name": handle_name(get_best_value_in_keys(extracted, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(extracted, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(extracted, DISCHARGE_DATE)),
        "doctor": handle_name(get_best_value_in_keys(extracted, DOCTOR)),
        "admission_id": handle_id(get_best_value_in_keys(extracted, ADMISSION_ID)),
        "age": handle_age(get_best_value_in_keys(extracted, AGE)),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    if hospitals:
        hospital_matches = _collect_matches(hospitals, parse_hospital, search_hospital, HOSPITAL_ALIAS)
        # Hospital scores must be above 0 to count.
        best_hospital = _pick_best_match(hospital_matches, 0)
        if best_hospital:
            discharge_data["pk_yljg"] = best_hospital[2]

    if departments:
        department_matches = _collect_matches(departments, parse_department, search_department,
                                              DEPARTMENT_ALIAS)
        # Department scores may have been lowered by search_department, so
        # the floor is far below zero.
        best_department = _pick_best_match(department_matches, -1000)
        if best_department:
            discharge_data["pk_ylks"] = best_department[2]

    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)
def cost_task(pk_phhd, cost_list, identity):
    """Extract the cost-list fields of a case and persist them.

    Runs information extraction with ``COST_IE`` over the cost-list images,
    keeps the raw text of each field (``*_str``), derives the parsed dates
    and amount from it and saves the row through ``save_or_update_ie``.

    :param pk_phhd: key of the case
    :param cost_list: image records of the cost list
    :param identity: identifier shared by all images of this batch
    """
    extracted = information_extraction(COST_IE, cost_list, identity)

    def raw(keys):
        # Raw text of the most probable candidate for a group of schema keys.
        return handle_original_data(get_best_value_in_keys(extracted, keys))

    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(extracted, PATIENT_NAME)),
        "admission_date_str": raw(ADMISSION_DATE),
        "discharge_date_str": raw(DISCHARGE_DATE),
        "medical_expenses_str": raw(MEDICAL_EXPENSES),
    }
    cost_data.update(
        admission_date=handle_date(cost_data["admission_date_str"]),
        discharge_date=handle_date(cost_data["discharge_date_str"]),
        medical_expenses=handle_decimal(cost_data["medical_expenses_str"]),
    )
    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)
def photo_review(pk_phhd):
    """Run the three extraction tasks for one case.

    Loads the image records of the case, groups them by ``cRectype``
    ("1" settlement sheet, "3" discharge record, "4" cost list; other types
    are ignored) and runs the settlement, discharge and cost tasks in that
    order with a shared batch identity.

    :param pk_phhd: key of the case to process
    """
    session = MysqlSession()
    phrecs = session.query(
        ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress
    ).filter(ZxPhrec.pk_phhd == pk_phhd).all()
    session.close()

    # One bucket per record type handled here.
    buckets = {"1": [], "3": [], "4": []}
    for phrec in phrecs:
        bucket = buckets.get(phrec.cRectype)
        if bucket is not None:
            bucket.append(phrec)

    # Identifier shared by all images of this batch.
    identity = int(time.time())
    settlement_task(pk_phhd, buckets["1"], identity)
    discharge_task(pk_phhd, buckets["3"], identity)
    cost_task(pk_phhd, buckets["4"], identity)
def main():
    """Poll for cases awaiting recognition and process them forever.

    Each round selects up to ``PHHD_BATCH_SIZE`` cases with
    ``exsuccess_flag == "1"`` that have at least one image record, marks
    them "2" (recognition in progress), runs ``photo_review`` on each and
    then marks it "8" together with the host name, the check time and the
    elapsed seconds. When nothing is pending it sleeps ``SLEEP_MINUTES``
    minutes before polling again.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = (session.query(ZxPhhd.pk_phhd)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.exsuccess_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
            if pk_phhd_values:
                # Mark the selected cases as "recognition in progress".
                # Skipped when nothing was selected, so no UPDATE with an
                # empty IN list is sent.
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(exsuccess_flag="2"))
                session.commit()
        finally:
            # Release the session even when the query or update raises.
            session.close()

        if not pk_phhd_values:
            # No new case found: wait a while before querying again.
            logging.info(f"暂未查询到需要识别的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for pk_phhd in pk_phhd_values:
            logging.info(f"开始识别:{pk_phhd}")
            start_time = time.time()
            # NOTE(review): an exception here ends the loop and leaves the
            # remaining cases of this batch flagged "2" - confirm whether
            # per-case error handling is wanted.
            photo_review(pk_phhd)

            # Recognition finished: update the flag and bookkeeping fields.
            session = MysqlSession()
            try:
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                        exsuccess_flag="8",
                        ref_id1=HOSTNAME,
                        checktime=util.get_default_datetime(),
                        fFSYLFY=time.time() - start_time))
                session.commit()
            finally:
                session.close()