"""Locate names and ID-card numbers on case photos with OCR and paint over them."""
import logging.config
import re
import tempfile
from time import sleep

import cv2
from sqlalchemy import update, and_

from db import MysqlSession
from db.mysql import ZxPhrec, ZxPhhd
from photo_mask import (
    OCR,
    PHHD_BATCH_SIZE,
    SLEEP_MINUTES,
    NAME_KEYS,
    ID_CARD_NUM_KEYS,
    SIMILAR_CHAR,
    HOSTNAME,
)
from ucloud import BUCKET, ufile
from util import image_util, util


def _ocr_layouts(image_path):
    """Run OCR on ``image_path`` and return its layout entries.

    A ``TypeError`` from the OCR helper most likely means that no text was
    recognised, so it is mapped to an empty list. Any other exception
    propagates to the caller.
    """
    try:
        return util.get_ocr_layout(OCR, image_path)
    except TypeError:
        return []


def _execute_and_commit(statement):
    """Execute ``statement`` in a fresh session, commit, and always close it."""
    session = MysqlSession()
    try:
        session.execute(statement)
        session.commit()
    finally:
        session.close()


def _refine_boxes(box, content, image_path, extra_content):
    """Re-run OCR on an enlarged crop around ``box`` to get tighter boxes.

    Returns the boxes found in the crop, translated back by the crop origin
    and the padding offsets reported by ``image_util.expand_to_a4_size``.
    The list is empty when the second OCR pass finds nothing.
    """
    image = cv2.imread(image_path)
    # Crop slightly larger than the estimate so the text is not cut off.
    capture_box = util.zoom_rectangle(box, 0.2)
    captured_image = image_util.capture(image, capture_box)

    # Only reserve a path here; the handle is closed before cv2 writes to it.
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
        temp_path = temp_file.name

    boxes = []
    try:
        captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image)
        cv2.imwrite(temp_path, captured_image)
        for sub_layout in _ocr_layouts(temp_path):
            sub_text = sub_layout[1]
            if extra_content:
                matches = re.findall(extra_content, sub_text)
            else:
                matches = [content]
            for candidate in matches:
                if candidate in sub_text:
                    temp_box = find_boxes(candidate, sub_layout)[0]
                    if temp_box:
                        boxes.append([
                            temp_box[0] + capture_box[0] - offset_x,
                            temp_box[1] + capture_box[1] - offset_y,
                            temp_box[2] + capture_box[0] - offset_x,
                            temp_box[3] + capture_box[1] - offset_y,
                        ])
                        # One box per re-recognised line is enough.
                        break
    finally:
        # Always remove the crop file, even when OCR or matching raises.
        util.delete_temp_file(temp_path)
    return boxes


def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None,
               extra_content=None):
    """Estimate the box(es) covering ``content`` inside one OCR text line.

    The line box is split evenly among the characters of the line text, and
    the span belonging to ``content`` is cut out along the longer axis of the
    line box (horizontal when width >= height, vertical otherwise).

    Args:
        content: Substring to locate; it must occur in ``layout[1]``.
        layout: OCR entry where ``layout[0]`` is the line box, indexed as
            ``[x1, y1, x2, y2]``, and ``layout[1]`` is the recognised text.
        offset: Number of character widths by which the box start is shifted
            from the position of ``content``.
        length: Box length in characters. When falsy, ``len(content) + 1``
            is used.
        improve: When true, crop an enlarged region around the estimate, run
            OCR on it again and prefer the boxes found there.
        image_path: Path of the image to crop from; read only when
            ``improve`` is true.
        extra_content: Optional regular expression. During refinement its
            matches in each re-recognised line are located instead of
            ``content``.

    Returns:
        A non-empty list of ``[x1, y1, x2, y2]`` boxes. When refinement is
        disabled or finds nothing, the list holds the single estimated box.

    Raises:
        ValueError: If ``content`` does not occur in ``layout[1]``.
        ZeroDivisionError: If ``layout[1]`` is empty.
    """
    full_box = layout[0]
    text = layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # Text runs along the longer side of the line box.
    horizontal = x_len >= y_len
    box_len = x_len if horizontal else y_len
    char_len = box_len / len(text)
    index = text.index(content)
    if not length:
        length = len(content) + 1

    if horizontal:
        box = [
            full_box[0] + (index + offset) * char_len,
            full_box[1],
            full_box[0] + (index + offset + length) * char_len,
            full_box[3],
        ]
    else:
        box = [
            full_box[0],
            full_box[1] + (index + offset) * char_len,
            full_box[2],
            full_box[1] + (index + offset + length) * char_len,
        ]

    boxes = []
    if improve:
        # Recognise the cropped region again to improve precision.
        boxes = _refine_boxes(box, content, image_path, extra_content)

    if not boxes:
        boxes.append(box)
    return boxes


