Files
fcb_photo_review/photo_mask/auto_photo_mask.py

294 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging.config
import os
import re
import shutil
import time
import uuid
from time import sleep
import cv2
from sqlalchemy import update, and_
from db import MysqlSession
from db.mysql import ZxPhrec, ZxPhhd
from log import HOSTNAME
from photo_mask import PHHD_BATCH_SIZE, SLEEP_MINUTES, NAME_KEYS, ID_CARD_NUM_KEYS, SIMILAR_CHAR
from photo_review import set_batch_id
from ucloud import BUCKET, ufile
from util import image_util, common_util, model_util
def _estimate_box(content, layout, offset=0, length=None):
    """Estimate the rectangle covering ``content`` inside one OCR text line.

    The line's bounding box is divided evenly between its characters, so the
    result is only an approximation of where ``content`` really is.

    :param content: substring to locate; must occur in ``layout[1]``.
    :param layout: pair ``(box, text)`` where ``box`` is indexed as
        ``[x1, y1, x2, y2]`` and ``text`` is the recognised string.
    :param offset: number of characters to shift the start of the rectangle.
    :param length: number of characters to cover; a falsy value means
        ``len(content) + 1``.
    :return: ``[x1, y1, x2, y2]`` of the estimated rectangle.
    :raises ValueError: if ``content`` does not occur in the text.
    :raises ZeroDivisionError: if the text is empty.
    """
    full_box, text = layout[0], layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # The longer side of the box is taken as the writing direction.
    horizontal = x_len >= y_len
    char_len = (x_len if horizontal else y_len) / len(text)
    index = text.index(content)
    if not length:
        # Cover one extra character by default.
        length = len(content) + 1
    if horizontal:
        return [
            full_box[0] + (index + offset) * char_len,
            full_box[1],
            full_box[0] + (index + offset + length) * char_len,
            full_box[3],
        ]
    return [
        full_box[0],
        full_box[1] + (index + offset) * char_len,
        full_box[2],
        full_box[1] + (index + offset + length) * char_len,
    ]


def _refine_boxes(content, box, image_path, extra_content=None):
    """Re-run OCR on an enlarged crop around ``box`` to get tighter rectangles.

    :return: refined rectangles in the coordinates of the image at
        ``image_path``; an empty list when nothing could be refined.
    """
    # cv2.imread signals failure by returning None instead of raising.
    image = cv2.imread(image_path) if image_path else None
    if image is None:
        logging.warning("图片读取失败,跳过二次识别: %s", image_path)
        return []
    img_name, img_ext = common_util.parse_save_path(image_path)
    # Crop a slightly larger area than the estimated rectangle.
    capture_box = common_util.zoom_rectangle(box, 0.2)
    captured_image = image_util.capture(image, capture_box)
    captured_image_path = common_util.get_processed_img_path(f'{img_name}.capture.{img_ext}')
    cv2.imwrite(captured_image_path, captured_image)
    captured_a4_img_path, offset_x, offset_y = image_util.expand_to_a4_size(captured_image_path)
    try:
        layouts = common_util.ocr_result_to_layout(model_util.ocr(captured_a4_img_path))
    except TypeError:
        # A TypeError most likely means that no text was recognised.
        layouts = []

    refined = []
    for sub_layout in layouts:
        sub_text = sub_layout[1]
        candidates = re.findall(extra_content, sub_text) if extra_content else [content]
        for candidate in candidates:
            if candidate not in sub_text:
                continue
            temp_box = _estimate_box(candidate, sub_layout)
            # Translate from crop coordinates back to the source image.
            refined.append([
                temp_box[0] + capture_box[0] - offset_x,
                temp_box[1] + capture_box[1] - offset_y,
                temp_box[2] + capture_box[0] - offset_x,
                temp_box[3] + capture_box[1] - offset_y,
            ])
            # Only the first usable candidate of each line is kept.
            break
    return refined


