# File: fcb_photo_review/photo_review/auto_photo_review.py
# Snapshot: 2024-10-18 10:49:01 +08:00 (701 lines, 31 KiB, Python)
#
# Note from the repository viewer: this file contains Unicode characters that
# might be confused with other characters. If that is intentional, the warning
# can safely be ignored.


import copy
import json
import logging
import os
import re
import shutil
import time
import uuid
from collections import defaultdict
from time import sleep
import cv2
import jieba
import numpy as np
import zxingcpp
from rapidfuzz import process, fuzz
from sqlalchemy import update
from db import MysqlSession
from db.mysql import BdYljg, BdYlks, ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxPhrec, ZxIeReview, ZxIeResult
from log import HOSTNAME
from photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES, HOSPITAL_ALIAS, HOSPITAL_FILTER, DEPARTMENT_ALIAS, \
DEPARTMENT_FILTER, DISCHARGE_KEY, set_batch_id, get_batch_id
from services.paddle_services import IE_KEY
from ucloud import ufile, BUCKET
from util import image_util, common_util, html_util, model_util
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, handle_insurance_type, \
handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, parse_hospital, \
parse_page_num, handle_tiny_int
def parse_qrcode(img_path, image_id):
    """
    Read the barcodes in an image and try to obtain a high-definition PDF rendering through them.

    :param img_path: path of the image to scan
    :param image_id: identifier of the image, only used in the log message for unknown codes
    :return: tuple (settlement image path, text of the first PDF page, cost-list image path);
             (None, None, None) when no PDF could be retrieved and rendered
    """

    def _parse_pdf_url(pdf_url_to_parse):
        # Download the PDF, render it and store the pages as jpg files derived from the image name.
        downloaded_pdf = None
        img_name, img_ext = common_util.parse_save_path(img_path)
        try:
            downloaded_pdf = html_util.download_pdf(pdf_url_to_parse)
            # Each entry is indexed as [0] = page image and [1] = page text.
            pages = image_util.pdf_to_imgs(downloaded_pdf)
            # Settlement sheet: first page, written out unchanged.
            settlement_jpg = common_util.get_processed_img_path(f'{img_name}.better_settlement.jpg')
            cv2.imwrite(settlement_jpg, pages[0][0])
            # Cost list: all remaining pages stacked vertically on one canvas.
            cost_jpg = common_util.get_processed_img_path(f'{img_name}.better_cost.jpg')
            cost_pages = pages[1:]
            canvas_height = sum(page[0].shape[0] for page in cost_pages)
            canvas_width = pages[1][0].shape[1]
            canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
            offset = 0
            for page in cost_pages:
                page_img = page[0]
                bottom = offset + page_img.shape[0]
                canvas[offset:bottom, :, :] = page_img
                offset = bottom
                # The text of the cost-list pages (page[1]) is not used for now.
            cv2.imwrite(cost_jpg, canvas)
            return settlement_jpg, pages[0][1], cost_jpg
        except Exception as ex:
            logging.getLogger('error').error('解析pdf失败', exc_info=ex)
            return None, None, None
        finally:
            # The downloaded PDF is only a temporary file.
            if downloaded_pdf:
                common_util.delete_temp_file(downloaded_pdf)

    jsczt_base_url = 'http://einvoice.jsczt.cn'
    try:
        barcodes = zxingcpp.read_barcodes(cv2.imread(img_path), text_mode=zxingcpp.TextMode.HRI)
    except Exception as e:
        logging.getLogger('error').info('二维码识别失败', exc_info=e)
        barcodes = []

    for barcode in barcodes:
        try:
            url = barcode.text
            if url.startswith(jsczt_base_url):
                id_base = html_util.get_jsczt_id_base(url)
                pdf_url = f'{jsczt_base_url}/download?idBase={id_base}' if id_base else None
            elif '/yldzpjqr/invoice/query/issueinfo' in url:
                # Wuxi hospitals
                pdf_url = html_util.get_wx_pdf_url(url)
            elif '/dzfp/tz3y' in url:
                # Taizhou Third People's Hospital
                pdf_url = html_util.get_tz3y_pdf_url(url)
            else:
                # weixin.qq.com addresses are known to be useless; anything else is logged as unknown.
                if not url.startswith('http://weixin.qq.com'):
                    logging.getLogger('qr').info(f'[{image_id}]中有未知二维码内容:{url}')
                continue
            if not pdf_url:
                continue
            # The first code that resolves to a PDF url decides the result, even if parsing it fails.
            return _parse_pdf_url(pdf_url)
        except Exception as e:
            logging.getLogger('error').error('从二维码中获取高清图片时出错', exc_info=e)
            continue
    return None, None, None
