Files
fcb_photo_review/photo_mask.py
2024-07-11 17:25:36 +08:00

450 lines
16 KiB
Python

import logging.config
import math
import os
import tempfile
import traceback
import urllib.request
from time import sleep
import cv2
import numpy as np
import paddleclas
from paddlenlp.utils.doc_parser import DocParser
from sqlalchemy import update
from auto_email.error_email import send_an_error_email
from config.log import LOGGING_CONFIG
from config.mysql import MysqlSession
from config.photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES
from config.ucloud import BUCKET
from models import ZxPhrec, ZxPhhd
from ucloud import ucloud
# Shared PaddleNLP document parser, created once at import time and reused by
# every OCR call in this module; configured for GPU execution with device_id=1.
DOC_PARSER = DocParser(use_gpu=True, device_id=1)
def open_image(img_path):
    """Load an image from a local path or an HTTP(S) URL as an OpenCV array.

    Args:
        img_path: Local file path, or a URL (anything starting with "http").

    Returns:
        Whatever ``cv2.imread`` / ``cv2.imdecode`` returns for the data.
        NOTE(review): both OpenCV readers are documented to return ``None``
        instead of raising when the data cannot be decoded - callers should
        check for that.

    Raises:
        urllib.error.URLError: If the URL cannot be fetched.
    """
    if not img_path.startswith("http"):
        return cv2.imread(img_path)

    # Use the response as a context manager so the connection is released
    # even if reading fails part-way through.
    with urllib.request.urlopen(img_path) as resp:
        image_data = resp.read()
    # Wrap the raw bytes as a uint8 buffer and let OpenCV decode it (BGR).
    image_np = np.frombuffer(image_data, np.uint8)
    return cv2.imdecode(image_np, cv2.IMREAD_COLOR)
def split_image(img, max_ratio=1.41, best_ration=1.41, overlap=0.05):
    """Cut an elongated image into overlapping tiles along its longer side.

    Args:
        img: Image array; only ``img.shape[:2]`` (height, width) is inspected.
        max_ratio: Long-side / short-side ratio above which the image is split.
        best_ration: Long-side / short-side ratio of each produced tile.
        overlap: Portion of the short side by which neighbouring tiles overlap.

    Returns:
        A list of dicts ``{"img", "x_offset", "y_offset"}`` where the offsets
        locate the tile's top-left corner in the original image. An image
        whose ratio does not exceed ``max_ratio`` is returned as one tile.
        The last tile is padded with black so it has the same size as the
        others.
    """
    height, width = img.shape[:2]
    if max(width, height) / min(width, height) <= max_ratio:
        # Proportions are acceptable: hand back the image untouched.
        return [{"img": img, "x_offset": 0, "y_offset": 0}]

    tiles = []
    # Tiles advance by slightly less than their own length so they overlap.
    step_ratio = best_ration - overlap
    if width < height:
        # Portrait: slide the window downwards.
        tile_len = width * best_ration
        step = width * step_ratio
        for idx in range(math.ceil(height / step)):
            top = round(step * idx)
            # numpy slicing is [y1:y2, x1:x2]
            tiles.append({
                "img": img[top:round(top + tile_len), 0:width],
                "x_offset": 0,
                "y_offset": top,
            })
        # The final tile may run past the bottom edge: pad it with black.
        tail = tiles[-1]["img"]
        missing = round(tile_len - tail.shape[0])
        tiles[-1]["img"] = cv2.copyMakeBorder(tail, 0, missing, 0, 0,
                                              cv2.BORDER_CONSTANT, value=(0, 0, 0))
    else:
        # Landscape: slide the window to the right.
        tile_len = height * best_ration
        step = height * step_ratio
        for idx in range(math.ceil(width / step)):
            left = round(step * idx)
            tiles.append({
                "img": img[0:height, left:round(left + tile_len)],
                "x_offset": left,
                "y_offset": 0,
            })
        # The final tile may run past the right edge: pad it with black.
        tail = tiles[-1]["img"]
        missing = round(tile_len - tail.shape[1])
        tiles[-1]["img"] = cv2.copyMakeBorder(tail, 0, 0, 0, missing,
                                              cv2.BORDER_CONSTANT, value=(0, 0, 0))
    return tiles
