优化照片涂抹功能架构

This commit is contained in:
2024-07-15 12:18:36 +08:00
parent ab15cb1fc3
commit a0997e0673
6 changed files with 535 additions and 438 deletions

28
photo_mask/__init__.py Normal file
View File

@@ -0,0 +1,28 @@
from paddleocr import PaddleOCR
# Package-level settings for the photo-masking job. photo_mask/photo_mask.py
# imports OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, COPY_TRY_TIMES,
# UPLOAD_TRY_TIMES, NAME_KEYS and ID_CARD_NUM_KEYS from here.
# The bare string literals below are section headers ("project settings" and
# "keyword settings"); they are expression statements, not docstrings.
"""
项目配置
"""
# Number of cases fetched from the database per batch
PHHD_BATCH_SIZE = 20
# Minutes to wait before polling again when no case was found
SLEEP_MINUTES = 5
# Whether to send an alert e-mail when an error occurs
SEND_ERROR_EMAIL = True
# Number of attempts for backing up the original image
COPY_TRY_TIMES = 3
# Number of attempts for uploading the masked image
UPLOAD_TRY_TIMES = 3
"""
关键词配置
"""
# Label keywords that precede a person's name in the OCR text. When a label is
# found, get_mask_layout masks `length` character widths starting right after
# the label (offset=len(key), length=key["length"]).
# NOTE(review): "文款人" looks like a deliberate entry for a common OCR misread
# of "交款人" — confirm before removing.
NAME_KEYS = [
{"key": "姓名", "length": 4},
{"key": "交款人", "length": 4},
{"key": "文款人", "length": 4},
{"key": "购买方名称", "length": 4},
]
# Label keywords that precede an ID-card number; same structure as NAME_KEYS.
ID_CARD_NUM_KEYS = [{"key": "身份证号", "length": 19}, ]
# Shared OCR engine, constructed once when this package is imported.
# NOTE(review): gpu_id=1 presumably pins the engine to the second GPU — confirm
# against the deployment host.
OCR = PaddleOCR(use_angle_cls=False, show_log=False, gpu_id=1)

224
photo_mask/photo_mask.py Normal file
View File

@@ -0,0 +1,224 @@
import logging.config
import tempfile
from time import sleep
import cv2
from sqlalchemy import update, and_
from db import MysqlSession
from db.mysql import ZxPhrec, ZxPhhd
from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, COPY_TRY_TIMES, UPLOAD_TRY_TIMES, NAME_KEYS, \
ID_CARD_NUM_KEYS
from ucloud import BUCKET, ucloud
from util import image_util, util
def find_box(content, layout, offset=0, length=None, improve=False, image_path=None):
    """Estimate the rectangle covering ``content`` inside one OCR text line.

    The line's bounding box is divided evenly by its character count, and the
    span starting at ``content``'s index (shifted by ``offset``) is returned.

    Args:
        content: Substring to locate; must occur in ``layout[1]``.
        layout: Sequence whose first item is the line box
            ``[x1, y1, x2, y2]`` and whose second item is the recognised text.
        offset: Number of character widths to shift the start of the span.
        length: Span length in character widths. Any falsy value (``None`` or
            ``0``) falls back to ``len(content) + 1``.
        improve: When true, crop the estimated area out of ``image_path``,
            run OCR on the crop and, if ``content`` is found again, return the
            box derived from that second pass instead.
        image_path: Path of the image the layout came from; only read when
            ``improve`` is true.

    Returns:
        ``[x1, y1, x2, y2]`` in the coordinate system of the source image.

    Raises:
        ValueError: If ``content`` does not occur in the line text.
    """
    full_box, text = layout[0], layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # A box at least as wide as it is tall is treated as horizontal text.
    horizontal = x_len >= y_len
    char_len = (x_len if horizontal else y_len) / len(text)

    start = text.index(content) + offset
    if not length:
        length = len(content) + 1
    end = start + length

    if horizontal:
        box = [
            full_box[0] + start * char_len,
            full_box[1],
            full_box[0] + end * char_len,
            full_box[3],
        ]
    else:
        box = [
            full_box[0],
            full_box[1] + start * char_len,
            full_box[2],
            full_box[1] + end * char_len,
        ]

    if not improve:
        return box

    # Second pass: OCR a slightly enlarged crop to tighten the estimate.
    image = cv2.imread(image_path)
    capture_box = util.zoom_rectangle(box, 0.2)
    captured_image = image_util.capture(image, capture_box)
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
        temp_path = temp_file.name
    try:
        captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image, True)
        cv2.imwrite(temp_path, captured_image)
        try:
            layouts = util.get_ocr_layout(OCR, temp_path)
        except TypeError:
            # A TypeError here most likely means no text was recognised.
            layouts = []
        for candidate in layouts:
            if content in candidate[1]:
                refined = find_box(content, candidate)
                # Map from the padded crop back to source-image coordinates.
                box = [
                    refined[0] + capture_box[0] - offset_x,
                    refined[1] + capture_box[1] - offset_y,
                    refined[2] + capture_box[0] - offset_x,
                    refined[3] + capture_box[1] - offset_y,
                ]
                break
    finally:
        # Always remove the temp file, including when OCR raises.
        util.delete_temp_file(temp_path)
    return box
