新增二维码识别替换高清图片功能

This commit is contained in:
2024-09-05 13:29:17 +08:00
parent 53a3dcd508
commit de631bef2e
6 changed files with 255 additions and 198 deletions

View File

@@ -7,22 +7,23 @@ from collections import defaultdict
from time import sleep
import cv2
import fitz
import jieba
import numpy as np
import requests
import zxingcpp
from rapidfuzz import process, fuzz
from sqlalchemy import update
from db import MysqlSession
from db.mysql import BdYljg, BdYlks, ZxIeResult, ZxIeCost, ZxIeDischarge, ZxIeSettlement, ZxPhhd, ZxPhrec
from doc_dewarp import dewarp
from log import HOSTNAME
from paddle_detection import detector
from photo_review import PATIENT_NAME, ADMISSION_DATE, DISCHARGE_DATE, MEDICAL_EXPENSES, PERSONAL_CASH_PAYMENT, \
PERSONAL_ACCOUNT_PAYMENT, PERSONAL_FUNDED_AMOUNT, MEDICAL_INSURANCE_TYPE, HOSPITAL, DEPARTMENT, DOCTOR, \
ADMISSION_ID, SETTLEMENT_ID, AGE, OCR, SETTLEMENT_IE, DISCHARGE_IE, COST_IE, PHHD_BATCH_SIZE, SLEEP_MINUTES, \
UPPERCASE_MEDICAL_EXPENSES, HOSPITAL_ALIAS, HOSPITAL_FILTER, DEPARTMENT_ALIAS, DEPARTMENT_FILTER
from ucloud import ufile
from util import image_util, util
from util import image_util, util, html_util
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, \
handle_insurance_type, handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, \
parse_hospital
@@ -73,6 +74,47 @@ def request_ie_result(task_enum, phrecs):
raise Exception(f"请求信息抽取结果失败,状态码:{response.status_code}")
# 尝试从二维码中获取高清图片
def get_better_image_from_qrcode(image, dpi=150):
    """Try to obtain a high-resolution rendering of the document behind a QR code.

    Every barcode that ``zxingcpp.read_barcodes`` finds in ``image`` is
    inspected in order. When its text starts with the Jiangsu e-invoice base
    URL, the matching PDF is downloaded, its first page is rendered, and the
    rendering is returned together with the text of that page. Barcodes with
    other content are logged to the ``qr`` logger and skipped. Any exception
    raised while handling one barcode is logged to the ``error`` logger and
    the next barcode is tried.

    Args:
        image: Image passed unchanged to ``zxingcpp.read_barcodes``.
        dpi: Target rendering resolution; the page is scaled by ``dpi / 72``.

    Returns:
        ``(img, text)`` where ``img`` is the rendered first page converted
        with ``cv2.COLOR_RGB2BGR`` and ``text`` is ``page.get_text()``, or
        ``(None, None)`` when no barcode produced a usable PDF.
    """
    js_base_url = 'http://einvoice.jsczt.cn'
    # Baseline resolution the zoom factor is computed against.
    default_dpi = 72

    for result in zxingcpp.read_barcodes(image):
        pdf = None
        pdf_path = None
        try:
            url = result.text
            # NOTE(review): this is a plain prefix match, so a host such as
            # "einvoice.jsczt.cn.example.org" would also pass. Confirm whether
            # get_jsczt_id_base fetches ``url`` and, if so, compare the parsed
            # hostname instead.
            if not url.startswith(js_base_url):
                logging.getLogger('qr').info(f'未知二维码内容:{url}')
                continue

            id_base = html_util.get_jsczt_id_base(url)
            pdf_url = f'{js_base_url}/download?idBase={id_base}'
            pdf_path = html_util.download_pdf(pdf_url)

            # Open the PDF and render only its first page.
            pdf = fitz.open(pdf_path)
            page = pdf[0]
            zoom = dpi / default_dpi
            pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))

            # Reinterpret the raw samples as (height, width, channels) and
            # convert to the channel order OpenCV expects. cvtColor returns a
            # new array, so the result stays valid after the PDF is closed.
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape((pix.height, pix.width, -1))
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            return img, page.get_text()
        except Exception as e:
            # Best effort: a broken barcode must not prevent trying the rest.
            logging.getLogger('error').error('从二维码中获取高清图片时出错', exc_info=e)
            continue
        finally:
            # Compare against None explicitly: a fitz Document is falsy when it
            # has zero pages, which would otherwise skip close() and leave the
            # file open while it is being deleted.
            if pdf is not None:
                pdf.close()
            if pdf_path is not None:
                util.delete_temp_file(pdf_path)
    return None, None