def capture_image(img, layout):
    """Crop the rectangle ``layout`` out of ``img``.

    Args:
        img: Image array indexed as ``[row, column, ...]``.
        layout: ``(x1, y1, x2, y2)``; values are truncated to integers.

    Returns:
        The cropped view ``img[y1:y2, x1:x2]``. Coordinates past the image
        edge are handled by normal slice semantics.
    """
    # Clamp at zero: a negative bound would otherwise be interpreted by
    # Python as "count from the end" and yield an empty or unrelated crop.
    x1, y1, x2, y2 = (max(0, int(value)) for value in layout)
    return img[y1:y2, x1:x2]
# Lazily created text-orientation classifier, shared by every call so the
# model is not rebuilt for each image.
_ORIENTATION_MODEL = None


def _get_orientation_model():
    """Return the shared PaddleClas orientation model, creating it on first use."""
    global _ORIENTATION_MODEL
    if _ORIENTATION_MODEL is None:
        _ORIENTATION_MODEL = paddleclas.PaddleClas(model_name="text_image_orientation")
    return _ORIENTATION_MODEL


# Get the candidate rotation angles of an image
def get_image_rotation_angles(img):
    """Predict the candidate rotation angles of ``img``.

    Args:
        img: Input passed to ``PaddleClas.predict`` as ``input_data``.

    Returns:
        The ``label_names`` list of the first prediction (angle labels as
        strings). Falls back to ``['0', '90']`` when the top score is below
        0.5 or when reading the prediction fails (the failure is logged).
    """
    angles = ['0', '90']
    result = _get_orientation_model().predict(input_data=img)
    try:
        result = next(result)[0]
        if result["scores"][0] < 0.5:
            # Low-confidence prediction: keep the default candidates.
            return angles
        angles = result["label_names"]
    except Exception as e:
        logging.error("获取图片旋转角度失败", exc_info=e)
    return angles
def rotate_image(img, angle):
    """Rotate ``img`` by ``angle`` degrees around its centre.

    Args:
        img: Three-dimensional image array (height, width, channels).
        angle: Rotation in degrees, passed to ``cv2.getRotationMatrix2D``.

    Returns:
        ``img`` itself when ``angle`` is 0. Otherwise a new image: for 180 the
        canvas keeps its size, for any other angle width and height are
        swapped and the content is shifted to stay centred.
    """
    if angle == 0:
        return img

    height, width, _ = img.shape
    # (width, height) of the output canvas.
    out_size = (width, height) if angle == 180 else (height, width)

    # Rotate about the image centre with scale 1 ...
    matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1)
    # ... then translate so the result is centred on the new canvas.
    matrix[0, 2] += (out_size[0] - width) / 2
    matrix[1, 2] += (out_size[1] - height) / 2
    return cv2.warpAffine(img, matrix, out_size)
def rotate_rectangle(rectangle, center, angle):
    """Map a rectangle found on a rotated image back to the unrotated image.

    The forward transform is built the same way as in ``rotate_image``
    (rotation about ``center`` plus the re-centring shift for non-180
    angles) and then inverted.

    Args:
        rectangle: ``(x1, y1, x2, y2)`` in the rotated image.
        center: ``(width / 2, height / 2)`` of the unrotated image.
        angle: Rotation in degrees that was applied to the image.

    Returns:
        ``[x1, y1, x2, y2]`` of the axis-aligned bounding box of the mapped
        corners, as plain Python ints. For ``angle == 0`` the rectangle is
        returned unchanged as a list.
    """
    if angle == 0:
        return list(rectangle)

    # Build and invert the transform once instead of once per corner.
    matrix = cv2.getRotationMatrix2D(center, angle, 1)
    if angle != 180:
        # Same post-rotation shift as rotate_image applies.
        matrix[0, 2] += center[1] - center[0]
        matrix[1, 2] += center[0] - center[1]
    inverse = cv2.invertAffineTransform(matrix)

    x1, y1, x2, y2 = rectangle
    # Homogeneous coordinates of the four corners, one corner per column.
    corners = np.array([
        [x1, x1, x2, x2],
        [y1, y2, y2, y1],
        [1, 1, 1, 1],
    ])
    xs, ys = np.dot(inverse, corners).astype(int)
    # Convert to built-in ints so callers never receive 1-element arrays.
    return [int(xs.min()), int(ys.min()), int(xs.max()), int(ys.max())]
