294 lines
11 KiB
Python
294 lines
11 KiB
Python
import logging.config
|
||
import os
|
||
import re
|
||
import shutil
|
||
import time
|
||
import uuid
|
||
from time import sleep
|
||
|
||
import cv2
|
||
from sqlalchemy import update, and_
|
||
|
||
from db import MysqlSession
|
||
from db.mysql import ZxPhrec, ZxPhhd
|
||
from log import HOSTNAME
|
||
from photo_mask import PHHD_BATCH_SIZE, SLEEP_MINUTES, NAME_KEYS, ID_CARD_NUM_KEYS, SIMILAR_CHAR
|
||
from photo_review import set_batch_id
|
||
from ucloud import BUCKET, ufile
|
||
from util import image_util, common_util, model_util
|
||
|
||
|
||
def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None, extra_content=None):
    """Estimate the box(es) covering ``content`` inside one OCR text line.

    The coarse estimate assumes every character of the line has the same
    size: the longer side of the line's box is divided by the number of
    characters, and the target span is interpolated from the character index.

    Args:
        content: Substring to locate. It must occur in ``layout[1]``.
        layout: Indexable whose item 0 is the line box ``[x1, y1, x2, y2]``
            and whose item 1 is the recognised text of that line.
        offset: Number of characters to shift the start of the span by.
        length: Number of characters to cover. A falsy value means
            ``len(content) + 1`` (one extra character as a safety margin).
        improve: When true, the coarse area (enlarged) is cropped, run
            through OCR again, and boxes found in the crop replace the
            coarse estimate.
        image_path: Path of the image the layout came from. Only used when
            ``improve`` is true.
        extra_content: Optional regular expression. When given, it is matched
            against each re-recognised line instead of looking for ``content``
            literally.

    Returns:
        A non-empty list of ``[x1, y1, x2, y2]`` boxes: the refined boxes when
        the second pass found any, otherwise the single coarse box.

    Raises:
        ZeroDivisionError: If the text of ``layout`` is empty.
        ValueError: If ``content`` does not occur in the text of ``layout``.
    """
    full_box = layout[0]
    text = layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # The longer side decides whether the text runs horizontally or vertically.
    horizontal = x_len >= y_len
    char_len = (x_len if horizontal else y_len) / len(text)

    start = text.index(content) + offset
    if not length:
        length = len(content) + 1
    end = start + length

    if horizontal:
        box = [
            full_box[0] + start * char_len,
            full_box[1],
            full_box[0] + end * char_len,
            full_box[3],
        ]
    else:
        box = [
            full_box[0],
            full_box[1] + start * char_len,
            full_box[2],
            full_box[1] + end * char_len,
        ]

    if not improve:
        return [box]

    # Second pass: recognise the cropped area again for better precision.
    image = cv2.imread(image_path) if image_path else None
    if image is None:
        # cv2.imread signals an unreadable file by returning None. Keep the
        # coarse box so the sensitive text is still covered.
        logging.warning("无法读取图片,跳过二次识别:%s", image_path)
        return [box]

    img_name, img_ext = common_util.parse_save_path(image_path)
    # Crop slightly larger than the estimate so the text is not cut off.
    capture_box = common_util.zoom_rectangle(box, 0.2)
    captured_image = image_util.capture(image, capture_box)
    captured_image_path = common_util.get_processed_img_path(f'{img_name}.capture.{img_ext}')
    cv2.imwrite(captured_image_path, captured_image)
    captured_a4_img_path, offset_x, offset_y = image_util.expand_to_a4_size(captured_image_path)
    try:
        sub_layouts = common_util.ocr_result_to_layout(model_util.ocr(captured_a4_img_path))
    except TypeError:
        # A TypeError here most likely means no text was recognised.
        # Any other exception propagates to the caller unchanged.
        sub_layouts = []

    boxes = []
    for sub_layout in sub_layouts:
        sub_text = sub_layout[1]
        candidates = re.findall(extra_content, sub_text) if extra_content else [content]
        for candidate in candidates:
            if candidate not in sub_text:
                continue
            temp_box = find_boxes(candidate, sub_layout)[0]
            # Translate from crop coordinates back to the source image:
            # add the crop origin and remove the A4 padding offset.
            boxes.append([
                temp_box[0] + capture_box[0] - offset_x,
                temp_box[1] + capture_box[1] - offset_y,
                temp_box[2] + capture_box[0] - offset_x,
                temp_box[3] + capture_box[1] - offset_y,
            ])
            # One box per re-recognised line is enough.
            break

    return boxes or [box]