def information_extraction(phrec, pk_phhd):
    """
    Run OCR and key-information extraction on a single image and store the raw result.

    The image is tried at every orientation returned by the orientation classifier; the
    orientation whose extraction result has the most entries wins.

    :param phrec: image record; its ``cfjaddress``, ``cRectype`` and ``pk_phrec`` are read
    :param pk_phhd: primary key of the case the image belongs to
    :return: tuple (record type, extraction result, OCR text) of the winning orientation;
             (None, None, None) when the image cannot be fetched;
             (None, [], '') when no orientation produced an extraction result
    """
    img_path = common_util.get_processed_img_path(phrec.cfjaddress)
    if not os.path.exists(img_path):
        original_img_path = common_util.get_img_path(phrec.cfjaddress)
        if not original_img_path:
            img_url = ufile.get_private_url(phrec.cfjaddress)
            if not img_url:
                return None, None, None
            original_img_path = common_util.save_to_local(img_url)
        shutil.copy2(original_img_path, img_path)
    if image_util.is_photo(img_path):
        book_img_path = model_util.det_book(img_path)  # detect the document area and crop it
        dewarped_img_path = model_util.dewarp(book_img_path)  # remove the warping
    else:  # TODO: this may still be a photo; add finer-grained logic later
        dewarped_img_path = img_path
    angles = model_util.clas_orientation(dewarped_img_path)

    # Fallback record type derived from the category stored on the image record.
    rec_dict = {
        '1': '基本医保结算单',
        '3': '出院记录',
        '4': '费用清单',
    }
    ocr_text = ''
    info_extract = []
    rec_type = None
    for angle in angles:
        ocr_result = []
        rotated_img = image_util.rotate(dewarped_img_path, int(angle))
        for split_result in image_util.split(rotated_img):
            if split_result['img'] is None:
                continue
            a4_img = image_util.expand_to_a4_size(split_result['img'])
            tmp_ocr_result = model_util.ocr(a4_img)
            if tmp_ocr_result:
                ocr_result += tmp_ocr_result
        tmp_ocr_text = common_util.ocr_result_to_text(ocr_result)
        # Classify by the text of THIS orientation. The previous check looked at the best-so-far
        # text, which is always empty on the first orientation, so it was never classified.
        tmp_rec_type = model_util.clas_text(tmp_ocr_text) if tmp_ocr_text else None
        if not tmp_rec_type:
            tmp_rec_type = rec_dict.get(phrec.cRectype)
        if tmp_rec_type == '基本医保结算单':
            tmp_info_extract = model_util.ie_settlement(rotated_img, common_util.ocr_result_to_layout(ocr_result))
        elif tmp_rec_type == '出院记录':
            tmp_info_extract = model_util.ie_discharge(rotated_img, common_util.ocr_result_to_layout(ocr_result))
        elif tmp_rec_type == '费用清单':
            tmp_info_extract = model_util.ie_cost(rotated_img, common_util.ocr_result_to_layout(ocr_result))
        else:
            tmp_info_extract = []
        # Keep the orientation that yielded the largest extraction result.
        if len(tmp_info_extract) > len(info_extract):
            info_extract = tmp_info_extract
            ocr_text = tmp_ocr_text
            rec_type = tmp_rec_type

    if info_extract:
        # NOTE(review): the stored content is cut at 5000 characters, so long results are
        # persisted as truncated (possibly invalid) JSON.
        result_json = json.dumps(info_extract, ensure_ascii=False)[:5000]
        now = common_util.get_default_datetime()
        session = MysqlSession()
        try:
            session.add(ZxIeResult(pk_phhd=pk_phhd, pk_phrec=phrec.pk_phrec, id=get_batch_id(),
                                   cfjaddress=phrec.cfjaddress, content=result_json, create_time=now,
                                   creator=HOSTNAME, update_time=now, updater=HOSTNAME))
            session.commit()
        finally:
            # Always release the session, also when the insert fails.
            session.close()
    return rec_type, info_extract, ocr_text
