Files
fcb_photo_review/photo_review/photo_review.py
2024-06-21 15:39:42 +08:00

324 lines
13 KiB
Python

import json
import logging
import math
import os
import pdb
import sys
import tempfile
import time
import urllib.request
import cv2
import numpy as np
import paddle
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from time import sleep
from sqlalchemy import update
from config.keys import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR
from config.mysql import MysqlSession
from config.photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES, SETTLEMENT_IE, DISCHARGE_IE, COST_IE
from photo_review.entity.bd_yljg import BdYljg
from photo_review.entity.bd_ylks import BdYlks
from photo_review.entity.zx_ie_cost import ZxIeCost
from photo_review.entity.zx_ie_discharge import ZxIeDischarge
from photo_review.entity.zx_ie_settlement import ZxIeSettlement
from photo_review.entity.zx_ocr import ZxOcr
from photo_review.entity.zx_phhd import ZxPhhd
from photo_review.entity.zx_phrec import ZxPhrec
from photo_review.util.data_util import handle_date, handle_decimal, handle_department, handle_name, \
handle_insurance_type, handle_original_data
from photo_review.util.util import get_default_datetime
from ucloud import ucloud
# 获取图片
def open_image(img_path):
if img_path.startswith("http"):
# 发送HTTP请求并获取图像数据
resp = urllib.request.urlopen(img_path)
# 将数据读取为字节流
image_data = resp.read()
# 将字节流转换为NumPy数组
image_np = np.frombuffer(image_data, np.uint8)
# 解码NumPy数组为OpenCV图像格式
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
else:
image = cv2.imread(img_path)
return image
# 分割大图片
def split_image(img_path, max_ratio=2.82, best_ration=1.41, overlap=0.05):
split_result = []
# 打开图片
img = open_image(img_path)
pdb.set_trace()
# 获取图片的宽度和高度
height, width = img.shape[:2]
# 计算宽高比
ratio = max(width, height) / min(width, height)
# 检查是否需要裁剪
if ratio > max_ratio:
# 确定裁剪的尺寸,保持长宽比,以较短边为基准
new_ratio = best_ration - overlap
if width < height:
# 高度是较长边
cropped_width = width * best_ration
for i in range(math.ceil(height / (width * new_ratio))):
offset = round(width * new_ratio * i)
# 参数形式为[y1:y2, x1:x2]
cropped_img = img[offset:round(offset + cropped_width), 0:width]
split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset})
# 最后一次裁剪时不足的部分填充黑色
last_img = split_result[-1]["img"]
split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, round(cropped_width - last_img.shape[0]), 0, 0, cv2.BORDER_CONSTANT, value=(0, 0, 0))
else:
# 宽度是较长边
cropped_height = height * best_ration
for i in range(math.ceil(width / (height * new_ratio))):
offset = round(height * new_ratio * i)
cropped_img = img[0:height, offset:round(offset + cropped_height)]
split_result.append({"img": cropped_img, "x_offset": offset, "y_offset": 0})
# 最后一次裁剪时不足的部分填充黑色
last_img = split_result[-1]["img"]
split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, 0, 0, round(cropped_height - last_img.shape[1]), cv2.BORDER_CONSTANT, value=(0, 0, 0))
else:
split_result.append({"img": img, "x_offset": 0, "y_offset": 0})
return split_result
# 合并信息抽取结果
def merge_result(result1, result2):
for key in result2:
result1[key] = result1.get(key, []) + result2[key]
return result1
# 关键信息提取
def information_extraction(ie, phrecs):
result = {}
docs = []
doc_phrecs = []
for phrec in phrecs:
pic_path = ucloud.get_private_url(phrec.cfjaddress)
if pic_path:
split_result = split_image(pic_path)
for img in split_result:
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
cv2.imwrite(temp_file.name, img["img"])
docs.append({"doc": temp_file.name})
doc_phrecs.append({"phrec": phrec, "x_offset": img["x_offset"], "y_offset": img["y_offset"]})
if not docs:
return result
ie_results = []
try:
ie_results = ie(docs)
except Exception as e:
logging.error(e)
return result
finally:
for temp_file in docs:
try:
os.remove(temp_file["doc"])
except Exception as e:
logging.info(f"删除临时文件 {temp_file['doc']} 时出错: {e}")
now = get_default_datetime()
id = int(time.time())
for i in range(len(ie_results)):
ie_result = ie_results[i]
doc_phrec = doc_phrecs[i]
phrec = doc_phrec["phrec"]
result_json = json.dumps(ie_result, ensure_ascii=False)
if len(result_json) > 5000:
result_json = result_json[:5000]
session = MysqlSession()
zx_ocr = ZxOcr(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=id, cfjaddress=phrec.cfjaddress,
content=result_json, x_offset=doc_phrec["x_offset"], y_offset=doc_phrec["y_offset"],
create_time=now, update_time=now)
session.add(zx_ocr)
session.commit()
session.close()
result = merge_result(result, ie_result)
return result
# 从keys中获取准确率最高的value
def get_best_value_in_keys(source, keys):
# 最终结果
result = None
# 最大可能性
best_probability = 0
for key in keys:
values = source.get(key)
if values:
for value in values:
text = value.get("text")
probability = value.get("probability")
if text and probability > best_probability:
result = text
best_probability = probability
return result
# 从keys中获取所有value组成list
def get_values_of_keys(source, keys):
result = []
for key in keys:
value = source.get(key)
if value:
for v in value:
v = v.get("text")
if v:
result.append(v)
# 去重
return list(set(result))
def save_or_update_ie(table, pk_phhd, data):
data = {k: v for k, v in data.items() if v is not None and v != ""}
obj = table(**data)
session = MysqlSession()
db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
now = get_default_datetime()
if db_data:
# 更新
db_data.update_time = now
for k, v in data.items():
setattr(db_data, k, v)
else:
# 新增
obj.create_time = now
obj.update_time = now
session.add(obj)
session.commit()
session.close()
def photo_review(pk_phhd):
settlement_list = []
discharge_record = []
cost_list = []
session = MysqlSession()
phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress) \
.filter(ZxPhrec.pk_phhd == pk_phhd) \
.all()
session.close()
for phrec in phrecs:
if phrec.cRectype == "1":
settlement_list.append(phrec)
elif phrec.cRectype == "3":
discharge_record.append(phrec)
elif phrec.cRectype == "4":
cost_list.append(phrec)
settlement_list_ie_result = information_extraction(SETTLEMENT_IE, settlement_list)
settlement_data = {
"pk_phhd": pk_phhd,
"name": handle_name(get_best_value_in_keys(settlement_list_ie_result, PATIENT_NAME)),
"admission_date_str": handle_original_data(get_best_value_in_keys(settlement_list_ie_result, ADMISSION_DATE)),
"discharge_date_str": handle_original_data(get_best_value_in_keys(settlement_list_ie_result, DISCHARGE_DATE)),
"medical_expenses_str": handle_original_data(
get_best_value_in_keys(settlement_list_ie_result, MEDICAL_EXPENSES)),
"personal_cash_payment_str": handle_original_data(
get_best_value_in_keys(settlement_list_ie_result, PERSONAL_CASH_PAYMENT)),
"personal_account_payment_str": handle_original_data(
get_best_value_in_keys(settlement_list_ie_result, PERSONAL_ACCOUNT_PAYMENT)),
"personal_funded_amount_str": handle_original_data(
get_best_value_in_keys(settlement_list_ie_result, PERSONAL_FUNDED_AMOUNT)),
"medical_insurance_type": handle_insurance_type(
get_best_value_in_keys(settlement_list_ie_result, MEDICAL_INSURANCE_TYPE))
}
settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
settlement_data["medical_expenses"] = handle_decimal(settlement_data["medical_expenses_str"])
settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)
discharge_record_ie_result = information_extraction(DISCHARGE_IE, discharge_record)
discharge_data = {
"pk_phhd": pk_phhd,
"hospital": get_best_value_in_keys(discharge_record_ie_result, HOSPITAL),
"department": get_best_value_in_keys(discharge_record_ie_result, DEPARTMENT),
"name": handle_name(get_best_value_in_keys(discharge_record_ie_result, PATIENT_NAME)),
"admission_date_str": handle_original_data(get_best_value_in_keys(discharge_record_ie_result, ADMISSION_DATE)),
"discharge_date_str": handle_original_data(get_best_value_in_keys(discharge_record_ie_result, DISCHARGE_DATE)),
"doctor": handle_name(get_best_value_in_keys(discharge_record_ie_result, DOCTOR))
}
discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])
hospital_value = get_values_of_keys(discharge_record_ie_result, HOSPITAL)
if hospital_value:
session = MysqlSession()
yljg = session.query(BdYljg.pk_yljg, BdYljg.name) \
.filter(BdYljg.name.in_(hospital_value)).limit(1).one_or_none()
session.close()
if yljg:
discharge_data["pk_yljg"] = yljg.pk_yljg
discharge_data["hospital"] = yljg.name
department_value = get_values_of_keys(discharge_record_ie_result, DEPARTMENT)
if department_value:
department_values = []
for dept in department_value:
department_values += handle_department(dept)
department_values = list(set(department_values))
if department_values:
session = MysqlSession()
ylks = session.query(BdYlks.pk_ylks, BdYlks.name) \
.filter(BdYlks.name.in_(department_values)).limit(1).one_or_none()
session.close()
if ylks:
discharge_data["pk_ylks"] = ylks.pk_ylks
discharge_data["department"] = ylks.name
save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)
cost_list_ie_result = information_extraction(COST_IE, cost_list)
cost_data = {
"pk_phhd": pk_phhd,
"name": handle_name(get_best_value_in_keys(cost_list_ie_result, PATIENT_NAME)),
"admission_date_str": handle_original_data(get_best_value_in_keys(cost_list_ie_result, ADMISSION_DATE)),
"discharge_date_str": handle_original_data(get_best_value_in_keys(cost_list_ie_result, DISCHARGE_DATE)),
"medical_expenses_str": handle_original_data(get_best_value_in_keys(cost_list_ie_result, MEDICAL_EXPENSES))
}
cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])
save_or_update_ie(ZxIeCost, pk_phhd, cost_data)
def main():
# 持续检测新案子
while 1:
session = MysqlSession()
# 查询需要识别的案子
phhds = session.query(ZxPhhd.pk_phhd).filter(ZxPhhd.exsuccess_flag == '1').limit(PHHD_BATCH_SIZE).all()
session.close()
if phhds:
for phhd in phhds:
pk_phhd = phhd.pk_phhd
photo_review(pk_phhd)
# 识别完成更新标识
session = MysqlSession()
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(exsuccess_flag=8))
session.execute(update_flag)
session.commit()
session.close()
# 完成一个案子释放显存
paddle.device.cuda.empty_cache()
else:
# 没有查询到新案子,等待一段时间后再查
log = logging.getLogger()
log.info(f"暂未查询到新案子,等待{SLEEP_MINUTES}分钟...")
sleep(SLEEP_MINUTES * 60)