|
||
|
||
|
||
def _char_classes(parts):
    """Wrap every entry of ``parts`` in its own ``[...]`` character class."""
    return "[" + "][".join(parts) + "]"


def _build_name_pattern(name):
    """Build the fuzzy regular expression used to spot ``name`` in OCR text.

    Each name character becomes a character class that also accepts the
    look-alike characters listed for it in ``SIMILAR_CHAR``. The alternatives
    are:

    * names longer than two characters: all but the last character followed
      by an optional character, or an optional character followed by all but
      the first character; shorter names must match in full;
    * odd-length names of three or more characters: the first and last halves
      with one optional character in the middle;
    * the character ``名`` and a separator followed by ``len(name)``
      characters, provided any name character appears later in the line.
    """
    re_list = []
    for char in name:
        # Escape so a regex metacharacter in the name cannot break the class.
        char_re = re.escape(char)
        if char in SIMILAR_CHAR:
            # NOTE(review): "|" is a literal inside a character class, so the
            # class also matches "|" itself. Kept as-is to avoid masking less
            # than before; confirm whether that is intended.
            char_re += "|" + "|".join(re.escape(similar) for similar in SIMILAR_CHAR[char])
        re_list.append(char_re)

    name_len = len(name)
    if name_len > 2:
        r = _char_classes(re_list[:-1]) + ".?|.?" + _char_classes(re_list[1:])
    else:
        r = _char_classes(re_list)
    # Only meaningful from three characters on: with a single character the
    # half length is 0 and the slices would produce a junk character class.
    if name_len > 2 and (name_len & 1) == 1:
        half = name_len // 2
        r += "|" + _char_classes(re_list[:half]) + ".?" + _char_classes(re_list[-half:])
    r += f"|名[:|:](?=.*[{'|'.join(re_list)}]).{{{name_len}}}"
    return r


def _find_boxes_by_keys(keys, layout):
    """Locate boxes next to label keywords in one OCR line.

    Each entry of ``keys`` is a mapping with a regex under ``"key"``, a
    ``"length"`` and an optional ``"offset"`` (defaults to the length of the
    matched text, i.e. the box starts right after the match). Entries are
    tried in order and the search stops at the first one that matches.
    """
    boxes = []
    text = layout[1]
    for key in keys:
        match_list = re.findall(key["key"], text)
        for m in match_list:
            if m in text:
                boxes += find_boxes(m, layout, offset=key.get("offset", len(m)), length=key["length"])
        if match_list:
            break
    return boxes