def get_best_value_of_key(source, key):
    """
    Return the text with the highest probability among all candidates stored under ``key``.

    :param source: mapping of key -> list of candidate groups, each group being a list of
                   dicts that carry ``text`` and ``probability``
    :param key: key to look up
    :return: the non-empty text with the highest probability above 0, or None when there is none
    """
    best_text = None
    best_probability = 0
    for group in source.get(key) or []:
        for candidate in group:
            text = candidate.get("text")
            probability = candidate.get("probability")
            # Candidates without a probability cannot be ranked; skip them instead of
            # failing on a None comparison.
            if text and probability is not None and probability > best_probability:
                best_text = text
                best_probability = probability
    return best_text
def get_values_of_key(source, key):
    """
    Collect every distinct non-empty text stored under ``key``.

    :param source: mapping of key -> list of candidate groups, each group being a list of
                   dicts that may carry ``text``
    :param key: key to look up
    :return: list of unique texts in first-seen order (empty list when nothing is found)
    """
    texts = []
    for group in source.get(key) or []:
        for candidate in group:
            text = candidate.get("text")
            if text:
                texts.append(text)
    # Order-preserving de-duplication: a plain set would make the order (and therefore
    # any string joined from the result) vary between runs.
    return list(dict.fromkeys(texts))
def save_or_update_ie(table, pk_phhd, data):
    """
    Insert the row of ``table`` that belongs to a case, or update it when it already exists.

    Entries of ``data`` whose value is None or an empty string are ignored, so an update
    never clears a column that already holds a value.

    :param table: mapped class to write to; rows are looked up by ``pk_phhd``
    :param pk_phhd: primary key of the case
    :param data: column name -> value
    """
    data = {k: v for k, v in data.items() if v is not None and v != ""}
    now = common_util.get_default_datetime()
    session = MysqlSession()
    try:
        db_data = session.query(table).filter_by(pk_phhd=pk_phhd).one_or_none()
        if db_data:
            # Update: stamp the modifier (the creator must stay untouched).
            db_data.update_time = now
            db_data.updater = HOSTNAME
            for k, v in data.items():
                setattr(db_data, k, v)
        else:
            # Insert
            obj = table(**data)
            obj.create_time = now
            obj.creator = HOSTNAME
            obj.update_time = now
            obj.updater = HOSTNAME
            session.add(obj)
        session.commit()
    finally:
        # Release the session even when the query or the commit fails.
        session.close()
