443
photo_mask.py
443
photo_mask.py
@@ -1,17 +1,448 @@
|
||||
import logging.config
|
||||
import math
|
||||
import os
|
||||
import tempfile
|
||||
import traceback
|
||||
import urllib.request
|
||||
from time import sleep
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import paddleclas
|
||||
from paddlenlp.utils.doc_parser import DocParser
|
||||
from sqlalchemy import update
|
||||
|
||||
from auto_email.error_email import send_error_email
|
||||
from db import MysqlSession
|
||||
from db.mysql import ZxPhrec, ZxPhhd
|
||||
from log import LOGGING_CONFIG
|
||||
from photo_mask import photo_mask, SEND_ERROR_EMAIL
|
||||
from photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES
|
||||
from ucloud import BUCKET, ucloud
|
||||
|
||||
DOC_PARSER = DocParser(use_gpu=True, device_id=1)
|
||||
|
||||
|
||||
def open_image(img_path):
|
||||
if img_path.startswith("http"):
|
||||
# 发送HTTP请求并获取图像数据
|
||||
resp = urllib.request.urlopen(img_path)
|
||||
# 将数据读取为字节流
|
||||
image_data = resp.read()
|
||||
# 将字节流转换为NumPy数组
|
||||
image_np = np.frombuffer(image_data, np.uint8)
|
||||
# 解码NumPy数组为OpenCV图像格式
|
||||
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
|
||||
else:
|
||||
image = cv2.imread(img_path)
|
||||
return image
|
||||
|
||||
|
||||
def split_image(img, max_ratio=1.41, best_ration=1.41, overlap=0.05):
|
||||
split_result = []
|
||||
# 获取图片的宽度和高度
|
||||
height, width = img.shape[:2]
|
||||
# 计算宽高比
|
||||
ratio = max(width, height) / min(width, height)
|
||||
# 检查是否需要裁剪
|
||||
if ratio > max_ratio:
|
||||
# 确定裁剪的尺寸,保持长宽比,以较短边为基准
|
||||
new_ratio = best_ration - overlap
|
||||
if width < height:
|
||||
# 高度是较长边
|
||||
cropped_width = width * best_ration
|
||||
for i in range(math.ceil(height / (width * new_ratio))):
|
||||
offset = round(width * new_ratio * i)
|
||||
# 参数形式为[y1:y2, x1:x2]
|
||||
cropped_img = img[offset:round(offset + cropped_width), 0:width]
|
||||
split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset})
|
||||
# 最后一次裁剪时不足的部分填充黑色
|
||||
last_img = split_result[-1]["img"]
|
||||
split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, round(cropped_width - last_img.shape[0]), 0, 0,
|
||||
cv2.BORDER_CONSTANT, value=(0, 0, 0))
|
||||
else:
|
||||
# 宽度是较长边
|
||||
cropped_height = height * best_ration
|
||||
for i in range(math.ceil(width / (height * new_ratio))):
|
||||
offset = round(height * new_ratio * i)
|
||||
cropped_img = img[0:height, offset:round(offset + cropped_height)]
|
||||
split_result.append({"img": cropped_img, "x_offset": offset, "y_offset": 0})
|
||||
# 最后一次裁剪时不足的部分填充黑色
|
||||
last_img = split_result[-1]["img"]
|
||||
split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, 0, 0, round(cropped_height - last_img.shape[1]),
|
||||
cv2.BORDER_CONSTANT, value=(0, 0, 0))
|
||||
else:
|
||||
split_result.append({"img": img, "x_offset": 0, "y_offset": 0})
|
||||
return split_result
|
||||
|
||||
|
||||
def capture_image(img, layout):
|
||||
x1, y1, x2, y2 = layout
|
||||
return img[int(y1):int(y2), int(x1):int(x2)]
|
||||
|
||||
|
||||
# 获取图片旋转角度
|
||||
def get_image_rotation_angles(img):
|
||||
angles = ['0', '90']
|
||||
model = paddleclas.PaddleClas(model_name="text_image_orientation")
|
||||
result = model.predict(input_data=img)
|
||||
try:
|
||||
result = next(result)[0]
|
||||
if result["scores"][0] < 0.5:
|
||||
return angles
|
||||
angles = result["label_names"]
|
||||
except Exception as e:
|
||||
logging.error("获取图片旋转角度失败", exc_info=e)
|
||||
return angles
|
||||
|
||||
|
||||
def rotate_image(img, angle):
|
||||
if angle == 0:
|
||||
return img
|
||||
height, width, _ = img.shape
|
||||
if angle == 180:
|
||||
new_width = width
|
||||
new_height = height
|
||||
else:
|
||||
new_width = height
|
||||
new_height = width
|
||||
# 绕图像的中心旋转
|
||||
# 参数:旋转中心 旋转度数 scale
|
||||
matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1)
|
||||
# 旋转后平移
|
||||
matrix[0, 2] += (new_width - width) / 2
|
||||
matrix[1, 2] += (new_height - height) / 2
|
||||
# 参数:原始图像 旋转参数 元素图像宽高
|
||||
rotated = cv2.warpAffine(img, matrix, (new_width, new_height))
|
||||
return rotated
|
||||
|
||||
|
||||
def rotate_rectangle(rectangle, center, angle):
|
||||
def rotate_point(pt, angle, center):
|
||||
matrix = cv2.getRotationMatrix2D(center, angle, 1)
|
||||
if angle != 180:
|
||||
# 旋转后平移
|
||||
matrix[0, 2] += center[1] - center[0]
|
||||
matrix[1, 2] += center[0] - center[1]
|
||||
|
||||
reverse_matrix = cv2.invertAffineTransform(matrix)
|
||||
|
||||
pt = np.array([[pt[0]], [pt[1]], [1]])
|
||||
return np.dot(reverse_matrix, pt)
|
||||
|
||||
if angle == 0:
|
||||
return list(rectangle)
|
||||
|
||||
x1, y1, x2, y2 = rectangle
|
||||
|
||||
# 计算矩形的四个顶点
|
||||
top_left = (x1, y1)
|
||||
bot_left = (x1, y2)
|
||||
top_right = (x2, y1)
|
||||
bot_right = (x2, y2)
|
||||
|
||||
# 旋转矩形的四个顶点
|
||||
rot_top_left = rotate_point(top_left, angle, center).astype(int)
|
||||
rot_bot_left = rotate_point(bot_left, angle, center).astype(int)
|
||||
rot_bot_right = rotate_point(bot_right, angle, center).astype(int)
|
||||
rot_top_right = rotate_point(top_right, angle, center).astype(int)
|
||||
|
||||
# 找出旋转后矩形的新左上角和右下角坐标
|
||||
new_top_left = (min(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]),
|
||||
min(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1]))
|
||||
new_bot_right = (max(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]),
|
||||
max(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1]))
|
||||
|
||||
return [new_top_left[0], new_top_left[1], new_bot_right[0], new_bot_right[1]]
|
||||
|
||||
|
||||
def get_ocr_layout(ocr, img_path):
|
||||
def _get_box(old_box):
|
||||
new_box = [
|
||||
min(old_box[0][0], old_box[3][0]), # x1
|
||||
min(old_box[0][1], old_box[1][1]), # y1
|
||||
max(old_box[1][0], old_box[2][0]), # x2
|
||||
max(old_box[2][1], old_box[3][1]), # y2
|
||||
]
|
||||
return new_box
|
||||
|
||||
def _normal_box(box_data):
|
||||
# Ensure the height and width of bbox are greater than zero
|
||||
if box_data[3] - box_data[1] < 0 or box_data[2] - box_data[0] < 0:
|
||||
return False
|
||||
return True
|
||||
|
||||
layout = []
|
||||
ocr_result = ocr.ocr(img_path, cls=False)
|
||||
ocr_result = ocr_result[0]
|
||||
if not ocr_result:
|
||||
return layout
|
||||
for segment in ocr_result:
|
||||
box = segment[0]
|
||||
box = _get_box(box)
|
||||
if not _normal_box(box):
|
||||
continue
|
||||
text = segment[1][0]
|
||||
layout.append((box, text))
|
||||
return layout
|
||||
|
||||
|
||||
def zoom_box(box, ratio):
|
||||
x1, y1, x2, y2 = box
|
||||
x1 = round(x1 - x1 * ratio)
|
||||
y1 = round(y1 - y1 * ratio)
|
||||
x2 = round(x2 + x2 * ratio)
|
||||
y2 = round(y2 + y2 * ratio)
|
||||
return [x1, y1, x2, y2]
|
||||
|
||||
|
||||
def find_box_of_content(content, layout, img_path, improve=True):
|
||||
full_box = layout[0]
|
||||
x_len = full_box[2] - full_box[0]
|
||||
y_len = full_box[3] - full_box[1]
|
||||
if x_len >= y_len:
|
||||
# 横向排布
|
||||
box_len = x_len
|
||||
direction = "x"
|
||||
else:
|
||||
# 纵向排布
|
||||
box_len = y_len
|
||||
direction = "y"
|
||||
text = layout[1]
|
||||
text_len = len(text)
|
||||
char_len = box_len / text_len
|
||||
index = text.index(content)
|
||||
|
||||
if direction == "x":
|
||||
# 横向排布
|
||||
box = [
|
||||
full_box[0] + index * char_len,
|
||||
full_box[1],
|
||||
full_box[0] + (index + len(content) + 1) * char_len,
|
||||
full_box[3],
|
||||
]
|
||||
is_abnormal = box[2] - box[0] < (box[3] - box[1]) * len(content) / 2
|
||||
else:
|
||||
# 纵向排布
|
||||
box = [
|
||||
full_box[0],
|
||||
full_box[1] + index * char_len,
|
||||
full_box[2],
|
||||
full_box[1] + (index + len(content) + 1) * char_len,
|
||||
]
|
||||
is_abnormal = box[3] - box[1] < (box[2] - box[0]) * len(content) / 2
|
||||
|
||||
if is_abnormal and improve:
|
||||
# 比例异常,再次识别
|
||||
image = cv2.imread(img_path)
|
||||
# 截图时偏大一点
|
||||
capture_box = zoom_box(box, 0.2)
|
||||
captured_image = capture_image(image, capture_box)
|
||||
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
|
||||
cv2.imwrite(temp_file.name, captured_image)
|
||||
try:
|
||||
layouts = DOC_PARSER.parse({"doc": temp_file.name})["layout"]
|
||||
except TypeError:
|
||||
# 如果是类型错误,大概率是没识别到文字
|
||||
layouts = []
|
||||
except Exception as e:
|
||||
# 如果出现其他错误,抛出
|
||||
raise e
|
||||
for layout in layouts:
|
||||
if content in layout[1]:
|
||||
temp_box = find_box_of_content(content, layout, temp_file.name, False)
|
||||
if temp_box:
|
||||
box = [
|
||||
temp_box[0] + capture_box[0],
|
||||
temp_box[1] + capture_box[1],
|
||||
temp_box[2] + capture_box[0],
|
||||
temp_box[3] + capture_box[1],
|
||||
]
|
||||
break
|
||||
try:
|
||||
os.remove(temp_file.name)
|
||||
except Exception as e:
|
||||
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
|
||||
return box
|
||||
|
||||
|
||||
def find_box_of_value(key, layout, length):
|
||||
full_box = layout[0]
|
||||
x_len = full_box[2] - full_box[0]
|
||||
y_len = full_box[3] - full_box[1]
|
||||
if x_len >= y_len:
|
||||
# 横向排布
|
||||
box_len = x_len
|
||||
direction = "x"
|
||||
else:
|
||||
# 纵向排布
|
||||
box_len = y_len
|
||||
direction = "y"
|
||||
text = layout[1]
|
||||
text_len = len(text)
|
||||
char_len = box_len / text_len
|
||||
index = text.index(key)
|
||||
|
||||
if direction == "x":
|
||||
# 横向排布
|
||||
return (
|
||||
full_box[0] + (index + len(key)) * char_len,
|
||||
full_box[1],
|
||||
full_box[0] + (index + len(key) + length) * char_len,
|
||||
full_box[3],
|
||||
)
|
||||
else:
|
||||
# 纵向排布
|
||||
return (
|
||||
full_box[0],
|
||||
full_box[1] + (index + len(key)) * char_len,
|
||||
full_box[2],
|
||||
full_box[1] + (index + len(key) + length) * char_len,
|
||||
)
|
||||
|
||||
|
||||
def get_mask_layout(image, contents):
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
||||
cv2.imwrite(temp_file.name, image)
|
||||
|
||||
result = []
|
||||
try:
|
||||
# layouts = get_ocr_layout(OCR, temp_file.name)
|
||||
try:
|
||||
layouts = DOC_PARSER.parse({"doc": temp_file.name}, True)["layout"]
|
||||
except TypeError as te:
|
||||
# 如果是类型错误,大概率是没识别到文字
|
||||
layouts = []
|
||||
except Exception as e:
|
||||
# 如果出现其他错误,抛出
|
||||
raise e
|
||||
if not layouts:
|
||||
# 无识别结果
|
||||
return result
|
||||
else:
|
||||
# 涂抹
|
||||
for layout in layouts:
|
||||
for content in contents:
|
||||
if content in layout[1]:
|
||||
result.append(find_box_of_content(content, layout, temp_file.name))
|
||||
if "姓名" in layout[1]:
|
||||
result.append(find_box_of_value("姓名", layout, 4))
|
||||
if "交款人" in layout[1]:
|
||||
result.append(find_box_of_value("交款人", layout, 4))
|
||||
if "文款人" in layout[1]:
|
||||
result.append(find_box_of_value("文款人", layout, 4))
|
||||
if "购买方名称" in layout[1]:
|
||||
result.append(find_box_of_value("购买方名称", layout, 4))
|
||||
if "身份证号" in layout[1]:
|
||||
result.append(find_box_of_value("身份证号", layout, 19))
|
||||
return result
|
||||
except Exception as e:
|
||||
logging.error("涂抹时出错", exc_info=e)
|
||||
finally:
|
||||
try:
|
||||
os.remove(temp_file.name)
|
||||
except Exception as e:
|
||||
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
|
||||
|
||||
|
||||
def photo_mask(pk_phhd, contents):
|
||||
session = MysqlSession()
|
||||
phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cfjaddress) \
|
||||
.filter(ZxPhrec.pk_phhd == pk_phhd) \
|
||||
.filter(ZxPhrec.cRectype.in_(["3", "4"])) \
|
||||
.all()
|
||||
session.close()
|
||||
for phrec in phrecs:
|
||||
img_url = ucloud.get_private_url(phrec.cfjaddress)
|
||||
if not img_url:
|
||||
continue
|
||||
# 是否有涂抹
|
||||
is_masked = False
|
||||
# 打开图片
|
||||
image = open_image(img_url)
|
||||
split_result = split_image(image)
|
||||
for img in split_result:
|
||||
angles = get_image_rotation_angles(img["img"])
|
||||
angle = int(angles[0])
|
||||
rotated_img = rotate_image(img["img"], angle)
|
||||
results = get_mask_layout(rotated_img, contents)
|
||||
if not results:
|
||||
angle = int(angles[1])
|
||||
rotated_img = rotate_image(img["img"], angle)
|
||||
results = get_mask_layout(rotated_img, contents)
|
||||
if not results and "0" not in angles:
|
||||
angle = 0
|
||||
results = get_mask_layout(img["img"], contents)
|
||||
|
||||
if results:
|
||||
is_masked = True
|
||||
|
||||
for result in results:
|
||||
height, width = img["img"].shape[:2]
|
||||
center = (width / 2, height / 2)
|
||||
result = rotate_rectangle(result, center, angle)
|
||||
result = (
|
||||
result[0] + img["x_offset"],
|
||||
result[1] + img["y_offset"],
|
||||
result[2] + img["x_offset"],
|
||||
result[3] + img["y_offset"],
|
||||
)
|
||||
cv2.rectangle(image, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])),
|
||||
(255, 255, 255), -1, 0)
|
||||
|
||||
# 如果涂抹了要备份以及更新
|
||||
if is_masked:
|
||||
for i in range(3):
|
||||
is_copy_success = ucloud.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
|
||||
if is_copy_success:
|
||||
break
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
||||
cv2.imwrite(temp_file.name, image)
|
||||
try:
|
||||
for i in range(3):
|
||||
is_upload_success = ucloud.upload_file(phrec.cfjaddress, temp_file.name)
|
||||
if is_upload_success:
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error("上传图片出错", exc_info=e)
|
||||
finally:
|
||||
try:
|
||||
os.remove(temp_file.name)
|
||||
except Exception as e:
|
||||
logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
program_name = "自动照片涂抹脱敏脚本"
|
||||
logging.config.dictConfig(LOGGING_CONFIG)
|
||||
try:
|
||||
logging.info(f"【{program_name}】开始运行")
|
||||
photo_mask.main()
|
||||
while 1:
|
||||
session = MysqlSession()
|
||||
phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
|
||||
ZxPhhd.paint_flag == "1"
|
||||
).limit(PHHD_BATCH_SIZE).all()
|
||||
# 将状态改为正在涂抹中
|
||||
pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
|
||||
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2"))
|
||||
session.execute(update_flag)
|
||||
session.commit()
|
||||
session.close()
|
||||
if phhds:
|
||||
for phhd in phhds:
|
||||
pk_phhd = phhd.pk_phhd
|
||||
logging.info(f"开始涂抹:{pk_phhd}")
|
||||
photo_mask(pk_phhd, [phhd.cXm, phhd.cSfzh])
|
||||
|
||||
# 识别完成更新标识
|
||||
session = MysqlSession()
|
||||
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8"))
|
||||
session.execute(update_flag)
|
||||
session.commit()
|
||||
session.close()
|
||||
else:
|
||||
# 没有查询到新案子,等待一段时间后再查
|
||||
log = logging.getLogger()
|
||||
log.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
|
||||
sleep(SLEEP_MINUTES * 60)
|
||||
except Exception as e:
|
||||
logging.error(traceback.format_exc())
|
||||
if SEND_ERROR_EMAIL:
|
||||
send_error_email(program_name=program_name, error_name=repr(e), error_detail=traceback.format_exc())
|
||||
send_error_email(program_name='照片涂抹脚本', error_name=repr(e), error_detail=traceback.format_exc())
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
from paddleocr import PaddleOCR
|
||||
|
||||
"""
|
||||
项目配置
|
||||
"""
|
||||
# 每次从数据库获取的案子数量
|
||||
PHHD_BATCH_SIZE = 20
|
||||
# 没有查询到案子的等待时间(分钟)
|
||||
SLEEP_MINUTES = 5
|
||||
# 是否发送异常提醒邮件
|
||||
SEND_ERROR_EMAIL = True
|
||||
# 备份原图的尝试次数
|
||||
COPY_TRY_TIMES = 3
|
||||
# 上传新图的尝试次数
|
||||
UPLOAD_TRY_TIMES = 3
|
||||
|
||||
"""
|
||||
关键词配置
|
||||
"""
|
||||
NAME_KEYS = [
|
||||
{"key": "姓名", "length": 4},
|
||||
{"key": "交款人", "length": 4},
|
||||
{"key": "文款人", "length": 4},
|
||||
{"key": "购买方名称", "length": 4},
|
||||
]
|
||||
ID_CARD_NUM_KEYS = [{"key": "身份证号", "length": 19}, ]
|
||||
|
||||
OCR = PaddleOCR(use_angle_cls=False, show_log=False, gpu_id=1)
|
||||
@@ -1,224 +0,0 @@
|
||||
import logging.config
|
||||
import tempfile
|
||||
from time import sleep
|
||||
|
||||
import cv2
|
||||
from sqlalchemy import update, and_
|
||||
|
||||
from db import MysqlSession
|
||||
from db.mysql import ZxPhrec, ZxPhhd
|
||||
from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, COPY_TRY_TIMES, UPLOAD_TRY_TIMES, NAME_KEYS, \
|
||||
ID_CARD_NUM_KEYS
|
||||
from ucloud import BUCKET, ucloud
|
||||
from util import image_util, util
|
||||
|
||||
|
||||
def find_box(content, layout, offset=0, length=None, improve=False, image_path=None):
|
||||
full_box = layout[0]
|
||||
x_len = full_box[2] - full_box[0]
|
||||
y_len = full_box[3] - full_box[1]
|
||||
if x_len >= y_len:
|
||||
# 横向排布
|
||||
box_len = x_len
|
||||
direction = "x"
|
||||
else:
|
||||
# 纵向排布
|
||||
box_len = y_len
|
||||
direction = "y"
|
||||
text = layout[1]
|
||||
text_len = len(text)
|
||||
char_len = box_len / text_len
|
||||
index = text.index(content)
|
||||
|
||||
if not length:
|
||||
length = len(content) + 1
|
||||
if direction == "x":
|
||||
# 横向排布
|
||||
box = [
|
||||
full_box[0] + (index + offset) * char_len,
|
||||
full_box[1],
|
||||
full_box[0] + (index + offset + length) * char_len,
|
||||
full_box[3],
|
||||
]
|
||||
else:
|
||||
# 纵向排布
|
||||
box = [
|
||||
full_box[0],
|
||||
full_box[1] + (index + offset) * char_len,
|
||||
full_box[2],
|
||||
full_box[1] + (index + offset + length) * char_len,
|
||||
]
|
||||
|
||||
if improve:
|
||||
# 再次识别,提高精度
|
||||
image = cv2.imread(image_path)
|
||||
# 截图时偏大一点
|
||||
capture_box = util.zoom_rectangle(box, 0.2)
|
||||
captured_image = image_util.capture(image, capture_box)
|
||||
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
|
||||
captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image, True)
|
||||
cv2.imwrite(temp_file.name, captured_image)
|
||||
try:
|
||||
layouts = util.get_ocr_layout(OCR, temp_file.name)
|
||||
except TypeError:
|
||||
# 如果是类型错误,大概率是没识别到文字
|
||||
layouts = []
|
||||
except Exception as e:
|
||||
# 如果出现其他错误,抛出
|
||||
raise e
|
||||
for layout in layouts:
|
||||
if content in layout[1]:
|
||||
temp_box = find_box(content, layout)
|
||||
if temp_box:
|
||||
box = [
|
||||
temp_box[0] + capture_box[0] - offset_x,
|
||||
temp_box[1] + capture_box[1] - offset_y,
|
||||
temp_box[2] + capture_box[0] - offset_x,
|
||||
temp_box[3] + capture_box[1] - offset_y,
|
||||
]
|
||||
break
|
||||
util.delete_temp_file(temp_file.name)
|
||||
return box
|
||||
|
||||
|
||||
def get_mask_layout(image, name, id_card_num):
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
||||
cv2.imwrite(temp_file.name, image)
|
||||
|
||||
result = []
|
||||
try:
|
||||
try:
|
||||
layouts = util.get_ocr_layout(OCR, temp_file.name)
|
||||
except TypeError:
|
||||
# 如果是类型错误,大概率是没识别到文字
|
||||
layouts = []
|
||||
except Exception as e:
|
||||
# 如果出现其他错误,抛出
|
||||
raise e
|
||||
|
||||
if not layouts:
|
||||
# 无识别结果
|
||||
return result
|
||||
else:
|
||||
# 涂抹
|
||||
for layout in layouts:
|
||||
find_name_by_key = True
|
||||
find_id_card_num_by_key = True
|
||||
if name in layout[1]:
|
||||
result.append(find_box(name, layout, improve=True, image_path=temp_file.name))
|
||||
find_name_by_key = False
|
||||
if id_card_num in layout[1]:
|
||||
result.append(find_box(id_card_num, layout, improve=True, image_path=temp_file.name))
|
||||
find_id_card_num_by_key = False
|
||||
|
||||
keys = []
|
||||
if find_name_by_key:
|
||||
keys += NAME_KEYS
|
||||
if find_id_card_num_by_key:
|
||||
keys += ID_CARD_NUM_KEYS
|
||||
for key in keys:
|
||||
if key["key"] in layout[1]:
|
||||
result.append(find_box(key["key"], layout, offset=len(key["key"]), length=key["length"]))
|
||||
return result
|
||||
except Exception as e:
|
||||
logging.error("涂抹时出错!", exc_info=e)
|
||||
finally:
|
||||
util.delete_temp_file(temp_file.name)
|
||||
|
||||
|
||||
def photo_mask(pk_phhd, name, id_card_num):
|
||||
session = MysqlSession()
|
||||
phrecs = session.query(ZxPhrec.cfjaddress).filter(and_(
|
||||
ZxPhrec.pk_phhd == pk_phhd,
|
||||
ZxPhrec.cRectype.in_(["3", "4"])
|
||||
)).all()
|
||||
session.close()
|
||||
for phrec in phrecs:
|
||||
img_url = ucloud.get_private_url(phrec.cfjaddress)
|
||||
if not img_url:
|
||||
continue
|
||||
# 是否有涂抹
|
||||
is_masked = False
|
||||
# 打开图片
|
||||
image = image_util.read(img_url)
|
||||
split_results = image_util.split(image)
|
||||
for split_result in split_results:
|
||||
angles = image_util.parse_rotation_angles(split_result["img"])
|
||||
angle = int(angles[0])
|
||||
rotated_img = image_util.rotate(split_result["img"], angle)
|
||||
rotated_img, offset_x, offset_y = image_util.expand_to_a4_size(rotated_img, True)
|
||||
split_result["x_offset"] -= offset_x
|
||||
split_result["y_offset"] -= offset_y
|
||||
results = get_mask_layout(rotated_img, name, id_card_num)
|
||||
if not results:
|
||||
angle = int(angles[1])
|
||||
rotated_img = image_util.rotate(split_result["img"], angle)
|
||||
results = get_mask_layout(rotated_img, name, id_card_num)
|
||||
if not results and "0" not in angles:
|
||||
angle = 0
|
||||
results = get_mask_layout(split_result["img"], name, id_card_num)
|
||||
|
||||
if results:
|
||||
is_masked = True
|
||||
|
||||
for result in results:
|
||||
height, width = split_result["img"].shape[:2]
|
||||
center = (width / 2, height / 2)
|
||||
result = image_util.invert_rotate_rectangle(result, center, angle)
|
||||
result = (
|
||||
result[0] + split_result["x_offset"],
|
||||
result[1] + split_result["y_offset"],
|
||||
result[2] + split_result["x_offset"],
|
||||
result[3] + split_result["y_offset"],
|
||||
)
|
||||
cv2.rectangle(image, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])),
|
||||
(255, 255, 255), -1, 0)
|
||||
|
||||
# 如果涂抹了要备份以及更新
|
||||
if is_masked:
|
||||
for i in range(COPY_TRY_TIMES):
|
||||
is_copy_success = ucloud.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
|
||||
if is_copy_success:
|
||||
break
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
||||
cv2.imwrite(temp_file.name, image)
|
||||
try:
|
||||
for i in range(UPLOAD_TRY_TIMES):
|
||||
is_upload_success = ucloud.upload_file(phrec.cfjaddress, temp_file.name)
|
||||
if is_upload_success:
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error("上传图片出错", exc_info=e)
|
||||
finally:
|
||||
util.delete_temp_file(temp_file.name)
|
||||
|
||||
|
||||
def main():
|
||||
while 1:
|
||||
session = MysqlSession()
|
||||
phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
|
||||
ZxPhhd.paint_flag == "1"
|
||||
).limit(PHHD_BATCH_SIZE).all()
|
||||
# 将状态改为正在涂抹中
|
||||
pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
|
||||
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2"))
|
||||
session.execute(update_flag)
|
||||
session.commit()
|
||||
session.close()
|
||||
if phhds:
|
||||
for phhd in phhds:
|
||||
pk_phhd = phhd.pk_phhd
|
||||
logging.info(f"开始涂抹:{pk_phhd}")
|
||||
photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)
|
||||
|
||||
# 识别完成更新标识
|
||||
session = MysqlSession()
|
||||
update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8"))
|
||||
session.execute(update_flag)
|
||||
session.commit()
|
||||
session.close()
|
||||
else:
|
||||
# 没有查询到新案子,等待一段时间后再查
|
||||
logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
|
||||
sleep(SLEEP_MINUTES * 60)
|
||||
@@ -5,5 +5,3 @@ pymysql==1.1.0
|
||||
sqlacodegen==2.3.0.post1
|
||||
sqlalchemy==1.4.52
|
||||
ufile==3.2.9
|
||||
opencv-python==4.6.0.66
|
||||
numpy==1.26.2
|
||||
@@ -1,202 +0,0 @@
|
||||
import logging
|
||||
import math
|
||||
import urllib.request
|
||||
|
||||
import cv2
|
||||
import numpy
|
||||
from paddleclas import PaddleClas
|
||||
|
||||
|
||||
def read(image_path):
|
||||
"""
|
||||
从网络或本地读取图片
|
||||
:param image_path: 网络或本地路径
|
||||
:return: NumPy数组形式的图片
|
||||
"""
|
||||
if image_path.startswith("http"):
|
||||
# 发送HTTP请求并获取图像数据
|
||||
resp = urllib.request.urlopen(image_path)
|
||||
# 将数据读取为字节流
|
||||
image_data = resp.read()
|
||||
# 将字节流转换为NumPy数组
|
||||
image_np = numpy.frombuffer(image_data, numpy.uint8)
|
||||
# 解码NumPy数组为OpenCV图像格式
|
||||
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
|
||||
else:
|
||||
image = cv2.imread(image_path)
|
||||
return image
|
||||
|
||||
|
||||
def capture(image, rectangle):
|
||||
"""
|
||||
截取图片
|
||||
:param image: 图片NumPy数组
|
||||
:param rectangle: 要截取的矩形
|
||||
:return: 截取之后的图片NumPy
|
||||
"""
|
||||
x1, y1, x2, y2 = rectangle
|
||||
return image[int(y1):int(y2), int(x1):int(x2)]
|
||||
|
||||
|
||||
def split(image, ratio=1.414, overlap=0.05):
|
||||
"""
|
||||
分割图片,只分割过长的图片,暂不处理过宽的图片
|
||||
:param image:图片,可以是NumPy数组或文件路径
|
||||
:param ratio: 分割后的比例
|
||||
:param overlap: 图片之间的覆盖比例
|
||||
:return: 分割后的图片组(NumPy数组形式)
|
||||
"""
|
||||
split_result = []
|
||||
if isinstance(image, str):
|
||||
image = read(image)
|
||||
# 获取图片的宽度和高度
|
||||
height, width = image.shape[:2]
|
||||
# 计算宽高比
|
||||
img_ratio = height / width
|
||||
# 检查是否需要裁剪
|
||||
if img_ratio > ratio:
|
||||
split_ratio = ratio - overlap
|
||||
# 分割后的高度
|
||||
new_img_height = width * ratio
|
||||
for i in range(math.ceil(height / (width * split_ratio))):
|
||||
offset = round(width * split_ratio * i)
|
||||
# 参数形式为[y1:y2, x1:x2]
|
||||
cropped_img = capture(image, [0, offset, width, offset + new_img_height])
|
||||
split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset})
|
||||
else:
|
||||
split_result.append({"img": image, "x_offset": 0, "y_offset": 0})
|
||||
return split_result
|
||||
|
||||
|
||||
def parse_rotation_angles(image):
|
||||
"""
|
||||
判断图片旋转角度,逆时针旋转该角度后为正。可能值["0", "90", "180", "270"]
|
||||
:param image: 图片NumPy数组或文件路径
|
||||
:return: 最有可能的两个角度
|
||||
"""
|
||||
angles = ['0', '90']
|
||||
model = PaddleClas(model_name="text_image_orientation")
|
||||
clas_result = model.predict(input_data=image)
|
||||
try:
|
||||
clas_result = next(clas_result)[0]
|
||||
if clas_result["scores"][0] < 0.5:
|
||||
return angles
|
||||
angles = clas_result["label_names"]
|
||||
except Exception as e:
|
||||
logging.error("获取图片旋转角度失败", exc_info=e)
|
||||
return angles
|
||||
|
||||
|
||||
def rotate(image, angle):
|
||||
"""
|
||||
旋转图片
|
||||
:param image: 图片NumPy数组
|
||||
:param angle: 逆时针旋转角度
|
||||
:return: 旋转后的图片NumPy数组
|
||||
"""
|
||||
if angle == 0:
|
||||
return image
|
||||
height, width = image.shape[:2]
|
||||
if angle == 180:
|
||||
new_width = width
|
||||
new_height = height
|
||||
else:
|
||||
new_width = height
|
||||
new_height = width
|
||||
# 绕图像的中心旋转
|
||||
# 参数:旋转中心 旋转度数 scale
|
||||
matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1)
|
||||
# 旋转后平移
|
||||
matrix[0, 2] += (new_width - width) / 2
|
||||
matrix[1, 2] += (new_height - height) / 2
|
||||
# 参数:原始图像 旋转参数 元素图像宽高
|
||||
rotated = cv2.warpAffine(image, matrix, (new_width, new_height))
|
||||
return rotated
|
||||
|
||||
|
||||
def invert_rotate_point(point, center, angle):
|
||||
"""
|
||||
反向旋转图片上的点
|
||||
:param point: 点
|
||||
:param center: 旋转中心
|
||||
:param angle: 旋转角度
|
||||
:return: 旋转后的点坐标
|
||||
"""
|
||||
matrix = cv2.getRotationMatrix2D(center, angle, 1)
|
||||
if angle != 180:
|
||||
# 旋转后平移
|
||||
matrix[0, 2] += center[1] - center[0]
|
||||
matrix[1, 2] += center[0] - center[1]
|
||||
|
||||
reverse_matrix = cv2.invertAffineTransform(matrix)
|
||||
|
||||
point = numpy.array([[point[0]], [point[1]], [1]])
|
||||
return numpy.dot(reverse_matrix, point)
|
||||
|
||||
|
||||
def invert_rotate_rectangle(rectangle, center, angle):
|
||||
"""
|
||||
反向旋转图片上的矩形
|
||||
:param rectangle: 矩形
|
||||
:param center: 旋转中心
|
||||
:param angle: 旋转角度
|
||||
:return: 旋转后的矩形坐标
|
||||
"""
|
||||
if angle == 0:
|
||||
return list(rectangle)
|
||||
|
||||
x1, y1, x2, y2 = rectangle
|
||||
|
||||
# 计算矩形的四个顶点
|
||||
top_left = (x1, y1)
|
||||
bot_left = (x1, y2)
|
||||
top_right = (x2, y1)
|
||||
bot_right = (x2, y2)
|
||||
|
||||
# 旋转矩形的四个顶点
|
||||
rot_top_left = invert_rotate_point(top_left, center, angle).astype(int)
|
||||
rot_bot_left = invert_rotate_point(bot_left, center, angle).astype(int)
|
||||
rot_bot_right = invert_rotate_point(bot_right, center, angle).astype(int)
|
||||
rot_top_right = invert_rotate_point(top_right, center, angle).astype(int)
|
||||
|
||||
# 找出旋转后矩形的新左上角和右下角坐标
|
||||
new_top_left = (min(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]),
|
||||
min(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1]))
|
||||
new_bot_right = (max(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]),
|
||||
max(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1]))
|
||||
|
||||
return [new_top_left[0], new_top_left[1], new_bot_right[0], new_bot_right[1]]
|
||||
|
||||
|
||||
def expand_to_a4_size(image, center=False):
|
||||
"""
|
||||
将图片扩充到a4大小
|
||||
:param image: 图片NumPy数组
|
||||
:param center: 是否将原图置于中间
|
||||
:return: 扩充后的图片NumPy数组和偏移量
|
||||
"""
|
||||
h, w = image.shape[:2]
|
||||
offset_x, offset_y = 0, 0
|
||||
if h * 1.0 / w >= 1.42:
|
||||
exp_w = int(h / 1.414 - w)
|
||||
if center:
|
||||
offset_x = int(exp_w / 2)
|
||||
exp_img = numpy.zeros((h, offset_x, 3), dtype="uint8")
|
||||
exp_img.fill(255)
|
||||
image = numpy.hstack([exp_img, image, exp_img])
|
||||
else:
|
||||
exp_img = numpy.zeros((h, exp_w, 3), dtype="uint8")
|
||||
exp_img.fill(255)
|
||||
image = numpy.hstack([image, exp_img])
|
||||
elif h * 1.0 / w <= 1.40:
|
||||
exp_h = int(w * 1.414 - h)
|
||||
if center:
|
||||
offset_y = int(exp_h / 2)
|
||||
exp_img = numpy.zeros((offset_y, w, 3), dtype="uint8")
|
||||
exp_img.fill(255)
|
||||
image = numpy.vstack([exp_img, image, exp_img])
|
||||
else:
|
||||
exp_img = numpy.zeros((exp_h, w, 3), dtype="uint8")
|
||||
exp_img.fill(255)
|
||||
image = numpy.vstack([image, exp_img])
|
||||
return image, offset_x, offset_y
|
||||
72
util/util.py
72
util/util.py
@@ -1,78 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
# 获取yyyy-MM-dd HH:mm:ss格式的当前时间
|
||||
def get_default_datetime():
|
||||
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
|
||||
def get_ocr_layout(ocr, img_path):
|
||||
"""
|
||||
获取ocr识别的结果,转为合适的layout形式
|
||||
:param ocr: ocr模型
|
||||
:param img_path: 图片本地路径
|
||||
:return:
|
||||
"""
|
||||
|
||||
def _get_box(old_box):
|
||||
new_box = [
|
||||
min(old_box[0][0], old_box[3][0]), # x1
|
||||
min(old_box[0][1], old_box[1][1]), # y1
|
||||
max(old_box[1][0], old_box[2][0]), # x2
|
||||
max(old_box[2][1], old_box[3][1]), # y2
|
||||
]
|
||||
return new_box
|
||||
|
||||
def _normal_box(box_data):
|
||||
# Ensure the height and width of bbox are greater than zero
|
||||
if box_data[3] - box_data[1] < 0 or box_data[2] - box_data[0] < 0:
|
||||
return False
|
||||
return True
|
||||
|
||||
layout = []
|
||||
ocr_result = ocr.ocr(img_path, cls=False)
|
||||
ocr_result = ocr_result[0]
|
||||
if not ocr_result:
|
||||
return layout
|
||||
for segment in ocr_result:
|
||||
box = segment[0]
|
||||
box = _get_box(box)
|
||||
if not _normal_box(box):
|
||||
continue
|
||||
text = segment[1][0]
|
||||
layout.append((box, text))
|
||||
return layout
|
||||
|
||||
|
||||
def delete_temp_file(temp_files):
|
||||
"""
|
||||
删除临时文件,可以批量
|
||||
:param temp_files: 临时文件路径
|
||||
"""
|
||||
if not temp_files:
|
||||
return
|
||||
if isinstance(temp_files, str):
|
||||
temp_files = [temp_files]
|
||||
for file in temp_files:
|
||||
try:
|
||||
os.remove(file)
|
||||
logging.info(f"临时文件 {file} 已删除")
|
||||
except Exception as e:
|
||||
logging.warning(f"删除临时文件 {file} 时出错: {e}")
|
||||
|
||||
|
||||
def zoom_rectangle(rectangle, ratio):
|
||||
"""
|
||||
缩放矩形
|
||||
:param rectangle: 原矩形坐标
|
||||
:param ratio: 缩放比率
|
||||
:return: 缩放后的矩形坐标
|
||||
"""
|
||||
x1, y1, x2, y2 = rectangle
|
||||
x1 = round(x1 - x1 * ratio)
|
||||
y1 = round(y1 - y1 * ratio)
|
||||
x2 = round(x2 + x2 * ratio)
|
||||
y2 = round(y2 + y2 * ratio)
|
||||
return [x1, y1, x2, y2]
|
||||
|
||||
Reference in New Issue
Block a user