Files
fcb_photo_review/photo_review/photo_review.py

310 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import math
import os
import sys
import tempfile
from io import BytesIO
import paddle
import requests
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from PIL import Image
from time import sleep
from sqlalchemy import update
from config.keys import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR
from config.mysql import MysqlSession
from config.photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES
from photo_review import settlement_ie, discharge_ie, cost_ie
from photo_review.entity.bd_yljg import BdYljg
from photo_review.entity.bd_ylks import BdYlks
from photo_review.entity.zx_ie_cost import ZxIeCost
from photo_review.entity.zx_ie_discharge import ZxIeDischarge
from photo_review.entity.zx_ie_settlement import ZxIeSettlement
from photo_review.entity.zx_ocr import ZxOcr
from photo_review.entity.zx_phhd import ZxPhhd
from photo_review.entity.zx_phrec import ZxPhrec
from photo_review.util.data_util import handle_date, handle_decimal, handle_department, handle_name, \
handle_insurance_type, handle_original_data
from photo_review.util.util import get_default_datetime
from ucloud import ucloud
# Load an image
def open_image_from_url(url):
    """Open an image from a remote URL or from a local file path.

    Args:
        url: Either an ``http``/``https`` address or a path that
            ``PIL.Image.open`` can read directly.

    Returns:
        The opened ``PIL.Image.Image``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith("http"):
        # Not a web address: let PIL read it straight from the filesystem.
        return Image.open(url)

    # Download the image bytes over HTTP.
    response = requests.get(url)
    # Fail loudly on 4xx/5xx instead of feeding an error page to the decoder.
    response.raise_for_status()
    # Wrap the payload in a file-like object so PIL can decode it.
    return Image.open(BytesIO(response.content))
# Split oversized images
def split_image(img_path, max_ratio=2.82, best_ration=1.41, overlap=0.05):
    """Split an elongated image into overlapping tiles along its longer side.

    Args:
        img_path: URL or path accepted by ``open_image_from_url``.
        max_ratio: Long-side / short-side ratio above which the image is split.
        best_ration: Long-side / short-side ratio of every produced tile.
        overlap: Fraction of the short side shared by two neighbouring tiles.

    Returns:
        A list of dicts with the keys ``"img"`` (an RGB image), ``"x_offset"``
        and ``"y_offset"`` (position of the tile inside the original image).
        An image that does not exceed ``max_ratio`` yields a single entry with
        zero offsets.
    """
    img = open_image_from_url(img_path)
    width, height = img.size
    ratio = max(width, height) / min(width, height)

    if ratio <= max_ratio:
        # Convert here as well: callers store every tile as JPEG, and the
        # cropped tiles below are already normalised to RGB for that reason.
        return [{"img": img.convert("RGB"), "x_offset": 0, "y_offset": 0}]

    # The short side is the reference; tiles advance by slightly less than
    # their own length so neighbouring tiles overlap.
    new_ratio = best_ration - overlap
    height_is_longer = width < height
    if height_is_longer:
        short_side, long_side = width, height
    else:
        short_side, long_side = height, width
    stride = short_side * new_ratio
    tile_length = short_side * best_ration

    split_result = []
    for i in range(math.ceil(long_side / stride)):
        offset = round(stride * i)
        end = round(offset + tile_length)
        # NOTE(review): the last tile's box can reach past the image edge;
        # confirm that PIL's handling of out-of-bounds crops is acceptable.
        if height_is_longer:
            box = (0, offset, width, end)
            x_offset, y_offset = 0, offset
        else:
            box = (offset, 0, end, height)
            x_offset, y_offset = offset, 0
        # Normalise to RGB so the tile can be saved as JPEG.
        cropped_img = img.crop(box).convert("RGB")
        split_result.append({"img": cropped_img, "x_offset": x_offset, "y_offset": y_offset})
    return split_result
# Merge information-extraction results
def merge_result(result1, result2):
    """Append every list in ``result2`` to the list under the same key in ``result1``.

    ``result1`` is updated in place and also returned. Keys missing from
    ``result1`` start from an empty list. The lists of ``result2`` are
    never mutated because each merged value is a newly built list.
    """
    for field, additions in result2.items():
        current = result1.get(field, [])
        result1[field] = current + additions
    return result1
# Key information extraction
def information_extraction(ie, phrecs):
    """Run an extraction model over the photos of ``phrecs`` and merge the output.

    Each photo is resolved to a URL, split into tiles, and every tile is
    written to a temporary JPEG that is handed to ``ie``. The raw result of
    every tile is stored in a ``ZxOcr`` row, then merged into one dict.

    Args:
        ie: Callable that receives a list of ``{"doc": path}`` dicts and
            returns one result mapping per entry, in the same order.
        phrecs: Records exposing ``pk_phhd``, ``pk_phrec`` and ``cfjaddress``.

    Returns:
        The merged extraction result. It is empty when no photo could be
        resolved or when ``ie`` raised.
    """
    result = {}
    docs = []
    doc_phrecs = []
    ie_results = []
    try:
        for phrec in phrecs:
            pic_path = ucloud.get_private_url(phrec.cfjaddress)
            if not pic_path:
                continue
            for img in split_image(pic_path):
                with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
                    # Register the file before writing to it, so the cleanup
                    # below also removes it when saving fails.
                    docs.append({"doc": temp_file.name})
                    doc_phrecs.append(phrec)
                    img["img"].save(temp_file.name)
        if not docs:
            return result
        try:
            ie_results = ie(docs)
        except Exception as e:
            # Extraction is best effort: log with traceback, return nothing.
            logging.exception(e)
            return result
    finally:
        # The temp files were created with delete=False; remove them no
        # matter how this section is left.
        for temp_file in docs:
            try:
                os.remove(temp_file["doc"])
            except Exception as e:
                logging.info(f"删除临时文件 {temp_file['doc']} 时出错: {e}")

    now = get_default_datetime()
    for phrec, ie_result in zip(doc_phrecs, ie_results):
        result_json = json.dumps(ie_result, ensure_ascii=False)
        # Stored content is capped at 5000 characters, so a truncated value
        # may no longer be valid JSON.
        if len(result_json) > 5000:
            result_json = result_json[:5000]
        session = MysqlSession()
        try:
            zx_ocr = ZxOcr(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, cfjaddress=phrec.cfjaddress,
                           content=result_json, create_time=now, update_time=now)
            session.add(zx_ocr)
            session.commit()
        finally:
            session.close()
        result = merge_result(result, ie_result)
    return result
# Pick the most probable value among the given keys
def get_best_value_in_keys(source, keys):
    """Return the ``text`` with the highest ``probability`` found under ``keys``.

    Args:
        source: Mapping from key to a list of candidate dicts, each carrying
            ``"text"`` and ``"probability"`` entries.
        keys: Keys of ``source`` whose candidates are compared.

    Returns:
        The text of the best candidate, or ``None`` when no candidate has a
        non-empty text and a probability greater than zero. The first
        candidate wins a tie.
    """
    best_text = None
    best_probability = 0
    for key in keys:
        for candidate in source.get(key) or ():
            text = candidate.get("text")
            probability = candidate.get("probability")
            # Skip unusable candidates. A missing probability cannot be
            # ranked and would otherwise raise TypeError in the comparison.
            if not text or probability is None:
                continue
            if probability > best_probability:
                best_text = text
                best_probability = probability
    return best_text
# Collect every value found under the given keys into one list
def get_values_of_keys(source, keys):
    """Return the distinct non-empty ``text`` values found under ``keys``.

    ``source`` maps each key to a list of dicts. Entries whose ``"text"``
    is missing or empty are ignored. Duplicates are removed through a set,
    so the order of the returned list is unspecified.
    """
    texts = []
    for key in keys:
        entries = source.get(key)
        if not entries:
            continue
        for entry in entries:
            text = entry.get("text")
            if text:
                texts.append(text)
    # De-duplicate
    unique_texts = set(texts)
    return list(unique_texts)
def save_or_update_ie(table, pk_phhd, data):
    """Insert or update the row of ``table`` that belongs to ``pk_phhd``.

    Args:
        table: Mapped entity class with ``pk_phhd``, ``create_time`` and
            ``update_time`` attributes.
        pk_phhd: Key used to look up an existing row.
        data: Column values to write. Entries that are ``None`` or ``""``
            are dropped first, so an update never clears an existing column.
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    session = MysqlSession()
    try:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        now = get_default_datetime()
        if db_data:
            # Update: stamp the row, then overwrite the extracted columns.
            db_data.update_time = now
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert: the entity is only built when it is actually needed.
            obj = table(**data)
            obj.create_time = now
            obj.update_time = now
            session.add(obj)
        session.commit()
    finally:
        # Always release the session, including when the query or the
        # commit raises.
        session.close()
