225 lines
8.0 KiB
Python
225 lines
8.0 KiB
Python
import logging.config
|
|
import tempfile
|
|
from time import sleep
|
|
|
|
import cv2
|
|
from sqlalchemy import update, and_
|
|
|
|
from db import MysqlSession
|
|
from db.mysql import ZxPhrec, ZxPhhd
|
|
from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, COPY_TRY_TIMES, UPLOAD_TRY_TIMES, NAME_KEYS, \
|
|
ID_CARD_NUM_KEYS
|
|
from ucloud import BUCKET, ucloud
|
|
from util import image_util, util
|
|
|
|
|
|
def _estimate_box(content, layout, offset=0, length=None):
    """Interpolate the box of ``content`` inside a single OCR layout entry."""
    full_box = layout[0]
    text = layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # The longer side of the entry's box is taken as the text direction;
    # a square box is treated as horizontal.
    horizontal = x_len >= y_len
    # Every character is assumed to occupy the same extent along that direction.
    char_len = (x_len if horizontal else y_len) / len(text)
    start = text.index(content) + offset
    if not length:
        # Default span: the content plus one extra character.
        length = len(content) + 1
    end = start + length

    if horizontal:
        return [
            full_box[0] + start * char_len,
            full_box[1],
            full_box[0] + end * char_len,
            full_box[3],
        ]
    return [
        full_box[0],
        full_box[1] + start * char_len,
        full_box[2],
        full_box[1] + end * char_len,
    ]


def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None):
    """Estimate where a piece of text sits inside one OCR layout entry.

    The position is interpolated from the entry's overall box, assuming all
    characters of the recognised text have equal extent along the text
    direction.

    Args:
        content: Substring to locate; it must occur in ``layout[1]``.
        layout: OCR layout entry; ``layout[0]`` is the box ``[x1, y1, x2, y2]``
            and ``layout[1]`` is the recognised text.
        offset: Number of characters to skip, counted from the start of
            ``content``, before the box begins.
        length: Number of characters the box covers. Defaults to
            ``len(content) + 1``.
        improve: When true, crop the image around the estimate, run OCR on the
            crop and estimate again from every recognised line containing
            ``content``.
        image_path: Path of the image ``layout`` was recognised from. Only read
            when ``improve`` is true.

    Returns:
        A non-empty list of ``[x1, y1, x2, y2]`` boxes. With ``improve`` the
        refined boxes are returned; if refinement finds nothing, the list holds
        the single interpolated estimate.

    Raises:
        ValueError: If ``content`` does not occur in ``layout[1]``.
    """
    box = _estimate_box(content, layout, offset, length)
    if not improve:
        return [box]

    # Run OCR a second time on a crop around the estimate to improve precision.
    image = cv2.imread(image_path)
    # The crop rectangle is derived from the estimate via zoom_rectangle
    # (presumably enlarging it by 20% - confirm against util.zoom_rectangle).
    capture_box = util.zoom_rectangle(box, 0.2)
    captured_image = image_util.capture(image, capture_box)
    captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image, True)

    # Only the file name is needed; the file is written after the handle is closed.
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
        temp_path = temp_file.name

    boxes = []
    try:
        cv2.imwrite(temp_path, captured_image)
        try:
            layouts = OCR.parse({"doc": temp_path})["layout"]
        except TypeError:
            # A TypeError most likely means no text was recognised.
            layouts = []
        for sub_layout in layouts:
            if content not in sub_layout[1]:
                continue
            refined = _estimate_box(content, sub_layout, offset, length)
            # Translate from crop coordinates back to the source image:
            # add the crop origin and remove the padding offsets.
            boxes.append([
                refined[0] + capture_box[0] - offset_x,
                refined[1] + capture_box[1] - offset_y,
                refined[2] + capture_box[0] - offset_x,
                refined[3] + capture_box[1] - offset_y,
            ])
    finally:
        # Remove the temporary crop even when OCR raises.
        util.delete_temp_file(temp_path)

    return boxes or [box]
|
|
|
|
|
|
def get_mask_layout(image, name, id_card_num):
    """Collect the boxes that should be masked on one image.

    The image is written to a temporary JPEG and passed to ``OCR.parse``. For
    each recognised line, the name and ID-card number are located directly when
    they occur in the text. For whichever of the two is not found on that line,
    the configured keywords (``NAME_KEYS`` / ``ID_CARD_NUM_KEYS``) are searched
    instead and the text following a matched keyword is boxed.

    Args:
        image: Image array accepted by ``cv2.imwrite``.
        name: Name to mask. ``None`` or an empty string disables the direct
            match, so only keyword lookup is used.
        id_card_num: ID-card number to mask; same rule as ``name``.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes in the coordinates of ``image``.
        The list is empty when nothing was recognised. If an error occurs
        during recognition or matching it is logged and the boxes collected so
        far are returned.
    """
    # Only the file name is needed; the file is written after the handle is closed.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_path = temp_file.name

    result = []
    try:
        cv2.imwrite(temp_path, image)
        try:
            try:
                layouts = OCR.parse({"doc": temp_path})["layout"]
            except TypeError:
                # A TypeError most likely means no text was recognised.
                layouts = []

            if not layouts:
                # Nothing recognised, nothing to mask.
                return result

            for layout in layouts:
                text = layout[1]
                keys = []

                # An empty value would match every line ("" in text is True),
                # so the direct match requires a non-empty value.
                if name and name in text:
                    result += find_boxes(name, layout, improve=True, image_path=temp_path)
                else:
                    keys += NAME_KEYS

                if id_card_num and id_card_num in text:
                    result += find_boxes(id_card_num, layout, improve=True, image_path=temp_path)
                else:
                    keys += ID_CARD_NUM_KEYS

                for key in keys:
                    if key["key"] in text:
                        # Box the characters that follow the keyword.
                        result += find_boxes(key["key"], layout, offset=len(key["key"]), length=key["length"])
        except Exception as e:
            # Best effort: keep whatever was collected before the failure.
            logging.error("涂抹时出错!", exc_info=e)
    finally:
        util.delete_temp_file(temp_path)
    return result