def get_mask_layout(img_path, name, id_card_num):
    """Run OCR on ``img_path`` and return the boxes that should be masked.

    For every recognised line the name is searched with a fuzzy pattern and
    the ID number with a literal match. Whichever of the two is not found
    directly in a line is then looked up through the label keywords in
    ``NAME_KEYS`` / ``ID_CARD_NUM_KEYS``.

    Args:
        img_path: Path of the image to analyse.
        name: Name to hide. When empty, only the keyword lookup is used.
        id_card_num: ID number to hide. When empty, only the keyword lookup
            is used.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes; empty when nothing was
        recognised. Errors are logged rather than raised, and the boxes
        collected before the error are returned.
    """
    result = []
    try:
        try:
            layouts = common_util.ocr_result_to_layout(model_util.ocr(img_path))
        except TypeError:
            # A TypeError here most likely means no text was recognised.
            layouts = []

        if not layouts:
            # Nothing recognised.
            return result

        # An empty name would produce an invalid pattern and abort the whole
        # function, leaving the ID number unmasked as well.
        name_pattern = _build_name_pattern(name) if name else None

        for layout in layouts:
            text = layout[1]
            find_name_by_key = True
            find_id_card_num_by_key = True

            if name_pattern:
                matches = re.findall(name_pattern, text)
                if matches:
                    # Only the first hit seeds the refinement; the pattern is
                    # passed along so the second OCR pass can match again.
                    result += find_boxes(matches[0], layout, improve=True, image_path=img_path,
                                         extra_content=name_pattern)
                    find_name_by_key = False

            # An empty ID would be "found" in every line and mask its first
            # character, so the direct lookup requires a non-empty value.
            if id_card_num and id_card_num in text:
                result += find_boxes(id_card_num, layout, improve=True, image_path=img_path)
                find_id_card_num_by_key = False

            if find_name_by_key:
                result += _find_boxes_by_keys(NAME_KEYS, layout)
            if find_id_card_num_by_key:
                result += _find_boxes_by_keys(ID_CARD_NUM_KEYS, layout)

        return result
    except Exception as e:
        logging.error("涂抹时出错!", exc_info=e)
        return result
|
||
|
||
|
||
def handle_image_for_mask(split_result):
    """Expand one split piece to A4 size and adjust its stored offsets.

    ``split_result`` is updated in place: the offsets returned by the
    expansion are subtracted from its ``"x_offset"`` and ``"y_offset"``.

    Returns:
        A tuple of the expanded image as returned by
        ``image_util.expand_to_a4_size``, the adjusted x offset and the
        adjusted y offset.
    """
    expanded = image_util.expand_to_a4_size(split_result["img"])
    expand_img, pad_x, pad_y = expanded
    for offset_key, pad in (("x_offset", pad_x), ("y_offset", pad_y)):
        split_result[offset_key] -= pad
    return expand_img, split_result["x_offset"], split_result["y_offset"]
|
||
|
||
|
||
def mask_photo(img_path, name, id_card_num, color=(255, 255, 255)):
    """Paint over the name and ID number found in an image.

    The image is split into pieces, each piece is analysed with
    ``get_mask_layout`` and every returned box is filled with ``color``.
    If nothing was painted, the orientation classifier is consulted; when it
    reports an angle other than ``"0"`` the image is rotated and painting is
    tried once more.

    Args:
        img_path: Path of the image to process.
        name: Name to hide.
        id_card_num: ID number to hide.
        color: Fill colour passed to ``cv2.rectangle``.

    Returns:
        ``(is_masked, path)``. ``path`` is the painted file, or the original
        ``img_path`` when the rotated retry painted nothing either.
    """

    def _paint(path):
        # Paint all boxes found in ``path`` and write the ".mask." copy.
        canvas = cv2.imread(path)
        stem, ext = common_util.parse_save_path(path)
        painted = False
        for piece in image_util.split(path):
            piece_img, dx, dy = handle_image_for_mask(piece)
            found = get_mask_layout(piece_img, name, id_card_num)
            painted = painted or bool(found)
            for left, top, right, bottom in found:
                # Shift from piece coordinates to full-image coordinates.
                top_left = (int(left + dx), int(top + dy))
                bottom_right = (int(right + dx), int(bottom + dy))
                cv2.rectangle(canvas, top_left, bottom_right, color, -1, 0)
        out_path = common_util.get_processed_img_path(f'{stem}.mask.{ext}')
        cv2.imwrite(out_path, canvas)
        return painted, out_path

    source_path = img_path
    is_masked, img_path = _paint(img_path)
    if is_masked:
        return is_masked, img_path

    # Nothing painted: the image may be in the wrong orientation.
    angle = model_util.clas_orientation(img_path)[0]
    if angle == "0":
        return is_masked, img_path

    img_path = image_util.rotate(img_path, int(angle))
    is_masked, img_path = _paint(img_path)
    if is_masked:
        # Rotation helped, so leave a trace in the log.
        logging.info(f"图片旋转了{angle}°")
    else:
        # Still nothing after rotating: go back to the original file.
        img_path = source_path
    return is_masked, img_path