def search_hospital(hospital):
    """
    Look up the medical institution that best matches a hospital name.

    The name is segmented with jieba; institutions whose name contains all segments in order
    are fetched first, then, if there are none, institutions containing one of the fallback
    keywords. The candidates are ranked with rapidfuzz.

    :param hospital: hospital name to search for
    :return: result of ``process.extractOne`` over a ``pk_yljg -> name`` mapping, i.e.
             (name, score, pk_yljg), or None when there is no candidate
    """

    def _filter_search_keywords(keywords):
        # Pick the last one or two meaningful segments that precede the segment containing "医院".
        keywords = [x for x in keywords if x not in HOSPITAL_FILTER and len(x) > 1]
        result1 = ""
        result2 = ""
        for keyword in keywords:
            if "医院" in keyword:
                break
            result2 = result1
            result1 = keyword
        # NOTE(review): result1 may be "" here, which makes the LIKE below match every row.
        result = [result1]
        if result2:
            result.append(result2)
        return result

    cut_list = jieba.lcut(hospital, HMM=False)
    session = MysqlSession()
    try:
        yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(
            BdYljg.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not yljg:
            for filter_keyword in _filter_search_keywords(cut_list):
                yljg = session.query(BdYljg.pk_yljg, BdYljg.name).filter(
                    BdYljg.name.like(f"%{filter_keyword}%")).all()
                if yljg:
                    break
    finally:
        # Release the session even when a query fails.
        session.close()
    yljg = {row.pk_yljg: row.name for row in yljg}
    return process.extractOne(hospital, yljg, scorer=fuzz.partial_token_set_ratio)
def search_department(department):
    """
    Look up the department that best matches a department name.

    :param department: department name to search for
    :return: None when every segment of the name is filtered out or there is no candidate;
             otherwise the ``process.extractOne`` result over a ``pk_ylks -> name`` mapping,
             i.e. (name, score, pk_ylks). For the generic names "内科"/"外科" the result is
             a list whose score has been lowered by 100.
    """

    def _filter_search_keywords(keywords):
        return [x for x in keywords if x not in DEPARTMENT_FILTER]

    cut_list = _filter_search_keywords(jieba.lcut(department, HMM=False))
    if not cut_list:
        # Nothing to search for; return before a session is opened so none is leaked.
        return None

    session = MysqlSession()
    try:
        ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
            BdYlks.name.like(f"%{'%'.join(cut_list)}%")).all()
        if not ylks:
            # Fall back to the first single segment that yields any row.
            for filter_keyword in cut_list:
                ylks = session.query(BdYlks.pk_ylks, BdYlks.name).filter(
                    BdYlks.name.like(f"%{filter_keyword}%")).all()
                if ylks:
                    break
    finally:
        # Release the session even when a query fails.
        session.close()
    ylks = {row.pk_ylks: row.name for row in ylks}
    best_match = process.extractOne(department, ylks, scorer=fuzz.token_ratio)
    if best_match and best_match[0] in ["内科", "外科"]:
        # Lower the priority of the generic internal-medicine / surgery departments.
        best_match = list(best_match)
        best_match[1] -= 100
    return best_match
def settlement_task(pk_phhd, settlement_list_ie_result):
    """
    Normalise the extraction result of the settlement sheet and persist it in ZxIeSettlement.

    :param pk_phhd: primary key of the case
    :param settlement_list_ie_result: extraction result, key -> candidate groups
    :return: dict with the raw ``*_str`` values and their parsed counterparts
    """

    def _best(field):
        # Most probable text extracted for the given IE_KEY entry.
        return get_best_value_of_key(settlement_list_ie_result, IE_KEY[field])

    settlement_data = {
        "pk_phhd": pk_phhd,
        "name": handle_name(_best('name')),
        "admission_date_str": handle_original_data(_best('admission_date')),
        "discharge_date_str": handle_original_data(_best('discharge_date')),
        "personal_cash_payment_str": handle_original_data(_best('personal_cash_payment')),
        "personal_account_payment_str": handle_original_data(_best('personal_account_payment')),
        "personal_funded_amount_str": handle_original_data(_best('personal_funded_amount')),
        "medical_insurance_type_str": handle_original_data(_best('medical_insurance_type')),
        "admission_id": handle_id(_best('admission_id')),
        "settlement_id": handle_id(_best('settlement_id')),
    }
    # Parsed values derived from the raw strings (the admission date used to be computed twice).
    settlement_data["admission_date"] = handle_date(settlement_data["admission_date_str"])
    settlement_data["discharge_date"] = handle_date(settlement_data["discharge_date_str"])
    settlement_data["personal_cash_payment"] = handle_decimal(settlement_data["personal_cash_payment_str"])
    settlement_data["personal_account_payment"] = handle_decimal(settlement_data["personal_account_payment_str"])
    settlement_data["personal_funded_amount"] = handle_decimal(settlement_data["personal_funded_amount_str"])
    settlement_data["medical_insurance_type"] = handle_insurance_type(settlement_data["medical_insurance_type_str"])
    # parse_money receives the upper-case amount and the numeric amount; element 0 of its
    # result is stored as the raw string and element 1 as the parsed value.
    parse_money_result = parse_money(_best('uppercase_medical_expenses'), _best('medical_expenses'))
    settlement_data["medical_expenses_str"] = handle_original_data(parse_money_result[0])
    settlement_data["medical_expenses"] = parse_money_result[1]
    save_or_update_ie(ZxIeSettlement, pk_phhd, settlement_data)
    return settlement_data