|
|
|
|
|
|
def handle_image_for_mask(split_result):
    """Prepare one split image for OCR-based masking.

    The image under ``split_result["img"]`` is passed through
    ``image_util.expand_to_a4_size`` and converted from BGR to grayscale. The
    offsets returned by the expansion are subtracted from
    ``split_result["x_offset"]`` and ``split_result["y_offset"]`` in place.

    Args:
        split_result: Dict with the keys ``"img"``, ``"x_offset"`` and
            ``"y_offset"``; the two offsets are updated by this call.

    Returns:
        A tuple ``(gray_image, x_offset, y_offset)`` holding the grayscale
        image and the updated offsets.
    """
    padded, pad_x, pad_y = image_util.expand_to_a4_size(split_result["img"], True)
    for offset_key, pad in (("x_offset", pad_x), ("y_offset", pad_y)):
        split_result[offset_key] -= pad
    return (
        cv2.cvtColor(padded, cv2.COLOR_BGR2GRAY),
        split_result["x_offset"],
        split_result["y_offset"],
    )
|
|
|
|
|
|
def _backup_and_upload(address, image):
    """Back up the stored file at ``address``, then upload ``image`` in its place."""
    # any() stops at the first successful attempt, like the original retry loop.
    is_copy_success = any(
        ucloud.copy_file(BUCKET, address, "drg2015", address)
        for _ in range(COPY_TRY_TIMES)
    )
    if not is_copy_success:
        # NOTE(review): the upload below still overwrites the original when the
        # backup failed, as before - confirm whether it should be skipped.
        logging.error("备份原图失败:%s", address)

    # Only the file name is needed; the file is written after the handle is closed.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_path = temp_file.name
    try:
        cv2.imwrite(temp_path, image)
        try:
            is_upload_success = any(
                ucloud.upload_file(address, temp_path)
                for _ in range(UPLOAD_TRY_TIMES)
            )
            if not is_upload_success:
                logging.error("上传图片失败:%s", address)
        except Exception as e:
            logging.error("上传图片出错", exc_info=e)
    finally:
        util.delete_temp_file(temp_path)


def photo_mask(pk_phhd, name, id_card_num):
    """Mask the name and ID-card number on the images of one case.

    Loads the ``cfjaddress`` of every ``ZxPhrec`` row that belongs to
    ``pk_phhd`` and has ``cRectype`` "3" or "4". Each image is split into
    parts, the regions to mask are detected per part and painted white on the
    full image. An image that received at least one mask is backed up and then
    re-uploaded under the same address.

    Args:
        pk_phhd: Key of the case whose images are processed.
        name: Name to mask.
        id_card_num: ID-card number to mask.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        # Release the connection even when the query fails.
        session.close()

    for phrec in phrecs:
        img_url = ucloud.get_private_url(phrec.cfjaddress)
        if not img_url:
            continue

        image = image_util.read(img_url)
        # Whether anything was masked on this image.
        is_masked = False
        for split_result in image_util.split(image):
            to_mask_img, x_offset, y_offset = handle_image_for_mask(split_result)
            results = get_mask_layout(to_mask_img, name, id_card_num)
            if results:
                is_masked = True

            for result in results:
                # Shift the box from split-image coordinates by the split's
                # offsets, then paint a filled white rectangle on the full image.
                top_left = (int(result[0] + x_offset), int(result[1] + y_offset))
                bottom_right = (int(result[2] + x_offset), int(result[3] + y_offset))
                cv2.rectangle(image, top_left, bottom_right, (255, 255, 255), -1, 0)

        # A masked image must be backed up and replaced.
        if is_masked:
            _backup_and_upload(phrec.cfjaddress, image)
|
|
|
|
|
|
def main():
    """Poll for cases that need masking and process them, forever.

    Each round reads up to ``PHHD_BATCH_SIZE`` ``ZxPhhd`` rows whose
    ``paint_flag`` is "1" and sets them to "2" (masking in progress). Every
    case is then masked with ``photo_mask`` and set to "8" together with
    ``paint_date``. When no rows are found, the loop sleeps for
    ``SLEEP_MINUTES`` minutes before polling again.

    NOTE(review): an exception raised by ``photo_mask`` propagates and stops
    the loop, leaving the rest of the batch at ``paint_flag="2"`` - confirm
    this is the intended failure mode.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
                ZxPhhd.paint_flag == "1"
            ).limit(PHHD_BATCH_SIZE).all()
            pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
            if pk_phhd_values:
                # Mark the batch as "masking in progress"; skipped when the
                # batch is empty so no UPDATE with an empty IN list is issued.
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2")
                )
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new cases; wait a while before querying again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)

            # Masking finished: record the completion flag and time.
            session = MysqlSession()
            try:
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                        paint_flag="8",
                        paint_date=util.get_default_datetime())
                )
                session.commit()
            finally:
                session.close()
|