def get_mask_layout(image, name, id_card_num):
    """Run OCR on ``image`` and collect the rectangles that should be masked.

    For every recognised text line the name and the ID number are searched
    directly. For whichever of the two is not found in that line, the
    corresponding label keywords (``NAME_KEYS`` / ``ID_CARD_NUM_KEYS``) are
    searched instead and the area following the label is selected.

    Args:
        image: Image passed to ``cv2.imwrite`` before OCR.
        name: Person name to mask. ``None`` or an empty string skips the
            direct match, so only the label keywords are used.
        id_card_num: ID number to mask; ``None``/empty is handled like
            ``name``.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes. The list is empty when nothing
        was recognised or matched, and also when an error occurred (the error
        is logged).
    """
    result = []
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_path = temp_file.name
    try:
        cv2.imwrite(temp_path, image)
        try:
            try:
                layouts = util.get_ocr_layout(OCR, temp_path)
            except TypeError:
                # A TypeError here most likely means no text was recognised.
                layouts = []
            for layout in layouts:
                text = layout[1]
                keys = []
                # Guard against None/"" so a missing value falls back to the
                # keyword search instead of raising or matching every line.
                if name and name in text:
                    result.append(find_box(name, layout, improve=True, image_path=temp_path))
                else:
                    keys += NAME_KEYS
                if id_card_num and id_card_num in text:
                    result.append(find_box(id_card_num, layout, improve=True, image_path=temp_path))
                else:
                    keys += ID_CARD_NUM_KEYS
                for key in keys:
                    if key["key"] in text:
                        result.append(
                            find_box(key["key"], layout, offset=len(key["key"]), length=key["length"])
                        )
            return result
        except Exception as e:
            logging.error("涂抹时出错!", exc_info=e)
            # Keep the return type consistent; callers only test truthiness.
            return []
    finally:
        util.delete_temp_file(temp_path)
def _locate_masks(split_img, name, id_card_num):
    """Try successive orientations of ``split_img`` until mask boxes are found.

    Returns ``(results, angle, expand_x, expand_y)``: ``angle`` is the
    rotation used for the last attempt, and the expand offsets are those
    reported by ``expand_to_a4_size`` for that attempt (zero for the fallback
    attempts, which are not expanded).
    """
    angles = image_util.parse_rotation_angles(split_img)

    # First attempt: first candidate angle, expanded to A4 size.
    angle = int(angles[0])
    rotated_img = image_util.rotate(split_img, angle)
    rotated_img, expand_x, expand_y = image_util.expand_to_a4_size(rotated_img, True)
    results = get_mask_layout(rotated_img, name, id_card_num)
    if results:
        return results, angle, expand_x, expand_y

    # Fallback attempts run on un-expanded images, so no expansion offset
    # may be applied to boxes found here.
    angle = int(angles[1])
    results = get_mask_layout(image_util.rotate(split_img, angle), name, id_card_num)
    if not results and "0" not in angles:
        angle = 0
        results = get_mask_layout(split_img, name, id_card_num)
    return results, angle, 0, 0