def get_ocr_layout(ocr, img_path):
    """Run ``ocr`` on an image and return ``(box, text)`` pairs.

    Args:
        ocr: Object exposing ``ocr(img_path, cls=False)``; the first element
            of its result is the list of recognised segments.
        img_path: Path of the image to recognise.

    Returns:
        A list of ``([x1, y1, x2, y2], text)`` tuples. Each segment's
        four-point polygon is converted to an axis-aligned box; boxes with a
        negative width or height are dropped. Empty when nothing is found.
    """
    segments = ocr.ocr(img_path, cls=False)[0]
    if not segments:
        return []

    pairs = []
    for segment in segments:
        quad = segment[0]
        # Corners are read in the order the original indexing implies:
        # 0 = top-left, 1 = top-right, 2 = bottom-right, 3 = bottom-left.
        x1 = min(quad[0][0], quad[3][0])
        y1 = min(quad[0][1], quad[1][1])
        x2 = max(quad[1][0], quad[2][0])
        y2 = max(quad[2][1], quad[3][1])
        # Skip inverted boxes (negative height or width).
        if y2 - y1 < 0 or x2 - x1 < 0:
            continue
        pairs.append(([x1, y1, x2, y2], segment[1][0]))
    return pairs
def zoom_box(box, ratio):
    """Enlarge ``box`` by moving each edge outwards.

    Each coordinate is shifted by ``ratio`` times its own value (not by a
    fraction of the box size), then rounded.

    Args:
        box: ``(x1, y1, x2, y2)``.
        ratio: Relative shift applied to every coordinate.

    Returns:
        ``[x1, y1, x2, y2]`` as rounded integers.
    """
    left, top, right, bottom = box
    return [
        round(left - left * ratio),
        round(top - top * ratio),
        round(right + right * ratio),
        round(bottom + bottom * ratio),
    ]
def find_box_of_content(content, layout, img_path):
    """Estimate the bounding box of ``content`` inside one recognised text line.

    The position is interpolated from the line's box, assuming all characters
    are equally wide and laid out along the box's longer side. If the
    resulting box looks too short for the text, the region is cropped
    (slightly enlarged), recognised again and the search is repeated on the
    crop.

    Args:
        content: Text to locate; must occur in ``layout[1]``.
        layout: ``(box, text)`` pair where ``box`` is ``[x1, y1, x2, y2]``.
        img_path: Path of the image the layout came from; only read when the
            region has to be recognised again.

    Returns:
        ``[x1, y1, x2, y2]`` in the coordinates of the image at ``img_path``.

    Raises:
        ValueError: If ``content`` does not occur in the text.
        ZeroDivisionError: If the text is empty.
    """
    full_box = layout[0]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # Text is assumed to run along the longer side of the box.
    horizontal = x_len >= y_len
    text = layout[1]
    char_len = (x_len if horizontal else y_len) / len(text)
    start = text.index(content)
    # One extra character of slack after the match.
    end = start + len(content) + 1

    if horizontal:
        box = [
            full_box[0] + start * char_len,
            full_box[1],
            full_box[0] + end * char_len,
            full_box[3],
        ]
        is_abnormal = box[2] - box[0] < (box[3] - box[1]) * len(content) / 2
    else:
        box = [
            full_box[0],
            full_box[1] + start * char_len,
            full_box[2],
            full_box[1] + end * char_len,
        ]
        is_abnormal = box[3] - box[1] < (box[2] - box[0]) * len(content) / 2

    if not is_abnormal:
        return box

    # Proportions look wrong: crop a slightly larger region and recognise it
    # again to get a tighter estimate.
    image = cv2.imread(img_path)
    capture_box = zoom_box(box, 0.2)
    captured_image = capture_image(image, capture_box)
    # Only reserve a unique name here; the file is written after it is
    # closed so the by-name write does not depend on platform behaviour.
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
        temp_path = temp_file.name
    try:
        cv2.imwrite(temp_path, captured_image)
        try:
            layouts = DOC_PARSER.parse({"doc": temp_path})["layout"]
        except TypeError:
            # A TypeError most likely means no text was recognised.
            layouts = []
        for sub_layout in layouts:
            if content in sub_layout[1]:
                temp_box = find_box_of_content(content, sub_layout, temp_path)
                if temp_box:
                    # Translate from crop coordinates back to this image.
                    box = [
                        temp_box[0] + capture_box[0],
                        temp_box[1] + capture_box[1],
                        temp_box[2] + capture_box[0],
                        temp_box[3] + capture_box[1],
                    ]
                    break
    finally:
        # Always remove the temporary crop, even when recognition raised.
        try:
            os.remove(temp_path)
        except Exception as e:
            logging.info(f"删除临时文件 {temp_path} 时出错", exc_info=e)
    return box