def _collect_matches(raw_values, parse, search, alias_map):
    """Search every parsed variant (and its alias substitutions) of the raw values and return all search results."""
    matches = []
    for raw_value in raw_values:
        for parsed in parse(raw_value):
            found = search(parsed)
            matches.append(found)
            if found and found[1] == 100:
                # Perfect score: the remaining variants of this raw value need not be searched.
                break
            for alias_key in alias_map.keys():
                if alias_key in parsed:
                    for alias in alias_map[alias_key]:
                        matches.append(search(parsed.replace(alias_key, alias)))
                    # Only the first alias key contained in the name is expanded.
                    break
    return matches


def _pick_best_match(matches, min_score):
    """Return the match with the highest score strictly above ``min_score``, or None."""
    best_match = None
    best_score = min_score
    for match in matches:
        if match and match[1] > best_score:
            best_match = match
            best_score = match[1]
            if best_score == 100:
                break
    return best_match


def discharge_task(pk_phhd, discharge_record_ie_result):
    """
    Normalise the extraction result of the discharge record, resolve the hospital and
    department to their primary keys and persist everything in ZxIeDischarge.

    :param pk_phhd: primary key of the case
    :param discharge_record_ie_result: extraction result, key -> candidate groups
    :return: dict with the normalised values; ``pk_yljg`` / ``pk_ylks`` are only present
             when a matching hospital / department was found
    """

    def _best(field):
        # Most probable text extracted for the given IE_KEY entry.
        return get_best_value_of_key(discharge_record_ie_result, IE_KEY[field])

    hospitals = get_values_of_key(discharge_record_ie_result, IE_KEY['hospital'])
    departments = get_values_of_key(discharge_record_ie_result, IE_KEY['department'])
    discharge_data = {
        "pk_phhd": pk_phhd,
        "hospital": handle_hospital(",".join(hospitals)),
        "department": handle_department(",".join(departments)),
        "name": handle_name(_best('name')),
        "admission_date_str": handle_original_data(_best('admission_date')),
        "discharge_date_str": handle_original_data(_best('discharge_date')),
        "doctor": handle_name(_best('doctor')),
        "admission_id": handle_id(_best('admission_id')),
        "age": handle_age(_best('age')),
    }
    discharge_data["admission_date"] = handle_date(discharge_data["admission_date_str"])
    discharge_data["discharge_date"] = handle_date(discharge_data["discharge_date_str"])

    if hospitals:
        match_hospitals = _collect_matches(hospitals, parse_hospital, search_hospital, HOSPITAL_ALIAS)
        best_match = _pick_best_match(match_hospitals, 0)
        if best_match:
            # Index 2 of a search result is the primary key of the matched row.
            discharge_data["pk_yljg"] = best_match[2]
    if departments:
        match_departments = _collect_matches(departments, parse_department, search_department, DEPARTMENT_ALIAS)
        # Department scores may be negative (generic departments are penalised by 100),
        # hence the much lower threshold.
        best_match = _pick_best_match(match_departments, -1000)
        if best_match:
            discharge_data["pk_ylks"] = best_match[2]
    save_or_update_ie(ZxIeDischarge, pk_phhd, discharge_data)
    return discharge_data
def cost_task(pk_phhd, cost_list_ie_result):
    """
    Normalise the extraction result of the cost list and persist it in ZxIeCost.

    :param pk_phhd: primary key of the case
    :param cost_list_ie_result: extraction result, key -> candidate groups
    :return: dict with the raw ``*_str`` values, their parsed counterparts and, when page
             numbers were recognised, ``page_nums`` (comma separated) and ``page_count``
    """

    def _best(field):
        # Most probable text extracted for the given IE_KEY entry.
        return get_best_value_of_key(cost_list_ie_result, IE_KEY[field])

    cost_data = {"pk_phhd": pk_phhd}
    cost_data["name"] = handle_name(_best('name'))
    cost_data["admission_date_str"] = handle_original_data(_best('admission_date'))
    cost_data["discharge_date_str"] = handle_original_data(_best('discharge_date'))
    cost_data["medical_expenses_str"] = handle_original_data(_best('medical_expenses'))
    # Parsed values derived from the raw strings.
    cost_data["admission_date"] = handle_date(cost_data["admission_date_str"])
    cost_data["discharge_date"] = handle_date(cost_data["discharge_date_str"])
    cost_data["medical_expenses"] = handle_decimal(cost_data["medical_expenses_str"])

    page_field = IE_KEY['page']
    if cost_list_ie_result.get(page_field):
        page_nums, page_count = parse_page_num(cost_list_ie_result[page_field])
        if page_nums:
            joined_pages = ','.join(str(num) for num in page_nums)
            cost_data['page_nums'] = handle_original_data(joined_pages)
            cost_data['page_count'] = handle_tiny_int(page_count)

    save_or_update_ie(ZxIeCost, pk_phhd, cost_data)
    return cost_data