def _backup_and_upload(address, image):
    """Back up the original object, then upload ``image`` over ``address``."""
    backed_up = any(
        ucloud.copy_file(BUCKET, address, "drg2015", address)
        for _ in range(COPY_TRY_TIMES)
    )
    if not backed_up:
        # Behaviour is unchanged (upload still proceeds); the failure is now visible.
        logging.error("备份原图失败:%s", address)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        cv2.imwrite(temp_file.name, image)
    try:
        uploaded = any(
            ucloud.upload_file(address, temp_file.name)
            for _ in range(UPLOAD_TRY_TIMES)
        )
        if not uploaded:
            logging.error("上传图片失败:%s", address)
    except Exception as e:
        logging.error("上传图片出错", exc_info=e)
    finally:
        util.delete_temp_file(temp_file.name)


def photo_mask(pk_phhd, name, id_card_num):
    """Mask the name and ID number on the photos attached to one case.

    Loads the ``ZxPhrec`` rows of case ``pk_phhd`` whose ``cRectype`` is "3"
    or "4", paints white rectangles over every area reported by
    ``get_mask_layout`` and, if anything was painted, backs up the original
    object and uploads the masked image to the same address.

    Args:
        pk_phhd: Primary key of the case whose photos are processed.
        name: Person name to mask.
        id_card_num: ID number to mask.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        session.close()

    for phrec in phrecs:
        img_url = ucloud.get_private_url(phrec.cfjaddress)
        if not img_url:
            continue
        image = image_util.read(img_url)
        is_masked = False
        for split_result in image_util.split(image):
            split_img = split_result["img"]
            results, angle, expand_x, expand_y = _locate_masks(split_img, name, id_card_num)
            if not results:
                continue
            is_masked = True
            height, width = split_img.shape[:2]
            center = (width / 2, height / 2)
            # Translate from the (possibly expanded) split image back to the
            # full image: add the split's origin, remove the expansion offset.
            shift_x = split_result["x_offset"] - expand_x
            shift_y = split_result["y_offset"] - expand_y
            for result in results:
                rect = image_util.invert_rotate_rectangle(result, center, angle)
                cv2.rectangle(
                    image,
                    (int(rect[0] + shift_x), int(rect[1] + shift_y)),
                    (int(rect[2] + shift_x), int(rect[3] + shift_y)),
                    (255, 255, 255), -1, 0,
                )
        # Only back up and re-upload when something was actually masked.
        if is_masked:
            _backup_and_upload(phrec.cfjaddress, image)
def main():
    """Poll for cases awaiting masking and process them batch by batch.

    Each round selects up to ``PHHD_BATCH_SIZE`` cases with
    ``paint_flag == "1"``, marks them "2" (in progress), runs ``photo_mask``
    on each and then marks that case "8" (done). When no case is found the
    loop sleeps for ``SLEEP_MINUTES`` minutes. This function never returns.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
                ZxPhhd.paint_flag == "1"
            ).limit(PHHD_BATCH_SIZE).all()
            if phhds:
                # Mark the batch as in progress; skipped when the batch is
                # empty to avoid an UPDATE with an empty IN list.
                pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2")
                )
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new case found; wait before polling again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)
            # Masking finished; update the flag for this case.
            session = MysqlSession()
            try:
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8")
                )
                session.commit()
            finally:
                session.close()