import datetime import json import os from decimal import Decimal from io import BytesIO from itertools import groupby import requests from PIL import ImageDraw, Image, ImageFont from db import MysqlSession from db.mysql import ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxIeResult, ZxPhrec from ucloud import ufile from util import image_util def check_ie_result(pk_phhd, need_to_annotation=True): os.makedirs(f"./check_result/{pk_phhd}", exist_ok=True) json_result = {"pk_phhd": pk_phhd} session = MysqlSession() phhd = session.query(ZxPhhd.cXm).filter(ZxPhhd.pk_phhd == pk_phhd).one() json_result["cXm"] = phhd.cXm settlement = (session.query(ZxIeSettlement.pk_ie_settlement, ZxIeSettlement.name, ZxIeSettlement.admission_date, ZxIeSettlement.discharge_date, ZxIeSettlement.medical_expenses, ZxIeSettlement.personal_cash_payment, ZxIeSettlement.personal_account_payment, ZxIeSettlement.personal_funded_amount, ZxIeSettlement.medical_insurance_type, ZxIeSettlement.admission_id, ZxIeSettlement.settlement_id) .filter(ZxIeSettlement.pk_phhd == pk_phhd).one()) settlement_result = settlement._asdict() json_result["settlement"] = settlement_result discharge = (session.query(ZxIeDischarge.pk_ie_discharge, ZxIeDischarge.hospital, ZxIeDischarge.pk_yljg, ZxIeDischarge.department, ZxIeDischarge.pk_ylks, ZxIeDischarge.name, ZxIeDischarge.age, ZxIeDischarge.admission_date, ZxIeDischarge.discharge_date, ZxIeDischarge.doctor, ZxIeDischarge.admission_id) .filter(ZxIeDischarge.pk_phhd == pk_phhd).one()) discharge_result = discharge._asdict() json_result["discharge"] = discharge_result cost = session.query(ZxIeCost.pk_ie_cost, ZxIeCost.name, ZxIeCost.admission_date, ZxIeCost.discharge_date, ZxIeCost.medical_expenses).filter(ZxIeCost.pk_phhd == pk_phhd).one() cost_result = cost._asdict() json_result["cost"] = cost_result phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cRectype, ZxPhrec.cfjaddress).filter( ZxPhrec.pk_phhd == pk_phhd).all() for phrec in phrecs: img_name = phrec.cfjaddress img_path = ufile.get_private_url(img_name, "drg2015") if not img_path: img_path = ufile.get_private_url(img_name) response = requests.get(img_path) image = Image.open(BytesIO(response.content)).convert("RGB") if need_to_annotation: font_size = image.width * image.height / 200000 font = ImageFont.truetype("./font/simfang.ttf", size=font_size) ocr = session.query(ZxIeResult.id, ZxIeResult.content, ZxIeResult.rotation_angle, ZxIeResult.x_offset, ZxIeResult.y_offset).filter(ZxIeResult.pk_phrec == phrec.pk_phrec).all() if not ocr: os.makedirs(f"./check_result/{pk_phhd}/0", exist_ok=True) image.save(f"./check_result/{pk_phhd}/0/{img_name}") for _, group_results in groupby(ocr, key=lambda x: x.id): draw = ImageDraw.Draw(image) for ocr_item in group_results: result = json.loads(ocr_item.content) rotation_angle = ocr_item.rotation_angle x_offset = ocr_item.x_offset y_offset = ocr_item.y_offset for key in result: for value in result[key]: box = value["bbox"][0] if rotation_angle: box = image_util.invert_rotate_rectangle(box, (image.width / 2, image.height / 2), rotation_angle) if x_offset: box[0] += x_offset box[2] += x_offset if y_offset: box[1] += y_offset box[3] += y_offset draw.rectangle(box, outline="red", width=2) # 绘制矩形 draw.text((box[0], box[1] - font_size), key, fill="blue", font=font) # 在矩形上方绘制文本 draw.text((box[0], box[3]), value["text"], fill="blue", font=font) # 在矩形下方绘制文本 os.makedirs(f"./check_result/{pk_phhd}/{ocr_item.id}", exist_ok=True) image.save(f"./check_result/{pk_phhd}/{ocr_item.id}/{img_name}") else: os.makedirs(f"./check_result/{pk_phhd}/0", exist_ok=True) image.save(f"./check_result/{pk_phhd}/0/{img_name}") session.close() # 自定义JSON处理器 def default(obj): if isinstance(obj, Decimal): return float(obj) if isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") with open(f"./check_result/{pk_phhd}/result.json", "w", encoding="utf-8") as json_file: json.dump(json_result, json_file, indent=4, ensure_ascii=False, default=default) if __name__ == '__main__': check_ie_result(5640504)