更新OCR版本,Bata版,还不能上线

This commit is contained in:
2025-09-15 15:41:30 +08:00
parent d266c2828c
commit 670172e79e
9 changed files with 117 additions and 110 deletions

View File

@@ -62,7 +62,7 @@ def find_boxes(content, layout, offset=0, length=None, improve=False, image_path
captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image)
cv2.imwrite(temp_file.name, captured_image)
try:
layouts = util.get_ocr_layout(OCR, temp_file.name)
layouts, _ = util.get_ocr_layout(OCR, temp_file.name)
except TypeError:
# 如果是类型错误,大概率是没识别到文字
layouts = []
@@ -100,7 +100,7 @@ def get_mask_layout(image, name, id_card_num):
result = []
try:
try:
layouts = util.get_ocr_layout(OCR, temp_file.name)
layouts, _ = util.get_ocr_layout(OCR, temp_file.name)
# layouts = OCR.parse({"doc": temp_file.name})["layout"]
except TypeError:
# 如果是类型错误,大概率是没识别到文字
@@ -198,7 +198,7 @@ def mask_photo(img_url, name, id_card_num, color=(255, 255, 255)):
return do_mask, i
# 打开图片
image = image_util.read(img_url)
image, _ = image_util.read(img_url)
if image is None:
return False, image
original_image = image

View File

@@ -23,7 +23,7 @@ def check_error(error_ocr):
image = mask_photo(img_url, name, id_card_num, (0, 0, 0))[1]
final_img_url = ufile.get_private_url(error_ocr.cfjaddress, "drg100")
final_image = image_util.read(final_img_url)
final_image, _ = image_util.read(final_img_url)
return image_util.combined(final_image, image)

View File

@@ -13,14 +13,14 @@ from photo_review import auto_photo_review, SEND_ERROR_EMAIL
# 项目必须从此处启动,否则代码中的相对路径可能导致错误的发生
if __name__ == '__main__':
program_name = '照片审核自动识别脚本'
program_name = "照片审核自动识别脚本"
logging.config.dictConfig(LOGGING_CONFIG)
parser = argparse.ArgumentParser()
parser.add_argument("--clean", default=False, type=bool, help="是否将识别中的案子改为待识别状态")
args = parser.parse_args()
if args.clean:
# 主要用于启动时,清除仍在涂抹中的案子
# 主要用于启动时,清除仍在识别中的案子
session = MysqlSession()
update_flag = (update(ZxPhhd).where(ZxPhhd.exsuccess_flag == "2").values(exsuccess_flag="1"))
session.execute(update_flag)
@@ -34,7 +34,7 @@ if __name__ == '__main__':
logging.info(f"{program_name}】开始运行")
auto_photo_review.main()
except Exception as e:
error_logger = logging.getLogger('error')
error_logger = logging.getLogger("error")
error_logger.error(traceback.format_exc())
if SEND_ERROR_EMAIL:
send_error_email(program_name, repr(e), traceback.format_exc())

View File