|
||
|
||
|
||
def photo_mask(pk_phhd, name, id_card_num):
    """Mask every relevant image of one case and publish the results.

    Loads the ``ZxPhrec`` rows of the case whose ``cRectype`` is ``"3"`` or
    ``"4"``, downloads each image, masks it with ``mask_photo`` and, when
    something was painted, copies the stored object to ``"drg2015"``, uploads
    the painted file under the same key and stamps ``paint_user`` /
    ``paint_date`` on the row.

    Args:
        pk_phhd: Primary key of the case.
        name: Name to hide.
        id_card_num: ID number to hide.

    Upload and row-update failures are logged and do not stop the remaining
    images. Download and masking errors propagate to the caller. The working
    directory is removed in either case.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        # Release the connection even when the query fails.
        session.close()

    # Identifier shared by all images of this batch.
    set_batch_id(uuid.uuid4().hex)
    processed_img_dir = common_util.get_processed_img_path('')
    os.makedirs(processed_img_dir, exist_ok=True)
    try:
        for phrec in phrecs:
            img_url = ufile.get_private_url(phrec.cfjaddress)
            if not img_url:
                continue
            original_img_path = common_util.save_to_local(img_url)
            img_path = common_util.get_processed_img_path(phrec.cfjaddress)
            shutil.copy2(original_img_path, img_path)
            is_masked, image = mask_photo(img_path, name, id_card_num)

            if not is_masked:
                continue

            # Something was painted: back up the stored object, then replace it.
            try:
                # NOTE(review): presumably (source bucket, key, target bucket,
                # key). If a case is processed twice, this would overwrite the
                # earlier backup - confirm against the ufile helper.
                ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
                ufile.upload_file(phrec.cfjaddress, image)
                session = MysqlSession()
                try:
                    update_flag = (update(ZxPhrec).where(ZxPhrec.pk_phrec == phrec.pk_phrec).values(
                        paint_user=HOSTNAME,
                        paint_date=common_util.get_default_datetime()))
                    session.execute(update_flag)
                    session.commit()
                finally:
                    # Closing also abandons an uncommitted transaction.
                    session.close()
            except Exception as e:
                logging.error("上传图片出错", exc_info=e)
    finally:
        # Remove the intermediate files even when an image failed.
        if os.path.isdir(processed_img_dir):
            shutil.rmtree(processed_img_dir)
|
||
|
||
|
||
def main():
    """Poll for cases waiting to be masked and process them forever.

    Each round selects up to ``PHHD_BATCH_SIZE`` cases with
    ``paint_flag == "1"`` that have at least one ``ZxPhrec`` row, ordered by
    ``priority_num`` descending, and marks them ``"2"`` (in progress). Every
    case is then masked and marked ``"8"`` together with the host name, the
    timestamp and the elapsed seconds. When no case is found the loop sleeps
    for ``SLEEP_MINUTES`` minutes.

    NOTE(review): an exception raised by ``photo_mask`` ends the loop and
    leaves the claimed cases at ``"2"``; there is no failure flag visible
    here to set instead.
    """
    while 1:
        session = MysqlSession()
        try:
            phhds = (session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.paint_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .order_by(ZxPhhd.priority_num.desc())
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            if phhds:
                # Mark the selected cases as being masked. Skipped when the
                # selection is empty: an UPDATE with an empty IN list cannot
                # touch any row.
                pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2"))
                session.execute(update_flag)
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new case found; wait a while before querying again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            start_time = time.time()
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)

            # Masking finished: record the final state.
            session = MysqlSession()
            try:
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                    paint_flag="8",
                    paint_user=HOSTNAME,
                    paint_date=common_util.get_default_datetime(),
                    fZcfwfy=time.time() - start_time))
                session.execute(update_flag)
                session.commit()
            finally:
                session.close()
|