# 关键信息提取
def information_extraction(ie, phrecs, identity):
result = {}
@@ -83,60 +125,88 @@ def information_extraction(ie, phrecs, identity):
image = image_util.read(img_path)
target_images = []
target_images += detector.request_book_areas(image) # 识别文档区域并裁剪
if not target_images:
target_images.append(image) # 识别失败
angle_count = defaultdict(int, {"0": 0}) # 分割后图片的最优角度统计
for target_image in target_images:
dewarped_image = dewarp.dewarp_image(target_image) # 去扭曲
angles = image_util.parse_rotation_angles(dewarped_image)
zx_ie_results = []
split_results = image_util.split(dewarped_image)
for split_result in split_results:
if split_result["img"] is None or split_result["img"].size == 0:
continue
rotated_img = image_util.rotate(split_result["img"], int(angles[0]))
ie_results = [{"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[0]}]
if not ie_results[0]["result"] or len(ie_results[0]["result"]) < len(ie.kwargs.get("schema")):
rotated_img = image_util.rotate(split_result["img"], int(angles[1]))
ie_results.append({"result": ie_temp_image(ie, OCR, rotated_img), "angle": angles[1]})
# 尝试从二维码中获取高清图片
better_image, text = get_better_image_from_qrcode(image)
zx_ie_results = []
if better_image is not None:
img_angle = '0'
image = better_image
if text:
info_extract = ie(text)[0]
else:
info_extract = ie_temp_image(ie, OCR, image)
ie_result = {'result': info_extract, 'angle': '0'}
now = util.get_default_datetime()
best_angle = ["0", 0]
for ie_result in ie_results:
if not ie_result["result"]:
now = util.get_default_datetime()
if not ie_result['result']:
continue
result_json = json.dumps(ie_result['result'], ensure_ascii=False)
if len(result_json) > 5000:
result_json = result_json[:5000]
zx_ie_results.append(ZxIeResult(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=identity,
cfjaddress=phrec.cfjaddress, content=result_json,
rotation_angle=int(ie_result['angle']),
x_offset=0, y_offset=0, create_time=now,
creator=HOSTNAME, update_time=now, updater=HOSTNAME))
result = merge_result(result, ie_result['result'])
else:
target_images = []
# target_images += detector.request_book_areas(image) # 识别文档区域并裁剪
if not target_images:
target_images.append(image) # 识别失败
angle_count = defaultdict(int, {'0': 0}) # 分割后图片的最优角度统计
for target_image in target_images:
# dewarped_image = dewarp.dewarp_image(target_image) # 去扭曲
dewarped_image = target_image
angles = image_util.parse_rotation_angles(dewarped_image)
split_results = image_util.split(dewarped_image)
for split_result in split_results:
if split_result['img'] is None or split_result['img'].size == 0:
continue
rotated_img = image_util.rotate(split_result['img'], int(angles[0]))
ie_results = [{'result': ie_temp_image(ie, OCR, rotated_img), 'angle': angles[0]}]
if not ie_results[0]['result'] or len(ie_results[0]['result']) < len(ie.kwargs.get('schema')):
rotated_img = image_util.rotate(split_result['img'], int(angles[1]))
ie_results.append({'result': ie_temp_image(ie, OCR, rotated_img), 'angle': angles[1]})
now = util.get_default_datetime()
best_angle = ['0', 0]
for ie_result in ie_results:
if not ie_result['result']:
continue
result_json = json.dumps(ie_result["result"], ensure_ascii=False)
if len(result_json) > 5000:
result_json = result_json[:5000]
zx_ie_results.append(ZxIeResult(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=identity,
cfjaddress=phrec.cfjaddress, content=result_json,
rotation_angle=int(ie_result["angle"]),
x_offset=split_result["x_offset"],
y_offset=split_result["y_offset"], create_time=now,
creator=HOSTNAME, update_time=now, updater=HOSTNAME))
result_json = json.dumps(ie_result['result'], ensure_ascii=False)
if len(result_json) > 5000:
result_json = result_json[:5000]
zx_ie_results.append(ZxIeResult(pk_phhd=phrec.pk_phhd, pk_phrec=phrec.pk_phrec, id=identity,
cfjaddress=phrec.cfjaddress, content=result_json,
rotation_angle=int(ie_result['angle']),
x_offset=split_result['x_offset'],
y_offset=split_result['y_offset'], create_time=now,
creator=HOSTNAME, update_time=now, updater=HOSTNAME))
result = merge_result(result, ie_result["result"])
result = merge_result(result, ie_result['result'])
if len(ie_result["result"]) > best_angle[1]:
best_angle = [ie_result["angle"], len(ie_result["result"])]
if len(ie_result['result']) > best_angle[1]:
best_angle = [ie_result['angle'], len(ie_result['result'])]
angle_count[best_angle[0]] += 1
angle_count[best_angle[0]] += 1
img_angle = max(angle_count, key=angle_count.get)
img_angle = max(angle_count, key=angle_count.get)
if img_angle != "0":
if img_angle != '0' or better_image is not None:
image = image_util.rotate(image, int(img_angle))
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
cv2.imwrite(temp_file.name, image)
try:
ufile.upload_file(phrec.cfjaddress, temp_file.name)
# 修正旋转角度
for zx_ie_result in zx_ie_results:
zx_ie_result.rotation_angle -= int(img_angle)
if img_angle != '0':
# 修正旋转角度
for zx_ie_result in zx_ie_results:
zx_ie_result.rotation_angle -= int(img_angle)
except Exception as e:
logging.error(f"上传图片({phrec.cfjaddress})失败", exc_info=e)
logging.error(f'上传图片({phrec.cfjaddress})失败', exc_info=e)
finally:
util.delete_temp_file(temp_file.name)