@@ -2,9 +2,9 @@ import jieba
from paddlenlp import Taskflow
from paddleocr import PaddleOCR
'''
"""
项目配置
'''
"""
# 每次从数据库获取的案子数量
PHHD_BATCH_SIZE = 10
# 没有查询到案子的等待时间(分钟)
@@ -18,35 +18,35 @@ LAYOUT_ANALYSIS = False
信息抽取关键词配置
"""
# 患者姓名
PATIENT_NAME = ['患者姓名']
PATIENT_NAME = ["患者姓名"]
# 入院日期
ADMISSION_DATE = ['入院日期']
ADMISSION_DATE = ["入院日期"]
# 出院日期
DISCHARGE_DATE = ['出院日期']
DISCHARGE_DATE = ["出院日期"]
# 发生医疗费
MEDICAL_EXPENSES = ['费用总额']
MEDICAL_EXPENSES = ["费用总额"]
# 个人现金支付
PERSONAL_CASH_PAYMENT = ['个人现金支付']
PERSONAL_CASH_PAYMENT = ["个人现金支付"]
# 个人账户支付
PERSONAL_ACCOUNT_PAYMENT = ['个人账户支付']
PERSONAL_ACCOUNT_PAYMENT = ["个人账户支付"]
# 个人自费金额
PERSONAL_FUNDED_AMOUNT = ['自费金额', '个人自费']
PERSONAL_FUNDED_AMOUNT = ["自费金额", "个人自费"]
# 医保类别
MEDICAL_INSURANCE_TYPE = ['医保类型']
MEDICAL_INSURANCE_TYPE = ["医保类型"]
# 就诊医院
HOSPITAL = ['医院']
HOSPITAL = ["医院"]
# 就诊科室
DEPARTMENT = ['科室']
DEPARTMENT = ["科室"]
# 主治医生
DOCTOR = ['主治医生']
DOCTOR = ["主治医生"]
# 住院号
ADMISSION_ID = ['住院号']
ADMISSION_ID = ["住院号"]
# 医保结算单号码
SETTLEMENT_ID = ['医保结算单号码']
SETTLEMENT_ID = ["医保结算单号码"]
# 年龄
AGE = ['年龄']
AGE = ["年龄"]
# 大写总额
UPPERCASE_MEDICAL_EXPENSES = ['大写总额']
UPPERCASE_MEDICAL_EXPENSES = ["大写总额"]
SETTLEMENT_LIST_SCHEMA = \
(PATIENT_NAME + ADMISSION_DATE + DISCHARGE_DATE + MEDICAL_EXPENSES + PERSONAL_CASH_PAYMENT
@@ -58,57 +58,55 @@ DISCHARGE_RECORD_SCHEMA = \
COST_LIST_SCHEMA = PATIENT_NAME + ADMISSION_DATE + DISCHARGE_DATE + MEDICAL_EXPENSES
'''
"""
别名配置
'''
"""
# 使用别名中的value替换key。考虑到效率问题只会替换第一个匹配到的key。
HOSPITAL_ALIAS = {
'沐阳': ['沭阳'],
'连水': ['涟水'],
'唯宁': ['睢宁'], # 雕宁
'九〇四': ['904'],
'漂水': ['溧水'],
"沐阳": ["沭阳"],
"连水": ["涟水"],
"唯宁": ["睢宁"], # 雕宁
"九〇四": ["904"],
"漂水": ["溧水"],
}
DEPARTMENT_ALIAS = {
'耳鼻喉': ['耳鼻咽喉'],
'急症': ['急诊'],
"耳鼻喉": ["耳鼻咽喉"],
"急症": ["急诊"],
}
'''
"""
搜索过滤配置
'''
"""
# 默认会过滤单字
HOSPITAL_FILTER = ['医院', '人民', '第一', '第二', '第三', '大学', '附属']
HOSPITAL_FILTER = ["医院", "人民", "第一", "第二", "第三", "大学", "附属"]
DEPARTMENT_FILTER = ['', '', '西', '']
DEPARTMENT_FILTER = ["", "", "西", ""]
'''
"""
分词配置
'''
jieba.suggest_freq(('肿瘤', '医院'), True)
jieba.suggest_freq(('', ''), True)
jieba.suggest_freq(('感染', ''), True)
jieba.suggest_freq(('', ''), True)
jieba.suggest_freq(('', ''), True)
"""
jieba.suggest_freq(("肿瘤", "医院"), True)
jieba.suggest_freq(("", ""), True)
jieba.suggest_freq(("感染", ""), True)
jieba.suggest_freq(("", ""), True)
jieba.suggest_freq(("", ""), True)
'''
"""
模型配置
'''
SETTLEMENT_IE = Taskflow('information_extraction', schema=SETTLEMENT_LIST_SCHEMA, model='uie-x-base',
task_path='model/settlement_list_model', layout_analysis=LAYOUT_ANALYSIS, precision='fp16')
DISCHARGE_IE = Taskflow('information_extraction', schema=DISCHARGE_RECORD_SCHEMA, model='uie-x-base',
task_path='model/discharge_record_model', layout_analysis=LAYOUT_ANALYSIS, precision='fp16')
COST_IE = Taskflow('information_extraction', schema=COST_LIST_SCHEMA, model='uie-x-base', device_id=1,
task_path='model/cost_list_model', layout_analysis=LAYOUT_ANALYSIS, precision='fp16')
"""
SETTLEMENT_IE = Taskflow("information_extraction", schema=SETTLEMENT_LIST_SCHEMA, model="uie-x-base",
task_path="model/settlement_list_model", layout_analysis=LAYOUT_ANALYSIS, precision="fp16")
DISCHARGE_IE = Taskflow("information_extraction", schema=DISCHARGE_RECORD_SCHEMA, model="uie-x-base",
task_path="model/discharge_record_model", layout_analysis=LAYOUT_ANALYSIS, precision="fp16")
COST_IE = Taskflow("information_extraction", schema=COST_LIST_SCHEMA, model="uie-x-base", device_id=1,
task_path="model/cost_list_model", layout_analysis=LAYOUT_ANALYSIS, precision="fp16")
OCR = PaddleOCR(
gpu_id=1,
use_angle_cls=False,
show_log=False,
det_db_thresh=0.1,
det_db_box_thresh=0.3,
det_limit_side_len=1248,
drop_score=0.3,
rec_model_dir='model/ocr/openatom_rec_repsvtr_ch_infer',
rec_algorithm='SVTR_LCNet',
)
device="gpu:0",
ocr_version="PP-OCRv4",
use_textline_orientation=False,
# 检测像素阈值,输出的概率图中,得分大于该阈值的像素点才会被认为是文字像素点
text_det_thresh=0.1,
# 检测框阈值,检测结果边框内,所有像素点的平均得分大于该阈值时,该结果会被认为是文字区域
text_det_box_thresh=0.3,
)

View File

@@ -36,14 +36,15 @@ def merge_result(result1, result2):
return result1
def ie_temp_image(ie, ocr, image):
def ie_temp_image(ie, ocr, image, is_screenshot=False):
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
cv2.imwrite(temp_file.name, image)
ie_result = []
ocr_pure_text = ''
angle = '0'
try:
layout = util.get_ocr_layout(ocr, temp_file.name)
layout, angle = util.get_ocr_layout(ocr, temp_file.name, is_screenshot)
if not layout:
# 无识别结果
ie_result = []
@@ -61,7 +62,7 @@ def ie_temp_image(ie, ocr, image):
os.remove(temp_file.name)
except Exception as e:
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
return ie_result, ocr_pure_text
return ie_result, ocr_pure_text, angle
# 关键信息提取
@@ -159,7 +160,7 @@ def information_extraction(ie, phrecs, identity):
if not img_path:
continue
image = image_util.read(img_path)
image, exif_data = image_util.read(img_path)
if image is None:
# 图片可能因为某些原因获取不到
continue
@@ -175,7 +176,7 @@ def information_extraction(ie, phrecs, identity):
if text:
info_extract = ie(text)[0]
else:
info_extract = ie_temp_image(ie, OCR, image)[0]
info_extract = ie_temp_image(ie, OCR, image, True)[0]
ie_result = {'result': info_extract, 'angle': '0'}
now = util.get_default_datetime()
@@ -193,27 +194,20 @@ def information_extraction(ie, phrecs, identity):
result = merge_result(result, ie_result['result'])
else:
is_screenshot = image_util.is_screenshot(image, exif_data)
target_images = []
# target_images += detector.request_book_areas(image) # 识别文档区域并裁剪
if not target_images:
target_images.append(image) # 识别失败
angle_count = defaultdict(int, {'0': 0}) # 分割后图片的最优角度统计
for target_image in target_images:
# dewarped_image = dewarp.dewarp_image(target_image) # 去扭曲
dewarped_image = target_image
angles = image_util.parse_rotation_angles(dewarped_image)
split_results = image_util.split(dewarped_image)
split_results = image_util.split(target_image)
for split_result in split_results:
if split_result['img'] is None or split_result['img'].size == 0:
continue
rotated_img = image_util.rotate(split_result['img'], int(angles[0]))
ie_temp_result = ie_temp_image(ie, OCR, rotated_img)
ie_temp_result = ie_temp_image(ie, OCR, split_result['img'], is_screenshot)
ocr_text += ie_temp_result[1]
ie_results = [{'result': ie_temp_result[0], 'angle': angles[0]}]
if not ie_results[0]['result'] or len(ie_results[0]['result']) < len(ie.kwargs.get('schema')):
rotated_img = image_util.rotate(split_result['img'], int(angles[1]))
ie_results.append({'result': ie_temp_image(ie, OCR, rotated_img)[0], 'angle': angles[1]})
ie_results = [{'result': ie_temp_result[0], 'angle': ie_temp_result[2]}]
now = util.get_default_datetime()
best_angle = ['0', 0]
for ie_result in ie_results:

View File

@@ -1,16 +1,11 @@
numpy==1.26.4
onnxconverter-common==1.14.0
aistudio_sdk==0.2.6
onnxconverter-common==1.15.0
onnxruntime-gpu==1.22.0
OpenCC==1.1.6
opencv-python==4.6.0.66
paddle2onnx==1.2.3
paddleclas==2.5.2
paddlenlp==2.6.1
paddleocr==2.7.3
pillow==10.4.0
paddlenlp==3.0.0b4
paddleocr==3.1.1
PyMuPDF==1.26.3
pymysql==1.1.1
requests==2.32.3
sqlacodegen==2.3.0.post1
sqlalchemy==1.4.52
tenacity==8.5.0
ufile==3.2.9
zxing-cpp==2.2.0
ufile==3.2.11
zxing-cpp==2.3.0

View File

@@ -1,9 +1,12 @@
import logging
import math
import urllib.request
from io import BytesIO
import cv2
import numpy
from PIL import Image
from PIL.ExifTags import TAGS
from paddleclas import PaddleClas
from tenacity import retry, stop_after_attempt, wait_random
@@ -14,20 +17,36 @@ def read(image_path):
"""
从网络或本地读取图片
:param image_path: 网络或本地路径
:return: NumPy数组形式的图片
:return: NumPy数组形式的图片, EXIF数据
"""
if image_path.startswith("http"):
# 发送HTTP请求并获取图像数据
resp = urllib.request.urlopen(image_path, timeout=60)
# 将数据读取为字节流
image_data = resp.read()
# 将字节流转换为NumPy数组
image_np = numpy.frombuffer(image_data, numpy.uint8)
# 解码NumPy数组为OpenCV图像格式
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
else:
image = cv2.imread(image_path)
return image
with open(image_path, "rb") as f:
image_data = f.read()
# 解析EXIF信息基于原始字节流
exif_data = {}
try:
# 用PIL打开原始字节流
with Image.open(BytesIO(image_data)) as img:
# 获取EXIF字典
exif_info = img._getexif()
if exif_info:
# 将EXIF标签的数字ID转换为可读名称如36867对应"DateTimeOriginal"
for tag_id, value in exif_info.items():
tag_name = TAGS.get(tag_id, tag_id)
exif_data[tag_name] = value
except Exception as e:
logging.error("解析EXIF信息失败", exc_info=e)
# 将字节流转换为NumPy数组
image_np = numpy.frombuffer(image_data, numpy.uint8)
# 解码NumPy数组为OpenCV图像格式
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
return image, exif_data
def capture(image, rectangle):
@@ -61,7 +80,7 @@ def split(image, ratio=1.414, overlap=0.05, x_compensation=3):
"""
split_result = []
if isinstance(image, str):
image = read(image)
image, _ = read(image)
height, width = image.shape[:2]
hw_ratio = height / width
wh_ratio = width / height

View File

@@ -12,9 +12,10 @@ def get_default_datetime():
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def get_ocr_layout(ocr, img_path):
def get_ocr_layout(ocr, img_path, is_screenshot=False):
"""
获取ocr识别的结果转为合适的layout形式
:param is_screenshot: 是否是截图
:param ocr: ocr模型
:param img_path: 图片本地路径
:return:
@@ -36,18 +37,18 @@ def get_ocr_layout(ocr, img_path):
return True
layout = []
ocr_result = ocr.ocr(img_path, cls=False)
ocr_result = ocr_result[0]
ocr_result = ocr.predict(input=img_path, use_doc_orientation_classify=not is_screenshot, use_doc_unwarping=not is_screenshot)
ocr_result = next(ocr_result)
if not ocr_result:
return layout
for segment in ocr_result:
box = segment[0]
return layout, "0"
angle = ocr_result.get("doc_preprocessor_res", {}).get("angle", "0")
for i in range(len(ocr_result.get('rec_texts'))):
box = ocr_result.get("rec_polys")[i].tolist()
box = _get_box(box)
if not _normal_box(box):
continue
text = segment[1][0]
layout.append((box, text))
return layout
layout.append((box, ocr_result.get("rec_texts")[i]))
return layout, str(angle)
def delete_temp_file(temp_files):

View File

@@ -24,7 +24,7 @@ def write_visual_result(image, angle=0, layout=None, result=None):
img_name = img[:last_dot_index]
img_type = img[last_dot_index + 1:]
img_array = image_util.read(image)
img_array, _ = image_util.read(image)
if angle != 0:
img_array = image_util.rotate(img_array, angle)
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
@@ -63,7 +63,7 @@ def visual_model_test(model_type, test_img, task_path, schema):
img["y_offset"] -= offset_y
temp_files_paths.append(temp_file.name)
parsed_doc = util.get_ocr_layout(
parsed_doc, _ = util.get_ocr_layout(
PaddleOCR(det_db_box_thresh=0.3, det_db_thresh=0.1, det_limit_side_len=1248, drop_score=0.3,
save_crop_res=False),
temp_file.name)