def _build_name_pattern(name):
    """Build the fuzzy regular expression used to find ``name`` in OCR text.

    Each character may be replaced by the look-alikes listed for it in
    ``SIMILAR_CHAR``. The pattern tolerates one missing or misread character
    and also matches ``name``-length text following the label ``名`` plus a
    separator, provided one of the name characters appears later in the line.

    NOTE(review): the alternatives are joined with ``|`` and then placed
    inside ``[...]``; inside a character class ``|`` is a literal, so these
    classes also match a literal ``|``. Characters of ``name`` are not
    regex-escaped either. Left unchanged to preserve matching behaviour.
    """
    re_list = []
    for char in name:
        char_re = f"{char}"
        if char in SIMILAR_CHAR:
            char_re += "|" + "|".join(SIMILAR_CHAR[char])
        re_list.append(char_re)

    name_len = len(name)
    name_offset = int(name_len / 2)
    if name_len > 2:
        # Allow the last or the first character to be missing or misread.
        r = f"[{']['.join(re_list[:-1])}].?|.?[{']['.join(re_list[1:])}]"
    else:
        r = f"[{']['.join(re_list)}]"
    if (name_len & 1) == 1:
        # Odd length: allow the middle character to be missing or misread.
        r += f"|[{']['.join(re_list[:name_offset])}].?[{']['.join(re_list[-name_offset:])}]"
    r += f"|名[:|:](?=.*[{'|'.join(re_list)}]).{{{name_len}}}"
    return r


def _find_boxes_by_keys(layout, keys):
    """Locate boxes next to label keywords found in one OCR line.

    ``keys`` is a sequence of dicts with a regex under ``"key"``, a box
    length under ``"length"`` and an optional ``"offset"`` (defaulting to the
    length of the matched text). Only the first key whose regex matches the
    line is used.
    """
    boxes = []
    for key in keys:
        match_list = re.findall(key["key"], layout[1])
        for m in match_list:
            if m in layout[1]:
                boxes += find_boxes(m, layout, offset=key.get("offset", len(m)),
                                    length=key["length"])
        if match_list:
            break
    return boxes


def get_mask_layout(image, name, id_card_num):
    """Return the boxes to paint over in ``image``.

    For every OCR line the name is searched with a fuzzy pattern and the
    ID-card number by exact substring. Whichever of the two is not found
    directly is searched again through the label keywords in ``NAME_KEYS`` /
    ``ID_CARD_NUM_KEYS``.

    Args:
        image: Image array accepted by ``cv2.imwrite``.
        name: Person name to hide. When empty or ``None`` only the keyword
            search is used for names.
        id_card_num: ID-card number to hide. When empty or ``None`` only the
            keyword search is used for ID numbers.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes as produced by ``find_boxes``.
        Errors raised while locating text are logged and the boxes collected
        so far are returned.
    """
    # Only reserve a path here; the handle is closed before cv2 writes to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_path = temp_file.name

    result = []
    try:
        cv2.imwrite(temp_path, image)
        try:
            layouts = _ocr_layouts(temp_path)
            if not layouts:
                # Nothing was recognised.
                return result

            # An empty name would yield an invalid pattern ("[]"), so skip it.
            name_pattern = _build_name_pattern(name) if name else None

            for layout in layouts:
                text = layout[1]
                find_name_by_key = True
                find_id_card_num_by_key = True

                if name_pattern:
                    matches = re.findall(name_pattern, text)
                    if matches:
                        # Only the first match of a line is used.
                        result += find_boxes(matches[0], layout, improve=True,
                                             image_path=temp_path,
                                             extra_content=name_pattern)
                        find_name_by_key = False

                # An empty string is contained in every line; never mask for it.
                if id_card_num and id_card_num in text:
                    result += find_boxes(id_card_num, layout, improve=True,
                                         image_path=temp_path)
                    find_id_card_num_by_key = False

                if find_name_by_key:
                    result += _find_boxes_by_keys(layout, NAME_KEYS)
                if find_id_card_num_by_key:
                    result += _find_boxes_by_keys(layout, ID_CARD_NUM_KEYS)
        except Exception as e:
            # Best effort: keep whatever was found before the failure.
            logging.error("涂抹时出错!", exc_info=e)
        return result
    finally:
        util.delete_temp_file(temp_path)


def handle_image_for_mask(split_result):
    """Pad one split piece to A4 size and adjust its stored offsets.

    ``split_result`` is updated in place: the padding offsets returned by
    ``image_util.expand_to_a4_size`` are subtracted from its ``"x_offset"``
    and ``"y_offset"`` entries.

    Returns:
        A tuple ``(expanded_image, x_offset, y_offset)`` with the adjusted
        offsets.
    """
    expand_img, offset_x, offset_y = image_util.expand_to_a4_size(split_result["img"])
    split_result["x_offset"] -= offset_x
    split_result["y_offset"] -= offset_y
    return expand_img, split_result["x_offset"], split_result["y_offset"]


