|
|
|
@@ -1,6 +1,5 @@
|
|
|
|
import logging.config
|
|
|
|
import logging.config
|
|
|
|
import re
|
|
|
|
import re
|
|
|
|
import tempfile
|
|
|
|
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
from time import sleep
|
|
|
|
from time import sleep
|
|
|
|
|
|
|
|
|
|
|
|
@@ -10,9 +9,9 @@ from sqlalchemy import update, and_
|
|
|
|
from db import MysqlSession
|
|
|
|
from db import MysqlSession
|
|
|
|
from db.mysql import ZxPhrec, ZxPhhd
|
|
|
|
from db.mysql import ZxPhrec, ZxPhhd
|
|
|
|
from log import HOSTNAME
|
|
|
|
from log import HOSTNAME
|
|
|
|
from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, NAME_KEYS, ID_CARD_NUM_KEYS, SIMILAR_CHAR
|
|
|
|
from photo_mask import PHHD_BATCH_SIZE, SLEEP_MINUTES, NAME_KEYS, ID_CARD_NUM_KEYS, SIMILAR_CHAR
|
|
|
|
from ucloud import BUCKET, ufile
|
|
|
|
from ucloud import BUCKET, ufile
|
|
|
|
from util import image_util, common_util
|
|
|
|
from util import image_util, common_util, model_util
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None, extra_content=None):
|
|
|
|
def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None, extra_content=None):
|
|
|
|
@@ -55,14 +54,15 @@ def find_boxes(content, layout, offset=0, length=None, improve=False, image_path
|
|
|
|
if improve:
|
|
|
|
if improve:
|
|
|
|
# 再次识别,提高精度
|
|
|
|
# 再次识别,提高精度
|
|
|
|
image = cv2.imread(image_path)
|
|
|
|
image = cv2.imread(image_path)
|
|
|
|
|
|
|
|
img_name, img_ext = image_util.parse_save_path(image_path)
|
|
|
|
# 截图时偏大一点
|
|
|
|
# 截图时偏大一点
|
|
|
|
capture_box = common_util.zoom_rectangle(box, 0.2)
|
|
|
|
capture_box = common_util.zoom_rectangle(box, 0.2)
|
|
|
|
captured_image = image_util.capture(image, capture_box)
|
|
|
|
captured_image = image_util.capture(image, capture_box)
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
|
|
|
|
captured_image_path = image_util.get_save_path(f'{img_name}.capture.{img_ext}')
|
|
|
|
captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image)
|
|
|
|
cv2.imwrite(captured_image_path, captured_image)
|
|
|
|
cv2.imwrite(temp_file.name, captured_image)
|
|
|
|
captured_a4_img_path, offset_x, offset_y = image_util.expand_to_a4_size(captured_image_path)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
layouts = common_util.get_ocr_layout(OCR, temp_file.name)
|
|
|
|
layouts = common_util.ocr_result_to_layout(model_util.ocr(captured_a4_img_path))
|
|
|
|
except TypeError:
|
|
|
|
except TypeError:
|
|
|
|
# 如果是类型错误,大概率是没识别到文字
|
|
|
|
# 如果是类型错误,大概率是没识别到文字
|
|
|
|
layouts = []
|
|
|
|
layouts = []
|
|
|
|
@@ -86,22 +86,17 @@ def find_boxes(content, layout, offset=0, length=None, improve=False, image_path
|
|
|
|
temp_box[3] + capture_box[1] - offset_y,
|
|
|
|
temp_box[3] + capture_box[1] - offset_y,
|
|
|
|
])
|
|
|
|
])
|
|
|
|
break
|
|
|
|
break
|
|
|
|
common_util.delete_temp_file(temp_file.name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not boxes:
|
|
|
|
if not boxes:
|
|
|
|
boxes.append(box)
|
|
|
|
boxes.append(box)
|
|
|
|
return boxes
|
|
|
|
return boxes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_mask_layout(image, name, id_card_num):
|
|
|
|
def get_mask_layout(img_path, name, id_card_num):
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
|
|
|
|
|
|
|
cv2.imwrite(temp_file.name, image)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result = []
|
|
|
|
result = []
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
layouts = common_util.get_ocr_layout(OCR, temp_file.name)
|
|
|
|
layouts = common_util.ocr_result_to_layout(model_util.ocr(img_path))
|
|
|
|
# layouts = OCR.parse({"doc": temp_file.name})["layout"]
|
|
|
|
|
|
|
|
except TypeError:
|
|
|
|
except TypeError:
|
|
|
|
# 如果是类型错误,大概率是没识别到文字
|
|
|
|
# 如果是类型错误,大概率是没识别到文字
|
|
|
|
layouts = []
|
|
|
|
layouts = []
|
|
|
|
@@ -135,12 +130,12 @@ def get_mask_layout(image, name, id_card_num):
|
|
|
|
find_id_card_num_by_key = True
|
|
|
|
find_id_card_num_by_key = True
|
|
|
|
matches = re.findall(r, layout[1])
|
|
|
|
matches = re.findall(r, layout[1])
|
|
|
|
for match in matches:
|
|
|
|
for match in matches:
|
|
|
|
result += find_boxes(match, layout, improve=True, image_path=temp_file.name, extra_content=r)
|
|
|
|
result += find_boxes(match, layout, improve=True, image_path=img_path, extra_content=r)
|
|
|
|
find_name_by_key = False
|
|
|
|
find_name_by_key = False
|
|
|
|
break
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
if id_card_num in layout[1]:
|
|
|
|
if id_card_num in layout[1]:
|
|
|
|
result += find_boxes(id_card_num, layout, improve=True, image_path=temp_file.name)
|
|
|
|
result += find_boxes(id_card_num, layout, improve=True, image_path=img_path)
|
|
|
|
find_id_card_num_by_key = False
|
|
|
|
find_id_card_num_by_key = False
|
|
|
|
|
|
|
|
|
|
|
|
def _find_boxes_by_keys(keys):
|
|
|
|
def _find_boxes_by_keys(keys):
|
|
|
|
@@ -163,8 +158,6 @@ def get_mask_layout(image, name, id_card_num):
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error("涂抹时出错!", exc_info=e)
|
|
|
|
logging.error("涂抹时出错!", exc_info=e)
|
|
|
|
return result
|
|
|
|
return result
|
|
|
|
finally:
|
|
|
|
|
|
|
|
common_util.delete_temp_file(temp_file.name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_image_for_mask(split_result):
|
|
|
|
def handle_image_for_mask(split_result):
|
|
|
|
@@ -174,10 +167,12 @@ def handle_image_for_mask(split_result):
|
|
|
|
return expand_img, split_result["x_offset"], split_result["y_offset"]
|
|
|
|
return expand_img, split_result["x_offset"], split_result["y_offset"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def mask_photo(img_url, name, id_card_num, color=(255, 255, 255)):
|
|
|
|
def mask_photo(img_path, name, id_card_num, color=(255, 255, 255)):
|
|
|
|
def _mask(i, n, icn, c):
|
|
|
|
def _mask(ip, n, icn, c):
|
|
|
|
|
|
|
|
i = cv2.imread(ip)
|
|
|
|
|
|
|
|
img_name, img_ext = image_util.parse_save_path(ip)
|
|
|
|
do_mask = False
|
|
|
|
do_mask = False
|
|
|
|
split_results = image_util.split(i)
|
|
|
|
split_results = image_util.split(ip)
|
|
|
|
for split_result in split_results:
|
|
|
|
for split_result in split_results:
|
|
|
|
to_mask_img, x_offset, y_offset = handle_image_for_mask(split_result)
|
|
|
|
to_mask_img, x_offset, y_offset = handle_image_for_mask(split_result)
|
|
|
|
results = get_mask_layout(to_mask_img, n, icn)
|
|
|
|
results = get_mask_layout(to_mask_img, n, icn)
|
|
|
|
@@ -193,27 +188,27 @@ def mask_photo(img_url, name, id_card_num, color=(255, 255, 255)):
|
|
|
|
result[3] + y_offset,
|
|
|
|
result[3] + y_offset,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
cv2.rectangle(i, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])), c, -1, 0)
|
|
|
|
cv2.rectangle(i, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])), c, -1, 0)
|
|
|
|
return do_mask, i
|
|
|
|
masked_path = image_util.get_save_path(f'{img_name}.mask.{img_ext}')
|
|
|
|
|
|
|
|
cv2.imwrite(masked_path, i)
|
|
|
|
|
|
|
|
return do_mask, masked_path
|
|
|
|
|
|
|
|
|
|
|
|
# 打开图片
|
|
|
|
original_image = img_path
|
|
|
|
image = image_util.read(img_url)
|
|
|
|
is_masked, img_path = _mask(img_path, name, id_card_num, color)
|
|
|
|
original_image = image
|
|
|
|
|
|
|
|
is_masked, image = _mask(image, name, id_card_num, color)
|
|
|
|
|
|
|
|
if not is_masked:
|
|
|
|
if not is_masked:
|
|
|
|
# 如果没有涂抹,可能是图片方向不对
|
|
|
|
# 如果没有涂抹,可能是图片方向不对
|
|
|
|
angles = image_util.parse_rotation_angles(image)
|
|
|
|
angles = model_util.clas_orientation(img_path)
|
|
|
|
angle = angles[0]
|
|
|
|
angle = angles[0]
|
|
|
|
if angle != "0":
|
|
|
|
if angle != "0":
|
|
|
|
image = image_util.rotate(image, int(angle))
|
|
|
|
img_path = image_util.rotate(img_path, int(angle))
|
|
|
|
is_masked, image = _mask(image, name, id_card_num, color)
|
|
|
|
is_masked, img_path = _mask(img_path, name, id_card_num, color)
|
|
|
|
if not is_masked:
|
|
|
|
if not is_masked:
|
|
|
|
# 如果旋转后也没有涂抹,恢复原来的方向
|
|
|
|
# 如果旋转后也没有涂抹,恢复原来的方向
|
|
|
|
image = original_image
|
|
|
|
img_path = original_image
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
# 如果旋转有效果,打一个日志
|
|
|
|
# 如果旋转有效果,打一个日志
|
|
|
|
logging.info(f"图片旋转了{angle}°")
|
|
|
|
logging.info(f"图片旋转了{angle}°")
|
|
|
|
|
|
|
|
|
|
|
|
return is_masked, image
|
|
|
|
return is_masked, img_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def photo_mask(pk_phhd, name, id_card_num):
|
|
|
|
def photo_mask(pk_phhd, name, id_card_num):
|
|
|
|
@@ -227,17 +222,14 @@ def photo_mask(pk_phhd, name, id_card_num):
|
|
|
|
img_url = ufile.get_private_url(phrec.cfjaddress)
|
|
|
|
img_url = ufile.get_private_url(phrec.cfjaddress)
|
|
|
|
if not img_url:
|
|
|
|
if not img_url:
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
img_path = image_util.save_to_local(img_url)
|
|
|
|
is_masked, image = mask_photo(img_url, name, id_card_num)
|
|
|
|
is_masked, image = mask_photo(img_path, name, id_card_num)
|
|
|
|
|
|
|
|
|
|
|
|
# 如果涂抹了要备份以及更新
|
|
|
|
# 如果涂抹了要备份以及更新
|
|
|
|
if is_masked:
|
|
|
|
if is_masked:
|
|
|
|
ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
|
|
|
|
ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
|
|
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
|
|
|
|
|
|
|
cv2.imwrite(temp_file.name, image)
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
ufile.upload_file(phrec.cfjaddress, temp_file.name)
|
|
|
|
ufile.upload_file(phrec.cfjaddress, image)
|
|
|
|
session = MysqlSession()
|
|
|
|
session = MysqlSession()
|
|
|
|
update_flag = (update(ZxPhrec).where(ZxPhrec.pk_phrec == phrec.pk_phrec).values(
|
|
|
|
update_flag = (update(ZxPhrec).where(ZxPhrec.pk_phrec == phrec.pk_phrec).values(
|
|
|
|
paint_user=HOSTNAME,
|
|
|
|
paint_user=HOSTNAME,
|
|
|
|
@@ -248,7 +240,7 @@ def photo_mask(pk_phhd, name, id_card_num):
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error("上传图片出错", exc_info=e)
|
|
|
|
logging.error("上传图片出错", exc_info=e)
|
|
|
|
finally:
|
|
|
|
finally:
|
|
|
|
common_util.delete_temp_file(temp_file.name)
|
|
|
|
common_util.delete_temp_file(image)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
def main():
|
|
|
|
|