def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None, extra_content=None):
    """Locate the rectangle(s) that cover ``content`` within an OCR text line.

    A coarse rectangle is always estimated from the line's bounding box. With
    ``improve=True`` the area around it is cropped from ``image_path`` and
    recognised again to obtain tighter rectangles; if that yields nothing (or
    the image cannot be read) the coarse rectangle is returned instead.

    :param content: substring to locate; must occur in ``layout[1]``.
    :param layout: pair ``(box, text)`` with ``box`` as ``[x1, y1, x2, y2]``.
    :param offset: number of characters to shift the start of the rectangle.
    :param length: number of characters to cover; a falsy value means
        ``len(content) + 1``.
    :param improve: whether to run the second, more precise recognition pass.
    :param image_path: image that ``layout`` was recognised from; needed for
        the refinement pass.
    :param extra_content: optional regular expression searched for in the
        re-recognised text instead of the literal ``content``.
    :return: non-empty list of ``[x1, y1, x2, y2]`` rectangles.
    :raises ValueError: if ``content`` does not occur in ``layout[1]``.
    """
    box = _estimate_box(content, layout, offset, length)
    boxes = _refine_boxes(content, box, image_path, extra_content) if improve else []
    if not boxes:
        boxes.append(box)
    return boxes
def _build_name_pattern(name):
    """Build the fuzzy regular expression used to spot ``name`` in OCR text.

    Each character of the name becomes one character class, extended with the
    look-alike characters listed for it in ``SIMILAR_CHAR``. The alternatives
    tolerate one wrong character at either end, one wrong character in the
    middle for odd-length names, and text following a "名:" label.

    :param name: non-empty name string.
    :return: the regular expression source.
    """
    re_list = []
    for char in name:
        # The name comes from data, so keep regex metacharacters literal.
        char_re = re.escape(char)
        if char in SIMILAR_CHAR:
            char_re += "|" + "|".join(SIMILAR_CHAR[char])
        re_list.append(char_re)
    name_len = len(name)
    name_offset = int(name_len / 2)
    if name_len > 2:
        r = f"[{']['.join(re_list[:-1])}].?|.?[{']['.join(re_list[1:])}]"
    else:
        r = f"[{']['.join(re_list)}]"
    if (name_len & 1) == 1:
        r += f"|[{']['.join(re_list[:name_offset])}].?[{']['.join(re_list[-name_offset:])}]"
    r += f"|名[:|](?=.*[{'|'.join(re_list)}]).{{{name_len}}}"
    return r


def _find_boxes_by_keys(layout, keys):
    """Locate rectangles next to label keywords found in one OCR text line.

    ``keys`` is a sequence of dicts with a regular expression under ``"key"``,
    the number of characters to cover under ``"length"`` and an optional
    ``"offset"`` (defaulting to the length of the matched label). Only the
    first key that matches anything is used.
    """
    boxes = []
    for key in keys:
        match_list = re.findall(key["key"], layout[1])
        for m in match_list:
            if m in layout[1]:
                boxes += find_boxes(m, layout, offset=key.get("offset", len(m)), length=key["length"])
        if match_list:
            break
    return boxes