def mask_photo(img_url, name, id_card_num, color=(255, 255, 255)):
    """Paint over the name and ID-card number found in one photo.

    If nothing is found in the original orientation, the image is rotated by
    the first angle reported by ``image_util.parse_rotation_angles`` and the
    search is repeated.

    Args:
        img_url: Location of the image, as accepted by ``image_util.read``.
        name: Person name to hide.
        id_card_num: ID-card number to hide.
        color: Fill colour passed to ``cv2.rectangle``.

    Returns:
        A tuple ``(is_masked, image)``. ``image`` is the painted image when
        something was masked (rotated if the rotation was what made it
        work), otherwise the image as read.
    """

    def _mask(i, n, icn, c):
        """Paint every located box onto ``i`` in place; report if any was drawn."""
        do_mask = False
        for split_result in image_util.split(i):
            to_mask_img, x_offset, y_offset = handle_image_for_mask(split_result)
            results = get_mask_layout(to_mask_img, n, icn)
            if results:
                do_mask = True
            for result in results:
                # Translate from piece coordinates to full-image coordinates.
                top_left = (int(result[0] + x_offset), int(result[1] + y_offset))
                bottom_right = (int(result[2] + x_offset), int(result[3] + y_offset))
                cv2.rectangle(i, top_left, bottom_right, c, -1, 0)
        return do_mask, i

    # Load the picture.
    image = image_util.read(img_url)
    original_image = image
    is_masked, image = _mask(image, name, id_card_num, color)
    if not is_masked:
        # Nothing was masked; the picture may be in the wrong orientation.
        angles = image_util.parse_rotation_angles(image)
        angle = angles[0]
        if angle != "0":
            image = image_util.rotate(image, int(angle))
            is_masked, image = _mask(image, name, id_card_num, color)
            if not is_masked:
                # Rotation did not help either; restore the original orientation.
                image = original_image
            else:
                # Rotation made the difference; leave a trace in the log.
                logging.info(f"图片旋转了{angle}°")
    return is_masked, image


def photo_mask(pk_phhd, name, id_card_num):
    """Mask every eligible photo record of one case and upload the results.

    Records of ``pk_phhd`` whose ``cRectype`` is ``"3"`` or ``"4"`` are
    processed. A masked photo is first copied under the same key with
    ``ufile.copy_file`` (target ``"drg2015"``, presumably a backup bucket),
    then overwritten by the masked version, and the record's ``paint_user``
    and ``paint_date`` are updated. Upload or update failures are logged and
    do not stop the remaining records.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        # Release the connection even when the query fails.
        session.close()

    for phrec in phrecs:
        img_url = ufile.get_private_url(phrec.cfjaddress)
        if not img_url:
            continue

        is_masked, image = mask_photo(img_url, name, id_card_num)
        if not is_masked:
            continue

        # Something was masked: back up the original, then replace it.
        ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)

        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            cv2.imwrite(temp_file.name, image)
        try:
            ufile.upload_file(phrec.cfjaddress, temp_file.name)
            update_flag = (update(ZxPhrec).where(ZxPhrec.pk_phrec == phrec.pk_phrec).values(
                paint_user=HOSTNAME,
                paint_date=util.get_default_datetime()))
            _execute_and_commit(update_flag)
        except Exception as e:
            logging.error("上传图片出错", exc_info=e)
        finally:
            util.delete_temp_file(temp_file.name)


def main():
    """Poll for cases waiting to be masked and process them forever.

    Each round selects up to ``PHHD_BATCH_SIZE`` cases with
    ``paint_flag == "1"`` that have at least one photo record, marks them
    ``"2"`` (masking in progress), masks them one by one and marks each
    ``"8"`` when done. When no case is found the loop sleeps for
    ``SLEEP_MINUTES`` minutes.

    NOTE(review): an exception from ``photo_mask`` still ends the loop and
    leaves the rest of the batch at ``paint_flag == "2"``; confirm whether
    that is intended.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = (session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh)
                     .join(ZxPhrec, ZxPhhd.pk_phhd == ZxPhrec.pk_phhd, isouter=True)
                     .filter(ZxPhhd.paint_flag == "1")
                     .filter(ZxPhrec.pk_phrec.isnot(None))
                     .distinct().limit(PHHD_BATCH_SIZE).all())
            if phhds:
                # Mark the batch as "masking in progress".
                pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
                update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values))
                               .values(paint_flag="2"))
                session.execute(update_flag)
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new case found; wait a while before polling again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)

            # Masking finished: update the case flag.
            update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                paint_flag="8",
                paint_user=HOSTNAME,
                paint_date=util.get_default_datetime()))
            _execute_and_commit(update_flag)