# One alternative per labelled field; every value runs up to the end of its line or of the text.
_PDF_FIELD_PATTERN = re.compile(
    r'(?:交款人:(.*?)|住院时间:(.*?)至(.*?)|\(小写\)(.*?)|个人现金支付:(.*?)|个人账户支付:(.*?)'
    r'|个人自费:(.*?)|医保类型:(.*?)|住院科别:(.*?)|住院号:(.*?)|票据号码:(.*?))(?:\n|$)'
)
# Result key of each capture group of _PDF_FIELD_PATTERN, in group order.
_PDF_FIELD_KEYS = ('患者姓名', '入院日期', '出院日期', '费用总额', '个人现金支付', '个人账户支付', '个人自费',
                   '医保类型', '科室', '住院号', '医保结算单号码')


def parse_pdf_text(settlement_text):
    """
    Extract the labelled fields from the text of a settlement PDF page.

    Every value found is wrapped as ``[[{'text': value, 'probability': 1}]]``, the same
    shape the information-extraction results use. When a label occurs more than once, the
    last non-empty value wins.

    :param settlement_text: text of the PDF page; None is treated as empty text
    :return: tuple (settlement result, discharge result, cost result), each a dict that
             contains only the keys relevant to that document and present in the text
    """
    results = {}
    # A field on the last line is matched even without a trailing newline, and no
    # empty match is produced for the text between fields.
    for match in _PDF_FIELD_PATTERN.findall(settlement_text or ''):
        for key, value in zip(_PDF_FIELD_KEYS, match):
            if value:
                results[key] = [[{'text': value, 'probability': 1}]]

    settlement_key = ['患者姓名', '入院日期', '出院日期', '费用总额', '个人现金支付', '个人账户支付', '个人自费',
                      '医保类型', '住院号', '医保结算单号码']
    discharge_key = ['科室', '患者姓名', '入院日期', '出院日期', '住院号']
    cost_key = ['患者姓名', '入院日期', '出院日期', '费用总额']
    # Shallow copies so the three results do not share their outer lists.
    settlement_result = {key: copy.copy(results[key]) for key in settlement_key if key in results}
    discharge_result = {key: copy.copy(results[key]) for key in discharge_key if key in results}
    cost_result = {key: copy.copy(results[key]) for key in cost_key if key in results}
    return settlement_result, discharge_result, cost_result
def _three_way_match(settlement_value, discharge_value, cost_value):
    """
    Compare one field across the settlement sheet, the discharge record and the cost list.

    :return: '0' when at least two values are empty or all three differ, '1' when all are
             equal, '2' when only the settlement value differs, '3' when only the discharge
             value differs, '4' when only the cost-list value differs
    """
    values = [settlement_value, discharge_value, cost_value]
    if sum(not bool(v) for v in values) > 1:
        return '0'
    unique_count = len(set(values))
    if unique_count == 1:
        return '1'
    if unique_count == 2:
        if settlement_value != discharge_value and settlement_value != cost_value:
            return '2'
        if discharge_value != settlement_value and discharge_value != cost_value:
            return '3'
        return '4'
    return '0'