def find_box_of_value(key, layout, length):
    """Estimate the box of the ``length`` characters that follow ``key``.

    Characters are assumed to be equally wide and laid out along the longer
    side of the line's box.

    Args:
        key: Label text that must occur in ``layout[1]``.
        layout: ``(box, text)`` pair where ``box`` is ``[x1, y1, x2, y2]``.
        length: Number of characters after the key to cover.

    Returns:
        ``(x1, y1, x2, y2)`` tuple; it may extend past the line's own box
        when fewer than ``length`` characters follow the key.

    Raises:
        ValueError: If ``key`` does not occur in the text.
        ZeroDivisionError: If the text is empty.
    """
    full_box = layout[0]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    horizontal = x_len >= y_len
    text = layout[1]
    char_len = (x_len if horizontal else y_len) / len(text)
    # Character index at which the value starts (right after the key).
    value_start = text.index(key) + len(key)
    near = value_start * char_len
    far = (value_start + length) * char_len

    if horizontal:
        return (full_box[0] + near, full_box[1], full_box[0] + far, full_box[3])
    return (full_box[0], full_box[1] + near, full_box[2], full_box[1] + far)
def get_mask_layout(image, contents):
    """Recognise ``image`` and collect the boxes that have to be masked.

    A box is produced for every occurrence of one of ``contents`` in a
    recognised line, and for the characters that follow a set of known label
    keywords (name, payer, buyer name, ID number).

    Args:
        image: Image array to recognise; written to a temporary JPEG first.
        contents: Iterable of literal strings to mask. ``None`` and empty
            entries are ignored.

    Returns:
        A list of ``x1, y1, x2, y2`` boxes (lists or tuples). Empty when no
        text is recognised, nothing matches, or an error occurs (the error
        is logged).
    """
    # (label keyword, number of following characters to mask).
    # NOTE(review): "文款人" looks like a common OCR misreading of "交款人";
    # presumably listed on purpose - confirm.
    value_keys = (
        ("姓名", 4),
        ("交款人", 4),
        ("文款人", 4),
        ("购买方名称", 4),
        ("身份证号", 19),
    )
    # Drop None/empty entries: ``None in text`` raises TypeError and an empty
    # string is contained in every line.
    targets = [content for content in contents if content]

    # Only reserve a unique name here; the file is written after it is closed.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_path = temp_file.name
    result = []
    try:
        cv2.imwrite(temp_path, image)
        try:
            layouts = DOC_PARSER.parse({"doc": temp_path})["layout"]
        except TypeError:
            # A TypeError most likely means no text was recognised.
            layouts = []
        for layout in layouts or []:
            text = layout[1]
            for content in targets:
                if content in text:
                    result.append(find_box_of_content(content, layout, temp_path))
            for key, length in value_keys:
                if key in text:
                    result.append(find_box_of_value(key, layout, length))
        return result
    except Exception as e:
        logging.error("涂抹时出错", exc_info=e)
        # Keep the return type consistent for callers.
        return []
    finally:
        try:
            os.remove(temp_path)
        except Exception as e:
            logging.info(f"删除临时文件 {temp_path} 时出错", exc_info=e)
def _find_mask_boxes(tile_img, contents):
    """Try the candidate orientations of a tile; return ``(boxes, angle)`` of the first hit."""
    angles = get_image_rotation_angles(tile_img)
    # The two most likely angles first, then the unrotated image as a last
    # resort if it was not among the predictions.
    candidates = list(angles[:2])
    if "0" not in angles:
        candidates.append("0")
    for label in candidates:
        angle = int(label)
        boxes = get_mask_layout(rotate_image(tile_img, angle), contents)
        if boxes:
            return boxes, angle
    return [], 0