def get_mask_layout(img_path, name, id_card_num):
    """Find the rectangles of an image that should be painted over.

    The image is recognised with OCR. In every text line the function looks
    for the name (fuzzily) and for the ID card number (literally). Whichever
    of the two is not found directly is searched for again through the label
    keywords in ``NAME_KEYS`` / ``ID_CARD_NUM_KEYS``.

    :param img_path: path of the image to analyse.
    :param name: name to hide; empty or ``None`` skips the direct name search.
    :param id_card_num: ID card number to hide; empty or ``None`` skips the
        direct number search.
    :return: list of ``[x1, y1, x2, y2]`` rectangles. Errors are logged and
        whatever was collected up to that point is returned.
    """
    result = []
    try:
        try:
            layouts = common_util.ocr_result_to_layout(model_util.ocr(img_path))
        except TypeError:
            # A TypeError most likely means that no text was recognised.
            layouts = []
        if not layouts:
            # Nothing was recognised.
            return result

        name_pattern = _build_name_pattern(name) if name else None
        for layout in layouts:
            text = layout[1]
            find_name_by_key = True
            find_id_card_num_by_key = True
            if name_pattern:
                matches = re.findall(name_pattern, text)
                if matches:
                    # Only the first hit is used to locate the area; the
                    # pattern is passed along for the refinement pass.
                    result += find_boxes(matches[0], layout, improve=True, image_path=img_path,
                                         extra_content=name_pattern)
                    find_name_by_key = False
            # An empty string is contained in every text, so it must not count
            # as a hit.
            if id_card_num and id_card_num in text:
                result += find_boxes(id_card_num, layout, improve=True, image_path=img_path)
                find_id_card_num_by_key = False
            if find_name_by_key:
                result += _find_boxes_by_keys(layout, NAME_KEYS)
            if find_id_card_num_by_key:
                result += _find_boxes_by_keys(layout, ID_CARD_NUM_KEYS)
        return result
    except Exception as e:
        logging.error("涂抹时出错!", exc_info=e)
        return result
def handle_image_for_mask(split_result):
    """Expand one split piece for recognition and correct its offsets.

    ``split_result`` is a dict with the keys ``"img"``, ``"x_offset"`` and
    ``"y_offset"``. The padding reported by ``image_util.expand_to_a4_size``
    is subtracted from the stored offsets (the dict is updated in place).

    :return: tuple ``(expanded image, x offset, y offset)``.
    """
    expand_img, pad_x, pad_y = image_util.expand_to_a4_size(split_result["img"])
    for offset_key, padding in (("x_offset", pad_x), ("y_offset", pad_y)):
        split_result[offset_key] -= padding
    return expand_img, split_result["x_offset"], split_result["y_offset"]
def mask_photo(img_path, name, id_card_num, color=(255, 255, 255)):
    """Paint over the name and ID card number found in an image.

    If nothing is painted on the first attempt, the image orientation is
    classified and, unless the reported angle is ``"0"``, the image is rotated
    and processed once more.

    :param img_path: path of the image to process.
    :param name: name to hide.
    :param id_card_num: ID card number to hide.
    :param color: fill colour handed to ``cv2.rectangle``.
    :return: tuple ``(is_masked, path)``; when rotation did not help either,
        ``path`` is the path that was passed in.
    """

    def _paint(path):
        # Paint every rectangle found in the image and save a ".mask" copy.
        canvas = cv2.imread(path)
        img_name, img_ext = common_util.parse_save_path(path)
        painted = False
        pieces = image_util.split(path)
        for piece in pieces:
            piece_img, dx, dy = handle_image_for_mask(piece)
            found = get_mask_layout(piece_img, name, id_card_num)
            painted = painted or bool(found)
            for rect in found:
                # Shift from piece coordinates to full-image coordinates.
                top_left = (int(rect[0] + dx), int(rect[1] + dy))
                bottom_right = (int(rect[2] + dx), int(rect[3] + dy))
                cv2.rectangle(canvas, top_left, bottom_right, color, -1, 0)
        masked_path = common_util.get_processed_img_path(f'{img_name}.mask.{img_ext}')
        cv2.imwrite(masked_path, canvas)
        return painted, masked_path

    original_image = img_path
    is_masked, img_path = _paint(img_path)
    if is_masked:
        return is_masked, img_path

    # Nothing was painted; the image may be oriented the wrong way.
    angle = model_util.clas_orientation(img_path)[0]
    if angle == "0":
        return is_masked, img_path

    img_path = image_util.rotate(img_path, int(angle))
    is_masked, img_path = _paint(img_path)
    if is_masked:
        # Rotation made a difference, so leave a trace in the log.
        logging.info(f"图片旋转了{angle}°")
    else:
        # Rotation did not help either; go back to the original image.
        img_path = original_image
    return is_masked, img_path