def photo_review(pk_phhd, name):
    """
    Process a single reimbursement case: extract the information of its images, persist the
    settlement / discharge / cost data and store the review verdict in ZxIeReview.

    :param pk_phhd: primary key of the reimbursement case
    :param name: name of the person claiming the reimbursement
    """
    settlement_result = defaultdict(list)
    discharge_result = defaultdict(list)
    cost_result = defaultdict(list)
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.cRectype, ZxPhrec.cfjaddress).filter(
            ZxPhrec.pk_phhd == pk_phhd
        ).order_by(ZxPhrec.cRectype, ZxPhrec.rowno).all()
    finally:
        session.close()
    # Identifier shared by all images processed for this case.
    set_batch_id(uuid.uuid4().hex)
    processed_img_dir = common_util.get_processed_img_path('')
    os.makedirs(processed_img_dir, exist_ok=True)

    has_pdf = False  # whether a PDF was obtained; its text gives the information faster
    better_settlement_path = None
    settlement_text = ''
    qrcode_img_id = None
    discharge_text = ''
    try:
        for phrec in phrecs:
            original_img_path = common_util.get_img_path(phrec.cfjaddress)
            if not original_img_path:
                img_url = ufile.get_private_url(phrec.cfjaddress)
                if not img_url:
                    continue
                original_img_path = common_util.save_to_local(img_url)
            img_path = common_util.get_processed_img_path(phrec.cfjaddress)
            shutil.copy2(original_img_path, img_path)
            # Try to obtain a high-definition rendering through a QR code on the image.
            better_settlement_path, settlement_text, better_cost_path = parse_qrcode(img_path, phrec.cfjaddress)
            if better_settlement_path:
                has_pdf = True
                qrcode_img_id = phrec.cfjaddress
                break

        if has_pdf:
            settlement_result, discharge_result, cost_result = parse_pdf_text(settlement_text)
            discharge_ie_result = defaultdict(list)
            for phrec in phrecs:
                if phrec.cRectype == '1':
                    if phrec.cfjaddress == qrcode_img_id:
                        try:
                            # Copy the stored image to the "drg2015" bucket, then upload the
                            # PDF rendering under the original key.
                            ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
                            ufile.upload_file(phrec.cfjaddress, better_settlement_path)
                        except Exception as e:
                            logging.error("更新结算单pdf图片出错", exc_info=e)
                elif phrec.cRectype == '3':
                    rec_type, ie_result, ocr_text = information_extraction(phrec, pk_phhd)
                    if rec_type == '出院记录':
                        discharge_text += ocr_text
                        for key, value in ie_result.items():
                            discharge_ie_result[key].append(value)
                # NOTE: cost-list images (cRectype '4') are deliberately not replaced by the
                # PDF rendering for now.
            # Merge the discharge record: the extracted value replaces the PDF value when
            # the two are clearly different.
            for key in discharge_ie_result:
                ie_value = get_best_value_of_key(discharge_ie_result, key)
                if not ie_value:
                    # Nothing usable was extracted; do not overwrite the PDF value with None.
                    continue
                pdf_value = discharge_result.get(key)[0][0]['text'] if discharge_result.get(key) else ''
                if fuzz.ratio(ie_value, pdf_value) < 60:
                    discharge_result[key] = [[{'text': ie_value, 'probability': 1}]]
        else:
            for phrec in phrecs:
                rec_type, ie_result, ocr_text = information_extraction(phrec, pk_phhd)
                if rec_type == '基本医保结算单':
                    rec_result = settlement_result
                elif rec_type == '出院记录':
                    rec_result = discharge_result
                    discharge_text += ocr_text
                elif rec_type == '费用清单':
                    rec_result = cost_result
                else:
                    rec_result = None
                if rec_result is not None:
                    for key, value in ie_result.items():
                        rec_result[key].append(value)
    finally:
        # Remove the temporary images, also when processing fails half-way.
        if os.path.exists(processed_img_dir) and os.path.isdir(processed_img_dir):
            shutil.rmtree(processed_img_dir)

    settlement_data = settlement_task(pk_phhd, settlement_result)
    discharge_data = discharge_task(pk_phhd, discharge_result)
    cost_data = cost_task(pk_phhd, cost_result)

    # --- Completeness: are the three documents present? ---
    review_result = {
        'pk_phhd': pk_phhd,
        'has_settlement': bool(settlement_result),
        'has_discharge': bool(discharge_result),
        'has_cost': bool(cost_result),
    }
    if (review_result['has_settlement'] and settlement_data.get('personal_account_payment')
            and settlement_data.get('personal_cash_payment') and settlement_data.get('medical_expenses')):
        # The settlement only counts when account + cash payment stay below the total expenses.
        review_result['has_settlement'] &= (
            float(settlement_data['personal_account_payment']) + float(settlement_data['personal_cash_payment'])
            < float(settlement_data['medical_expenses'])
        )
    if has_pdf:
        # With a PDF the discharge result is pre-filled from the PDF text, so additionally
        # require that a discharge record image was actually recognised.
        review_result['has_discharge'] &= bool(discharge_text)

    # --- Completeness: are pages missing? ---
    page_description = []
    if review_result['has_discharge']:
        for discharge_item in DISCHARGE_KEY:
            # Each item needs at least one of its keywords to appear in the OCR text.
            if not any(key in discharge_text for key in DISCHARGE_KEY[discharge_item]):
                page_description.append("《出院记录》缺页")
                break
    if review_result['has_cost']:
        cost_missing_page = set()
        # page_count may be absent or None when it could not be parsed; skip the check then.
        if cost_data.get('page_nums') and cost_data.get('page_count'):
            page_nums = cost_data['page_nums'].split(',')
            required_set = set(range(1, cost_data['page_count'] + 1))
            page_set = {int(num) for num in page_nums}
            cost_missing_page = required_set - page_set
        if cost_missing_page:
            cost_missing_page = [str(num) for num in sorted(cost_missing_page)]
            page_description.append(f"《住院费用清单》,缺第{','.join(cost_missing_page)}")
    if page_description:
        review_result['full_page'] = False
        review_result['page_description'] = ';'.join(page_description)
    else:
        review_result['full_page'] = True
    review_result['integrity'] = (review_result['has_settlement'] and review_result['has_discharge']
                                  and review_result['has_cost'] and review_result['full_page'])

    # --- Consistency between the three documents ---
    # Name: '5' means the documents agree with each other but not with the claimant's name.
    review_result['name_match'] = _three_way_match(
        settlement_data['name'], discharge_data['name'], cost_data['name'])
    if review_result['name_match'] == '1' and name != settlement_data['name']:
        review_result['name_match'] = '5'
    # Hospitalisation dates: settlement sheet and discharge record must agree on both dates.
    if (settlement_data['admission_date'] and discharge_data['admission_date']
            and settlement_data['discharge_date'] and discharge_data['discharge_date']
            and settlement_data['admission_date'] == discharge_data['admission_date']
            and settlement_data['discharge_date'] == discharge_data['discharge_date']):
        review_result['admission_date_match'] = '1'
    else:
        review_result['admission_date_match'] = '0'
    # Discharge date across all three documents.
    review_result['discharge_date_match'] = _three_way_match(
        settlement_data['discharge_date'], discharge_data['discharge_date'], cost_data['discharge_date'])

    review_result['consistency'] = (
        review_result['name_match'] == '1' and review_result['admission_date_match'] == '1'
        and review_result['discharge_date_match'] == '1')
    review_result['success'] = review_result['integrity'] and review_result['consistency']
    save_or_update_ie(ZxIeReview, pk_phhd, review_result)