def photo_mask(pk_phhd, contents):
    """Mask sensitive text on every type "3"/"4" photo of one case.

    For each photo the image is downloaded, split into tiles, recognised, and
    the matching regions are painted white. A modified photo is copied to the
    backup bucket and then uploaded over the original object.

    Args:
        pk_phhd: Primary key of the case whose photo records are processed.
        contents: Literal strings (e.g. name, ID number) to mask.

    Raises:
        RuntimeError: If a photo cannot be decoded into an image.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cfjaddress) \
            .filter(ZxPhrec.pk_phhd == pk_phhd) \
            .filter(ZxPhrec.cRectype.in_(["3", "4"])) \
            .all()
    finally:
        # Release the connection even if the query fails.
        session.close()

    for phrec in phrecs:
        img_url = ucloud.get_private_url(phrec.cfjaddress)
        if not img_url:
            continue

        image = open_image(img_url)
        if image is None:
            # Fail with a clear message instead of an AttributeError later on.
            raise RuntimeError(f"无法读取图片:{phrec.cfjaddress}")

        is_masked = False
        for tile in split_image(image):
            boxes, angle = _find_mask_boxes(tile["img"], contents)
            if not boxes:
                continue
            is_masked = True
            height, width = tile["img"].shape[:2]
            center = (width / 2, height / 2)
            for box in boxes:
                # Rotated-tile coordinates -> tile coordinates -> full image.
                x1, y1, x2, y2 = rotate_rectangle(box, center, angle)
                cv2.rectangle(
                    image,
                    (int(x1 + tile["x_offset"]), int(y1 + tile["y_offset"])),
                    (int(x2 + tile["x_offset"]), int(y2 + tile["y_offset"])),
                    (255, 255, 255), -1, 0,
                )

        if not is_masked:
            continue

        # Back up the original before overwriting it (up to 3 attempts).
        backed_up = any(
            ucloud.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
            for _ in range(3)
        )
        if not backed_up:
            # NOTE(review): the upload below still proceeds, as before;
            # decide whether overwriting without a backup is acceptable.
            logging.error(f"备份原图失败:{phrec.cfjaddress}")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            temp_path = temp_file.name
        try:
            cv2.imwrite(temp_path, image)
            uploaded = any(
                ucloud.upload_file(phrec.cfjaddress, temp_path) for _ in range(3)
            )
            if not uploaded:
                logging.error(f"上传涂抹后的图片失败:{phrec.cfjaddress}")
        except Exception as e:
            logging.error("上传图片出错", exc_info=e)
        finally:
            try:
                os.remove(temp_path)
            except Exception as e:
                logging.info(f"删除临时文件 {temp_path} 时出错", exc_info=e)
if __name__ == '__main__':
    # Polling loop: claim a batch of cases flagged "1", mask their photos one
    # by one, and sleep when there is nothing to do. Any unexpected error
    # stops the loop and is reported by e-mail.
    logging.config.dictConfig(LOGGING_CONFIG)
    try:
        while True:
            session = MysqlSession()
            try:
                phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
                    ZxPhhd.paint_flag == "1"
                ).limit(PHHD_BATCH_SIZE).all()
                if phhds:
                    # Mark the claimed cases as "masking in progress" ("2").
                    # Skipped for an empty batch so idle polls do not issue
                    # an UPDATE with an empty IN list.
                    pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
                    update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2"))
                    session.execute(update_flag)
                    session.commit()
            finally:
                session.close()

            if not phhds:
                # Nothing to do: wait before polling again.
                log = logging.getLogger()
                log.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
                sleep(SLEEP_MINUTES * 60)
                continue

            for phhd in phhds:
                pk_phhd = phhd.pk_phhd
                logging.info(f"开始涂抹:{pk_phhd}")
                photo_mask(pk_phhd, [phhd.cXm, phhd.cSfzh])

                # Masking finished for this case: set the flag to "8".
                session = MysqlSession()
                try:
                    update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8"))
                    session.execute(update_flag)
                    session.commit()
                finally:
                    session.close()
    except Exception as e:
        logging.error(traceback.format_exc())
        send_an_error_email(program_name='照片涂抹脚本', error_name=repr(e), error_detail=traceback.format_exc())