优化图片分类和图片方向判断

This commit is contained in:
2024-10-16 17:01:56 +08:00
parent 947b4f20f3
commit d374e0743a
2 changed files with 197 additions and 99 deletions

View File

@@ -1,6 +1,7 @@
import json
import logging
import os
import re
import shutil
import time
import uuid
@@ -8,7 +9,6 @@ from collections import defaultdict
from time import sleep
import cv2
import fitz
import jieba
import numpy as np
import zxingcpp
@@ -21,44 +21,48 @@ from log import HOSTNAME
from photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES, HOSPITAL_ALIAS, HOSPITAL_FILTER, DEPARTMENT_ALIAS, \
DEPARTMENT_FILTER, DISCHARGE_KEY, set_batch_id, get_batch_id
from services.paddle_services import IE_KEY
from ucloud import ufile
from ucloud import ufile, BUCKET
from util import image_util, common_util, html_util, model_util
from util.data_util import handle_date, handle_decimal, parse_department, handle_name, handle_insurance_type, \
handle_original_data, handle_hospital, handle_department, handle_id, handle_age, parse_money, parse_hospital, \
parse_page_num, handle_tiny_int
# 尝试从二维码中获取高清图片
def get_better_image_from_qrcode(img_path, image_id, dpi=150):
def parse_qrcode(img_path, image_id):
"""
解析二维码,尝试从中获取高清图片
:param img_path: 待解析图片
:param image_id: 图片id
:return: 解析结果
"""
def _parse_pdf_url(pdf_url_to_parse):
pdf_file = None
local_pdf_path = None
img_name, img_ext = common_util.parse_save_path(img_path)
try:
local_pdf_path = html_util.download_pdf(pdf_url_to_parse)
# 打开PDF文件
pdf_file = fitz.open(local_pdf_path)
# 选择第一页
page = pdf_file[0]
# 定义缩放系数DPI
default_dpi = 72
zoom = dpi / default_dpi
# 设置矩阵变换参数
mat = fitz.Matrix(zoom, zoom)
# 渲染页面
pix = page.get_pixmap(matrix=mat)
# 将渲染结果转换为OpenCV兼容的格式
img = np.frombuffer(pix.samples, dtype=np.uint8).reshape((pix.height, pix.width, -1))
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
img_name, img_ext = common_util.parse_save_path(img_path)
better_img_path = common_util.get_processed_img_path(f'{img_name}.better.{img_ext}')
cv2.imwrite(better_img_path, img)
return better_img_path, page.get_text()
pdf_imgs = image_util.pdf_to_imgs(local_pdf_path)
# 结算单部分
better_settlement_path = common_util.get_processed_img_path(f'{img_name}.better_settlement.jpg')
cv2.imwrite(better_settlement_path, pdf_imgs[0][0])
# 费用清单部分
better_cost_path = common_util.get_processed_img_path(f'{img_name}.better_cost.jpg')
total_height = sum([p[0].shape[0] for p in pdf_imgs[1:]])
common_width = pdf_imgs[1][0].shape[1]
better_cost_img = np.zeros((total_height, common_width, 3), dtype=np.uint8)
current_y = 0
for pdf in pdf_imgs[1:]:
height = pdf[0].shape[0]
better_cost_img[current_y:current_y + height, :, :] = pdf[0]
current_y += height
# cost_text += pdf[1] # 费用清单文本暂时没用到
cv2.imwrite(better_cost_path, better_cost_img)
return better_settlement_path, pdf_imgs[0][1], better_cost_path
except Exception as ex:
logging.getLogger('error').error('解析pdf失败', exc_info=ex)
return None, None
return None, None, None
finally:
if pdf_file:
pdf_file.close()
if local_pdf_path:
common_util.delete_temp_file(local_pdf_path)
@@ -99,7 +103,7 @@ def get_better_image_from_qrcode(img_path, image_id, dpi=150):
logging.getLogger('error').error('从二维码中获取高清图片时出错', exc_info=e)
continue
return None, None
return None, None, None
# 关键信息提取
@@ -110,29 +114,7 @@ def information_extraction(phrec, pk_phhd):
:param pk_phhd:案子主键
:return:记录类型,信息抽取结果
"""
original_img_path = common_util.get_img_path(phrec.cfjaddress)
if not original_img_path:
img_url = ufile.get_private_url(phrec.cfjaddress)
if not img_url:
return None, None, None
original_img_path = common_util.save_to_local(img_url)
img_path = common_util.get_processed_img_path(phrec.cfjaddress)
shutil.copy2(original_img_path, img_path)
# 尝试从二维码中获取高清图片
better_img_path, text = get_better_image_from_qrcode(img_path, phrec.cfjaddress)
if phrec.cRectype != '1':
better_img_path = None # 非结算单暂时不进行替换
if better_img_path is not None:
rec_type = '基本医保结算单'
if text:
info_extract = model_util.ie_settlement_text(text)
else:
info_extract = model_util.ie_settlement(
better_img_path, common_util.ocr_result_to_layout(model_util.ocr(better_img_path))
)
ocr_text = None # 此处肯定不是出院记录,后续用不到
else:
if image_util.is_photo(img_path):
book_img_path = model_util.det_book(img_path) # 识别文档区域并裁剪
dewarped_img_path = model_util.dewarp(book_img_path) # 去扭曲
@@ -141,6 +123,7 @@ def information_extraction(phrec, pk_phhd):
angles = model_util.clas_orientation(dewarped_img_path)
ocr_text = ''
info_extract = []
rec_type = None
for angle in angles:
ocr_result = []
rotated_img = image_util.rotate(dewarped_img_path, int(angle))
@@ -152,19 +135,27 @@ def information_extraction(phrec, pk_phhd):
ocr_result += model_util.ocr(a4_img)
tmp_ocr_text = common_util.ocr_result_to_text(ocr_result)
rec_type = model_util.clas_text(tmp_ocr_text) if ocr_text else None
if rec_type == '基本医保结算单':
tmp_rec_type = model_util.clas_text(tmp_ocr_text) if ocr_text else None
if not tmp_rec_type:
rec_dict = {
'1': '基本医保结算单',
'3': '出院记录',
'4': '费用清单',
}
tmp_rec_type = rec_dict.get(phrec.cRectype)
if tmp_rec_type == '基本医保结算单':
tmp_info_extract = model_util.ie_settlement(rotated_img, common_util.ocr_result_to_layout(ocr_result))
elif rec_type == '出院记录':
elif tmp_rec_type == '出院记录':
tmp_info_extract = model_util.ie_discharge(rotated_img, common_util.ocr_result_to_layout(ocr_result))
elif rec_type == '费用清单':
elif tmp_rec_type == '费用清单':
tmp_info_extract = model_util.ie_cost(rotated_img, common_util.ocr_result_to_layout(ocr_result))
else:
tmp_info_extract = None
tmp_info_extract = []
if len(tmp_info_extract) > len(info_extract):
info_extract = tmp_info_extract
ocr_text = tmp_ocr_text
rec_type = tmp_rec_type
if info_extract:
result_json = json.dumps(info_extract, ensure_ascii=False)
@@ -425,6 +416,29 @@ def cost_task(pk_phhd, cost_list_ie_result):
return cost_data
def parse_pdf_text(settlement_text):
    """
    Extract labelled fields from the text layer of a settlement PDF page.

    Every recognised field is a label followed by its value on the same line,
    e.g. ``交款人:<value>``. A value runs up to the next newline or, for the
    last line, up to the end of the text. When a label occurs more than once,
    the last occurrence wins.

    :param settlement_text: text of the settlement page; may be empty or None
    :return: tuple ``(settlement_result, discharge_result, cost_result)``; each
        is a dict mapping a field name to ``[[{'text': value, 'probability': 1}]]``
        and only contains the fields that were actually found
    """
    if not settlement_text:
        return {}, {}, {}

    # One alternative per label; the shared terminator accepts either a newline
    # or the end of the text so an unterminated final line is not lost.
    pattern = (r'(?:交款人:(.*?)|住院时间:(.*?)至(.*?)|\(小写\)(.*?)|个人现金支付:(.*?)|个人账户支付:(.*?)'
               r'|个人自费:(.*?)|医保类型:(.*?)|住院科别:(.*?)|住院号:(.*?)|票据号码:(.*?))(?:\n|$)')
    # Field names in the same order as the capture groups of the pattern.
    keys = ['患者姓名', '入院日期', '出院日期', '费用总额', '个人现金支付', '个人账户支付', '个人自费', '医保类型',
            '科室', '住院号', '医保结算单号码']

    results = {}
    for match in re.findall(pattern, settlement_text):
        # Groups of the alternatives that did not take part are empty strings.
        for key, value in zip(keys, match):
            if value:
                results[key] = [[{'text': value, 'probability': 1}]]

    settlement_key = ['患者姓名', '入院日期', '出院日期', '费用总额', '个人现金支付', '个人账户支付', '个人自费',
                      '医保类型', '住院号', '医保结算单号码']
    discharge_key = ['科室', '患者姓名', '入院日期', '出院日期', '住院号']
    cost_key = ['患者姓名', '入院日期', '出院日期', '费用总额']

    def _pick(wanted_keys):
        # Keep only the requested fields that were found, in the requested order.
        return {key: results[key] for key in wanted_keys if key in results}

    return _pick(settlement_key), _pick(discharge_key), _pick(cost_key)
def photo_review(pk_phhd, name):
"""
处理单个报销案子
@@ -438,7 +452,7 @@ def photo_review(pk_phhd, name):
session = MysqlSession()
phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.cRectype, ZxPhrec.cfjaddress).filter(
ZxPhrec.pk_phhd == pk_phhd
).all()
).order_by(ZxPhrec.cRectype).all()
session.close()
# 同一批图的标识
@@ -446,7 +460,58 @@ def photo_review(pk_phhd, name):
processed_img_dir = common_util.get_processed_img_path('')
os.makedirs(processed_img_dir, exist_ok=True)
has_pdf = False # 是否获取到了pdf获取到可以直接利用pdf更快的获取信息
better_settlement_path = None
better_cost_path = None
settlement_text = ''
for phrec in phrecs:
original_img_path = common_util.get_img_path(phrec.cfjaddress)
if not original_img_path:
img_url = ufile.get_private_url(phrec.cfjaddress)
if not img_url:
continue
original_img_path = common_util.save_to_local(img_url)
img_path = common_util.get_processed_img_path(phrec.cfjaddress)
shutil.copy2(original_img_path, img_path)
# 尝试从二维码中获取高清图片
better_settlement_path, settlement_text, better_cost_path = parse_qrcode(img_path, phrec.cfjaddress)
if better_settlement_path:
has_pdf = True
break
discharge_text = ''
if has_pdf:
settlement_result, discharge_result, cost_result = parse_pdf_text(settlement_text)
discharge_result = defaultdict(list, discharge_result)
is_settlement_updated = False
is_cost_updated = False
for phrec in phrecs:
if phrec.cRectype == '1':
if not is_settlement_updated:
try:
ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
ufile.upload_file(phrec.cfjaddress, better_settlement_path)
except Exception as e:
logging.error("更新结算单pdf图片出错", exc_info=e)
finally:
is_settlement_updated = True
elif phrec.cRectype == '3':
rec_type, ie_result, ocr_text = information_extraction(phrec, pk_phhd)
if rec_type == '出院记录':
discharge_text += ocr_text
for key, value in ie_result.items():
discharge_result[key].append(value)
elif phrec.cRectype == '4':
if not is_cost_updated:
try:
ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
ufile.upload_file(phrec.cfjaddress, better_cost_path)
except Exception as e:
logging.error("更新费用清单pdf图片出错", exc_info=e)
finally:
is_cost_updated = True
else:
for phrec in phrecs:
rec_type, ie_result, ocr_text = information_extraction(phrec, pk_phhd)
if rec_type == '基本医保结算单':
@@ -560,7 +625,8 @@ def photo_review(pk_phhd, name):
else:
review_result['discharge_date_match'] = '0'
review_result['consistency'] = (review_result['name_match'] == '1' and review_result['admission_date_match'] == '1'
review_result['consistency'] = (
review_result['name_match'] == '1' and review_result['admission_date_match'] == '1'
and review_result['discharge_date_match'] == '1')
review_result['success'] = review_result['integrity'] and review_result['consistency']

View File

@@ -1,6 +1,8 @@
import logging
import math
import cv2
import fitz
import numpy
from PIL import Image
@@ -251,3 +253,33 @@ def is_photo(img_path):
if any(tag in exif for tag in (271, 272)):
return True
return False
def pdf_to_imgs(pdf_path, dpi=150):
    """
    Render every page of a PDF to an OpenCV image and collect each page's text.

    :param pdf_path: path of the PDF file to open
    :param dpi: rendering resolution; pages are scaled by ``dpi / 72``
    :return: list of ``[image, text]`` pairs, one per page in document order,
        where ``image`` is a ``(height, width, channels)`` uint8 array converted
        from RGB to BGR; ``None`` if opening or rendering fails (the error is
        written to the ``'error'`` logger)
    """
    pdf_file = None
    # 72 is the base DPI that corresponds to a zoom factor of 1.
    default_dpi = 72
    zoom = dpi / default_dpi
    try:
        pdf_file = fitz.open(pdf_path)
        # The same scaling matrix applies to every page, so build it once.
        mat = fitz.Matrix(zoom, zoom)
        pdf_imgs = []
        for page in pdf_file:
            pix = page.get_pixmap(matrix=mat)
            # Reinterpret the raw sample buffer as rows x columns x channels.
            img = numpy.frombuffer(pix.samples, dtype=numpy.uint8).reshape((pix.height, pix.width, -1))
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            pdf_imgs.append([img, page.get_text()])
        return pdf_imgs
    except Exception as ex:
        logging.getLogger('error').error('pdf转图片失败', exc_info=ex)
        return None
    finally:
        # Compare against None explicitly: a document object's truthiness may
        # follow its page count, and an empty document still has to be closed.
        if pdf_file is not None:
            pdf_file.close()