def _query_first_by_name(pk_column, name_column, names):
    """Return one ``(pk, name)`` row whose name is in ``names``, or ``None``."""
    session = MysqlSession()
    try:
        return session.query(pk_column, name_column) \
            .filter(name_column.in_(names)).limit(1).one_or_none()
    finally:
        session.close()


def photo_review(pk_phhd):
    """Extract and store the key information of every photo of one case.

    The photo records of ``pk_phhd`` are grouped by ``cRectype`` ("1" goes to
    the settlement list, "3" to the discharge record, "4" to the cost list).
    Each group is run through its own extraction model and the outcome is
    saved to ``ZxIeSettlement``, ``ZxIeDischarge`` and ``ZxIeCost``.

    Args:
        pk_phhd: Key of the case whose photos are processed.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress) \
            .filter(ZxPhrec.pk_phhd == pk_phhd) \
            .all()
    finally:
        session.close()

    settlement_list = []
    discharge_record = []
    cost_list = []
    for phrec in phrecs:
        if phrec.cRectype == "1":
            settlement_list.append(phrec)
        elif phrec.cRectype == "3":
            discharge_record.append(phrec)
        elif phrec.cRectype == "4":
            cost_list.append(phrec)

    # --- Settlement list ---
    settlement_list_ie_result = information_extraction(settlement_ie, settlement_list)
    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(settlement_list_ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(settlement_list_ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(settlement_list_ie_result, DISCHARGE_DATE)),
        "medical_expenses_str": handle_original_data(
            get_best_value_in_keys(settlement_list_ie_result, MEDICAL_EXPENSES)),
        "personal_cash_payment_str": handle_original_data(
            get_best_value_in_keys(settlement_list_ie_result, PERSONAL_CASH_PAYMENT)),
        "personal_account_payment_str": handle_original_data(
            get_best_value_in_keys(settlement_list_ie_result, PERSONAL_ACCOUNT_PAYMENT)),
        "personal_funded_amount_str": handle_original_data(
            get_best_value_in_keys(settlement_list_ie_result, PERSONAL_FUNDED_AMOUNT)),
        "medical_insurance_type": handle_insurance_type(
            get_best_value_in_keys(settlement_list_ie_result, MEDICAL_INSURANCE_TYPE))
    }
    # Typed columns are derived from the raw "*_str" values kept next to them.
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["medical_expenses"] = handle_decimal(settlement_data["medical_expenses_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)

    # --- Discharge record ---
    discharge_record_ie_result = information_extraction(discharge_ie, discharge_record)
    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": get_best_value_in_keys(discharge_record_ie_result, HOSPITAL),
        "department": get_best_value_in_keys(discharge_record_ie_result, DEPARTMENT),
        "name": handle_name(get_best_value_in_keys(discharge_record_ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(discharge_record_ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(discharge_record_ie_result, DISCHARGE_DATE)),
        "doctor": handle_name(get_best_value_in_keys(discharge_record_ie_result, DOCTOR))
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    # Prefer a hospital known to the database over the raw extracted text.
    hospital_value = get_values_of_keys(discharge_record_ie_result, HOSPITAL)
    if hospital_value:
        yljg = _query_first_by_name(BdYljg.pk_yljg, BdYljg.name, hospital_value)
        if yljg:
            discharge_data["pk_yljg"] = yljg.pk_yljg
            discharge_data["hospital"] = yljg.name

    # Same for the department, after expanding every extracted value with
    # handle_department.
    department_value = get_values_of_keys(discharge_record_ie_result, DEPARTMENT)
    if department_value:
        department_values = []
        for dept in department_value:
            department_values += handle_department(dept)
        department_values = list(set(department_values))
        if department_values:
            ylks = _query_first_by_name(BdYlks.pk_ylks, BdYlks.name, department_values)
            if ylks:
                discharge_data["pk_ylks"] = ylks.pk_ylks
                discharge_data["department"] = ylks.name
    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)

    # --- Cost list ---
    cost_list_ie_result = information_extraction(cost_ie, cost_list)
    cost_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(get_best_value_in_keys(cost_list_ie_result, PATIENT_NAME)),
        "admission_date_str": handle_original_data(get_best_value_in_keys(cost_list_ie_result, ADMISSION_DATE)),
        "discharge_date_str": handle_original_data(get_best_value_in_keys(cost_list_ie_result, DISCHARGE_DATE)),
        "medical_expenses_str": handle_original_data(get_best_value_in_keys(cost_list_ie_result, MEDICAL_EXPENSES))
    }
    cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
    cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
    cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])
    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)
def main():
    """Poll the database forever and review every pending case.

    Cases whose ``exsuccess_flag`` equals ``'1'`` are fetched in batches of
    ``PHHD_BATCH_SIZE``. Each one is reviewed and then flagged with ``8``.
    When nothing is pending the loop sleeps for ``SLEEP_MINUTES`` minutes.
    """
    # Keep watching for new cases
    while True:
        # Fetch the cases that still need recognition
        query_session = MysqlSession()
        phhds = query_session.query(ZxPhhd.pk_phhd).filter(ZxPhhd.exsuccess_flag == '1').limit(PHHD_BATCH_SIZE).all()
        query_session.close()

        if not phhds:
            # Nothing pending: wait a while before polling again
            logging.getLogger().info(f"暂未查询到新案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            photo_review(pk_phhd)

            # Recognition finished: update the flag of this case
            mark_done = update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(exsuccess_flag=8)
            flag_session = MysqlSession()
            flag_session.execute(mark_done)
            flag_session.commit()
            flag_session.close()

            # Release GPU memory after each finished case
            paddle.device.cuda.empty_cache()