def _mark_phrec_painted(pk_phrec):
    """Record on the ``ZxPhrec`` row who painted the image and when."""
    session = MysqlSession()
    try:
        update_flag = (update(ZxPhrec).where(ZxPhrec.pk_phrec == pk_phrec).values(
            paint_user=HOSTNAME,
            paint_date=common_util.get_default_datetime()))
        session.execute(update_flag)
        session.commit()
    finally:
        # Release the connection even when execute/commit fails.
        session.close()


def photo_mask(pk_phhd, name, id_card_num):
    """Mask the images of one case and upload the ones that were changed.

    Processes the ``ZxPhrec`` rows of the case whose ``cRectype`` is ``"3"``
    or ``"4"``. Each image is downloaded, copied into the processed-image
    directory and passed to ``mask_photo``. Masked images are backed up,
    uploaded over the original key, and the row is stamped with
    ``paint_user`` / ``paint_date``. Upload or update failures are logged and
    the remaining images are still processed.

    :param pk_phhd: primary key of the case.
    :param name: name to hide.
    :param id_card_num: ID card number to hide.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        session.close()

    # Identifier shared by all images of this batch.
    set_batch_id(uuid.uuid4().hex)
    processed_img_dir = common_util.get_processed_img_path('')
    os.makedirs(processed_img_dir, exist_ok=True)
    try:
        for phrec in phrecs:
            img_url = ufile.get_private_url(phrec.cfjaddress)
            if not img_url:
                continue
            original_img_path = common_util.save_to_local(img_url)
            img_path = common_util.get_processed_img_path(phrec.cfjaddress)
            shutil.copy2(original_img_path, img_path)
            is_masked, image = mask_photo(img_path, name, id_card_num)
            if not is_masked:
                continue
            # A masked image is backed up first and then replaced.
            try:
                # presumably copies the object into the "drg2015" bucket as a
                # backup - confirm against ufile.copy_file
                ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
                ufile.upload_file(phrec.cfjaddress, image)
                _mark_phrec_painted(phrec.pk_phrec)
            except Exception as e:
                logging.error("上传图片出错", exc_info=e)
    finally:
        # Remove the temporary images, also when processing was aborted.
        if os.path.isdir(processed_img_dir):
            shutil.rmtree(processed_img_dir)
def main():
    """Poll for cases waiting to be masked and process them, forever.

    Each round selects up to ``PHHD_BATCH_SIZE`` cases with ``paint_flag``
    ``"1"`` that have at least one ``ZxPhrec`` row, highest ``priority_num``
    first, and sets their flag to ``"2"`` (in progress). Every case is then
    masked and marked with flag ``"8"``, the host name, the time and the
    elapsed seconds (stored in ``fZcfwfy``). When no case is found the loop
    sleeps for ``SLEEP_MINUTES`` minutes.
    """
    while 1:
        session = MysqlSession()
        try:
            phhds = (session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.paint_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .order_by(ZxPhhd.priority_num.desc())
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
            if pk_phhd_values:
                # Mark the selected cases as being processed. Skipped when
                # nothing was selected to avoid an UPDATE with an empty IN list.
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2"))
                session.execute(update_flag)
                session.commit()
        finally:
            # Release the connection even when the query or update fails.
            session.close()

        if not phhds:
            # No new case found; wait a while before asking again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            start_time = time.time()
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)
            # Masking finished; update the flag of the case.
            session = MysqlSession()
            try:
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                    paint_flag="8",
                    paint_user=HOSTNAME,
                    paint_date=common_util.get_default_datetime(),
                    fZcfwfy=time.time() - start_time))
                session.execute(update_flag)
                session.commit()
            finally:
                session.close()