def main():
    """
    Batch controller of the photo review: endlessly fetch pending cases and review them.

    Cases with ``exsuccess_flag == "1"`` that have at least one image are picked up in
    descending priority, flagged "2" (recognition in progress) and, once reviewed,
    flagged "8" together with the host name, the check time and the elapsed seconds.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = (session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.exsuccess_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .order_by(ZxPhhd.priority_num.desc())
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            if phhds:
                # Mark the fetched cases as being recognised. Skipped when nothing was
                # fetched, to avoid an UPDATE with an empty IN list.
                pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values))
                               .values(exsuccess_flag="2"))
                session.execute(update_flag)
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new case found; wait for a while before querying again.
            logging.info(f"暂未查询到需要识别的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始识别:{pk_phhd}")
            start_time = time.time()
            # NOTE(review): an exception raised here propagates and leaves the remaining
            # cases of the batch flagged "2".
            photo_review(pk_phhd, phhd.cXm)
            # Recognition finished: update the flag of the case.
            session = MysqlSession()
            try:
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                    exsuccess_flag="8",
                    ref_id1=HOSTNAME,
                    checktime=common_util.get_default_datetime(),
                    fFSYLFY=time.time() - start_time))
                session.execute(update_flag)
                session.commit()
            finally:
                session.close()