Reapply "优化照片涂抹功能架构"

This reverts commit 13575fb2aa.
This commit is contained in:
2024-07-15 15:21:10 +08:00
parent f4a2a38df0
commit 7cbf0c0756
6 changed files with 535 additions and 438 deletions

View File

@@ -1,448 +1,17 @@
import logging.config
import math
import os
import tempfile
import traceback
import urllib.request
from time import sleep
import cv2
import numpy as np
import paddleclas
from paddlenlp.utils.doc_parser import DocParser
from sqlalchemy import update
from auto_email.error_email import send_error_email
from db import MysqlSession
from db.mysql import ZxPhrec, ZxPhhd
from log import LOGGING_CONFIG
from photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES
from ucloud import BUCKET, ucloud
DOC_PARSER = DocParser(use_gpu=True, device_id=1)
def open_image(img_path):
if img_path.startswith("http"):
# 发送HTTP请求并获取图像数据
resp = urllib.request.urlopen(img_path)
# 将数据读取为字节流
image_data = resp.read()
# 将字节流转换为NumPy数组
image_np = np.frombuffer(image_data, np.uint8)
# 解码NumPy数组为OpenCV图像格式
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
else:
image = cv2.imread(img_path)
return image
def split_image(img, max_ratio=1.41, best_ration=1.41, overlap=0.05):
split_result = []
# 获取图片的宽度和高度
height, width = img.shape[:2]
# 计算宽高比
ratio = max(width, height) / min(width, height)
# 检查是否需要裁剪
if ratio > max_ratio:
# 确定裁剪的尺寸,保持长宽比,以较短边为基准
new_ratio = best_ration - overlap
if width < height:
# 高度是较长边
cropped_width = width * best_ration
for i in range(math.ceil(height / (width * new_ratio))):
offset = round(width * new_ratio * i)
# 参数形式为[y1:y2, x1:x2]
cropped_img = img[offset:round(offset + cropped_width), 0:width]
split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset})
# 最后一次裁剪时不足的部分填充黑色
last_img = split_result[-1]["img"]
split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, round(cropped_width - last_img.shape[0]), 0, 0,
cv2.BORDER_CONSTANT, value=(0, 0, 0))
else:
# 宽度是较长边
cropped_height = height * best_ration
for i in range(math.ceil(width / (height * new_ratio))):
offset = round(height * new_ratio * i)
cropped_img = img[0:height, offset:round(offset + cropped_height)]
split_result.append({"img": cropped_img, "x_offset": offset, "y_offset": 0})
# 最后一次裁剪时不足的部分填充黑色
last_img = split_result[-1]["img"]
split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, 0, 0, round(cropped_height - last_img.shape[1]),
cv2.BORDER_CONSTANT, value=(0, 0, 0))
else:
split_result.append({"img": img, "x_offset": 0, "y_offset": 0})
return split_result
def capture_image(img, layout):
x1, y1, x2, y2 = layout
return img[int(y1):int(y2), int(x1):int(x2)]
# 获取图片旋转角度
def get_image_rotation_angles(img):
angles = ['0', '90']
model = paddleclas.PaddleClas(model_name="text_image_orientation")
result = model.predict(input_data=img)
try:
result = next(result)[0]
if result["scores"][0] < 0.5:
return angles
angles = result["label_names"]
except Exception as e:
logging.error("获取图片旋转角度失败", exc_info=e)
return angles
def rotate_image(img, angle):
if angle == 0:
return img
height, width, _ = img.shape
if angle == 180:
new_width = width
new_height = height
else:
new_width = height
new_height = width
# 绕图像的中心旋转
# 参数:旋转中心 旋转度数 scale
matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1)
# 旋转后平移
matrix[0, 2] += (new_width - width) / 2
matrix[1, 2] += (new_height - height) / 2
# 参数:原始图像 旋转参数 元素图像宽高
rotated = cv2.warpAffine(img, matrix, (new_width, new_height))
return rotated
def rotate_rectangle(rectangle, center, angle):
def rotate_point(pt, angle, center):
matrix = cv2.getRotationMatrix2D(center, angle, 1)
if angle != 180:
# 旋转后平移
matrix[0, 2] += center[1] - center[0]
matrix[1, 2] += center[0] - center[1]
reverse_matrix = cv2.invertAffineTransform(matrix)
pt = np.array([[pt[0]], [pt[1]], [1]])
return np.dot(reverse_matrix, pt)
if angle == 0:
return list(rectangle)
x1, y1, x2, y2 = rectangle
# 计算矩形的四个顶点
top_left = (x1, y1)
bot_left = (x1, y2)
top_right = (x2, y1)
bot_right = (x2, y2)
# 旋转矩形的四个顶点
rot_top_left = rotate_point(top_left, angle, center).astype(int)
rot_bot_left = rotate_point(bot_left, angle, center).astype(int)
rot_bot_right = rotate_point(bot_right, angle, center).astype(int)
rot_top_right = rotate_point(top_right, angle, center).astype(int)
# 找出旋转后矩形的新左上角和右下角坐标
new_top_left = (min(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]),
min(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1]))
new_bot_right = (max(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]),
max(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1]))
return [new_top_left[0], new_top_left[1], new_bot_right[0], new_bot_right[1]]
def get_ocr_layout(ocr, img_path):
def _get_box(old_box):
new_box = [
min(old_box[0][0], old_box[3][0]), # x1
min(old_box[0][1], old_box[1][1]), # y1
max(old_box[1][0], old_box[2][0]), # x2
max(old_box[2][1], old_box[3][1]), # y2
]
return new_box
def _normal_box(box_data):
# Ensure the height and width of bbox are greater than zero
if box_data[3] - box_data[1] < 0 or box_data[2] - box_data[0] < 0:
return False
return True
layout = []
ocr_result = ocr.ocr(img_path, cls=False)
ocr_result = ocr_result[0]
if not ocr_result:
return layout
for segment in ocr_result:
box = segment[0]
box = _get_box(box)
if not _normal_box(box):
continue
text = segment[1][0]
layout.append((box, text))
return layout
def zoom_box(box, ratio):
x1, y1, x2, y2 = box
x1 = round(x1 - x1 * ratio)
y1 = round(y1 - y1 * ratio)
x2 = round(x2 + x2 * ratio)
y2 = round(y2 + y2 * ratio)
return [x1, y1, x2, y2]
def find_box_of_content(content, layout, img_path, improve=True):
full_box = layout[0]
x_len = full_box[2] - full_box[0]
y_len = full_box[3] - full_box[1]
if x_len >= y_len:
# 横向排布
box_len = x_len
direction = "x"
else:
# 纵向排布
box_len = y_len
direction = "y"
text = layout[1]
text_len = len(text)
char_len = box_len / text_len
index = text.index(content)
if direction == "x":
# 横向排布
box = [
full_box[0] + index * char_len,
full_box[1],
full_box[0] + (index + len(content) + 1) * char_len,
full_box[3],
]
is_abnormal = box[2] - box[0] < (box[3] - box[1]) * len(content) / 2
else:
# 纵向排布
box = [
full_box[0],
full_box[1] + index * char_len,
full_box[2],
full_box[1] + (index + len(content) + 1) * char_len,
]
is_abnormal = box[3] - box[1] < (box[2] - box[0]) * len(content) / 2
if is_abnormal and improve:
# 比例异常,再次识别
image = cv2.imread(img_path)
# 截图时偏大一点
capture_box = zoom_box(box, 0.2)
captured_image = capture_image(image, capture_box)
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
cv2.imwrite(temp_file.name, captured_image)
try:
layouts = DOC_PARSER.parse({"doc": temp_file.name})["layout"]
except TypeError:
# 如果是类型错误,大概率是没识别到文字
layouts = []
except Exception as e:
# 如果出现其他错误,抛出
raise e
for layout in layouts:
if content in layout[1]:
temp_box = find_box_of_content(content, layout, temp_file.name, False)
if temp_box:
box = [
temp_box[0] + capture_box[0],
temp_box[1] + capture_box[1],
temp_box[2] + capture_box[0],
temp_box[3] + capture_box[1],
]
break
try:
os.remove(temp_file.name)
except Exception as e:
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
return box
def find_box_of_value(key, layout, length):
full_box = layout[0]
x_len = full_box[2] - full_box[0]
y_len = full_box[3] - full_box[1]
if x_len >= y_len:
# 横向排布
box_len = x_len
direction = "x"
else:
# 纵向排布
box_len = y_len
direction = "y"
text = layout[1]
text_len = len(text)
char_len = box_len / text_len
index = text.index(key)
if direction == "x":
# 横向排布
return (
full_box[0] + (index + len(key)) * char_len,
full_box[1],
full_box[0] + (index + len(key) + length) * char_len,
full_box[3],
)
else:
# 纵向排布
return (
full_box[0],
full_box[1] + (index + len(key)) * char_len,
full_box[2],
full_box[1] + (index + len(key) + length) * char_len,
)
def get_mask_layout(image, contents):
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
cv2.imwrite(temp_file.name, image)
result = []
try:
# layouts = get_ocr_layout(OCR, temp_file.name)
try:
layouts = DOC_PARSER.parse({"doc": temp_file.name}, True)["layout"]
except TypeError as te:
# 如果是类型错误,大概率是没识别到文字
layouts = []
except Exception as e:
# 如果出现其他错误,抛出
raise e
if not layouts:
# 无识别结果
return result
else:
# 涂抹
for layout in layouts:
for content in contents:
if content in layout[1]:
result.append(find_box_of_content(content, layout, temp_file.name))
if "姓名" in layout[1]:
result.append(find_box_of_value("姓名", layout, 4))
if "交款人" in layout[1]:
result.append(find_box_of_value("交款人", layout, 4))
if "文款人" in layout[1]:
result.append(find_box_of_value("文款人", layout, 4))
if "购买方名称" in layout[1]:
result.append(find_box_of_value("购买方名称", layout, 4))
if "身份证号" in layout[1]:
result.append(find_box_of_value("身份证号", layout, 19))
return result
except Exception as e:
logging.error("涂抹时出错", exc_info=e)
finally:
try:
os.remove(temp_file.name)
except Exception as e:
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
def photo_mask(pk_phhd, contents):
session = MysqlSession()
phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cfjaddress) \
.filter(ZxPhrec.pk_phhd == pk_phhd) \
.filter(ZxPhrec.cRectype.in_(["3", "4"])) \
.all()
session.close()
for phrec in phrecs:
img_url = ucloud.get_private_url(phrec.cfjaddress)
if not img_url:
continue
# 是否有涂抹
is_masked = False
# 打开图片
image = open_image(img_url)
split_result = split_image(image)
for img in split_result:
angles = get_image_rotation_angles(img["img"])
angle = int(angles[0])
rotated_img = rotate_image(img["img"], angle)
results = get_mask_layout(rotated_img, contents)
if not results:
angle = int(angles[1])
rotated_img = rotate_image(img["img"], angle)
results = get_mask_layout(rotated_img, contents)
if not results and "0" not in angles:
angle = 0
results = get_mask_layout(img["img"], contents)
if results:
is_masked = True
for result in results:
height, width = img["img"].shape[:2]
center = (width / 2, height / 2)
result = rotate_rectangle(result, center, angle)
result = (
result[0] + img["x_offset"],
result[1] + img["y_offset"],
result[2] + img["x_offset"],
result[3] + img["y_offset"],
)
cv2.rectangle(image, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])),
(255, 255, 255), -1, 0)
# 如果涂抹了要备份以及更新
if is_masked:
for i in range(3):
is_copy_success = ucloud.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
if is_copy_success:
break
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
cv2.imwrite(temp_file.name, image)
try:
for i in range(3):
is_upload_success = ucloud.upload_file(phrec.cfjaddress, temp_file.name)
if is_upload_success:
break
except Exception as e:
logging.error("上传图片出错", exc_info=e)
finally:
try:
os.remove(temp_file.name)
except Exception as e:
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
from photo_mask import photo_mask, SEND_ERROR_EMAIL
if __name__ == '__main__':
program_name = "自动照片涂抹脱敏脚本"
logging.config.dictConfig(LOGGING_CONFIG)
try:
while 1:
session = MysqlSession()
phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
ZxPhhd.paint_flag == "1"
).limit(PHHD_BATCH_SIZE).all()
# 将状态改为正在涂抹中
pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2"))
session.execute(update_flag)
session.commit()
session.close()
if phhds:
for phhd in phhds:
pk_phhd = phhd.pk_phhd
logging.info(f"开始涂抹:{pk_phhd}")
photo_mask(pk_phhd, [phhd.cXm, phhd.cSfzh])
# 识别完成更新标识
session = MysqlSession()
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8"))
session.execute(update_flag)
session.commit()
session.close()
else:
# 没有查询到新案子,等待一段时间后再查
log = logging.getLogger()
log.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
sleep(SLEEP_MINUTES * 60)
logging.info(f"{program_name}】开始运行")
photo_mask.main()
except Exception as e:
logging.error(traceback.format_exc())
send_error_email(program_name='照片涂抹脚本', error_name=repr(e), error_detail=traceback.format_exc())
if SEND_ERROR_EMAIL:
send_error_email(program_name=program_name, error_name=repr(e), error_detail=traceback.format_exc())