import logging.config
import re
import tempfile
from time import sleep

import cv2
from sqlalchemy import update, and_

from db import MysqlSession
from db.mysql import ZxPhrec, ZxPhhd
from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, NAME_KEYS, ID_CARD_NUM_KEYS, SIMILAR_CHAR
from ucloud import BUCKET, ufile
from util import image_util, util


def _write_temp_jpg(image):
    """Write *image* to a fresh ``.jpg`` temp file and return its path.

    The caller owns the file and must remove it with
    ``util.delete_temp_file``. If writing fails, the file is removed here
    before the exception propagates.
    """
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
        temp_path = temp_file.name
    # The handle is closed before OpenCV writes to the path.
    try:
        cv2.imwrite(temp_path, image)
    except Exception:
        util.delete_temp_file(temp_path)
        raise
    return temp_path


def _ocr_layouts(image_path):
    """Run OCR on the image at *image_path* and return its layouts as a list.

    A ``TypeError`` from the OCR helper is treated as "no text recognised"
    (this is what the original inline comment says is the most likely
    cause); any other exception propagates unchanged.
    """
    try:
        return util.get_ocr_layout(OCR, image_path) or []
    except TypeError:
        return []


def find_boxes(content, layout, offset=0, length=None, improve=False, image_path=None, extra_content=None):
    """Estimate the rectangle(s) covering *content* inside one OCR layout.

    ``layout[0]`` is used as a box ``[x0, y0, x1, y1]`` and ``layout[1]`` as
    the recognised text. Every character is assumed to occupy the same
    length along the longer side of the box, so the position of *content*
    is interpolated from its character index.

    Args:
        content: Text to locate; it must occur in ``layout[1]``.
        layout: One OCR layout entry (box, text).
        offset: Number of characters to shift the start of the box by.
        length: Number of characters the box should span. When falsy,
            ``len(content) + 1`` is used.
        improve: When true, the estimated region is cropped (slightly
            enlarged), OCR is run again on the crop, and boxes found there
            replace the estimate.
        image_path: Path of the image *layout* came from; read only when
            *improve* is true.
        extra_content: Additional strings to look for in the re-recognised
            crop. They are tried before *content*. The list is not modified.

    Returns:
        A non-empty list of ``[x0, y0, x1, y1]`` boxes. When refinement is
        off or finds nothing, the list holds only the interpolated estimate.

    Raises:
        ValueError: If *content* does not occur in ``layout[1]``.
    """
    full_box, text = layout[0], layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    # The longer side decides whether the text runs horizontally or vertically.
    horizontal = x_len >= y_len
    char_len = (x_len if horizontal else y_len) / len(text)

    start = text.index(content) + offset
    if not length:
        length = len(content) + 1
    end = start + length

    if horizontal:
        box = [
            full_box[0] + start * char_len,
            full_box[1],
            full_box[0] + end * char_len,
            full_box[3],
        ]
    else:
        box = [
            full_box[0],
            full_box[1] + start * char_len,
            full_box[2],
            full_box[1] + end * char_len,
        ]

    boxes = []
    if improve:
        # Recognise the cropped region again to get a tighter box.
        image = cv2.imread(image_path)
        # Crop a little larger than the estimate.
        capture_box = util.zoom_rectangle(box, 0.2)
        captured_image = image_util.capture(image, capture_box)
        padded_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image, True)

        temp_path = _write_temp_jpg(padded_image)
        try:
            crop_layouts = _ocr_layouts(temp_path)
        finally:
            # Always remove the crop, even when OCR fails.
            util.delete_temp_file(temp_path)

        # Build a new list so the caller's extra_content is never mutated.
        candidates = [*extra_content, content] if extra_content else [content]
        for crop_layout in crop_layouts:
            crop_text = crop_layout[1]
            # Only the first candidate present in this layout is used.
            hit = next((c for c in candidates if c in crop_text), None)
            if hit is None:
                continue
            local_box = find_boxes(hit, crop_layout)[0]
            # Translate from crop coordinates back to the source image.
            boxes.append([
                local_box[0] + capture_box[0] - offset_x,
                local_box[1] + capture_box[1] - offset_y,
                local_box[2] + capture_box[0] - offset_x,
                local_box[3] + capture_box[1] - offset_y,
            ])

    if not boxes:
        boxes.append(box)
    return boxes


def _name_variants(name):
    """Return *name* followed by variants using look-alike characters.

    For each distinct character of *name* that is a key of ``SIMILAR_CHAR``,
    one variant is added per listed replacement. An empty name yields an
    empty list, so that it never matches every text.
    """
    if not name:
        return []
    variants = [name]
    # dict.fromkeys de-duplicates the characters while keeping their order.
    for char in dict.fromkeys(name):
        if char in SIMILAR_CHAR:
            for similar in SIMILAR_CHAR[char]:
                variants.append(name.replace(char, similar))
    return variants


def _fuzzy_name_pattern(name):
    """Compile a regex matching *name* with one character wrong or missing.

    The alternatives are: the name without its last character followed by
    an optional character; an optional character followed by the name
    without its first character; and, for odd-length names, the first and
    last halves around an optional character. Returns ``None`` for names of
    two characters or fewer, for which fuzzy matching is not attempted.
    """
    name_len = len(name)
    if name_len <= 2:
        return None
    half = name_len // 2
    # Escape the pieces so regex metacharacters in a name match literally.
    alternatives = [
        f"{re.escape(name[:-1])}.?",
        f".?{re.escape(name[1:])}",
    ]
    if name_len % 2 == 1:
        alternatives.append(f"{re.escape(name[:half])}.?{re.escape(name[-half:])}")
    return re.compile("|".join(alternatives))


def _find_boxes_by_keys(keys, layout):
    """Locate boxes in *layout* using label patterns instead of known values.

    Each entry of *keys* is a mapping with a regex under ``"key"``, a
    ``"length"`` in characters, and an optional ``"offset"`` that defaults
    to the length of the matched text, which places the box after the
    match. Only the first entry whose pattern matches is used.
    """
    text = layout[1]
    boxes = []
    for key in keys:
        hits = re.findall(key["key"], text)
        for hit in hits:
            if hit in text:
                boxes += find_boxes(hit, layout, offset=key.get("offset", len(hit)), length=key["length"])
        if hits:
            break
    return boxes


def get_mask_layout(image, name, id_card_num):
    """Return the boxes in *image* that should be painted over.

    OCR is run on *image*. For every layout the name is searched exactly
    (including look-alike variants), then fuzzily for names longer than two
    characters; the ID-card number is searched exactly. Whatever is not
    found by value is searched through ``NAME_KEYS`` and
    ``ID_CARD_NUM_KEYS``. An empty or ``None`` name or ID-card number is
    never searched by value and goes straight to the key-based search.

    Args:
        image: Image array accepted by ``cv2.imwrite``.
        name: Name to mask.
        id_card_num: ID-card number to mask.

    Returns:
        A list of ``[x0, y0, x1, y1]`` boxes in the coordinates of *image*.
        If an error occurs while searching, it is logged and the boxes
        collected so far are returned.
    """
    temp_path = _write_temp_jpg(image)
    result = []
    try:
        layouts = _ocr_layouts(temp_path)
        if not layouts:
            # Nothing was recognised.
            return result

        name = name or ""
        id_card_num = id_card_num or ""
        name_variants = _name_variants(name)
        name_pattern = _fuzzy_name_pattern(name)

        for layout in layouts:
            text = layout[1]

            name_found = False
            exact = next((variant for variant in name_variants if variant in text), None)
            if exact is not None:
                result += find_boxes(exact, layout, improve=True, image_path=temp_path)
                name_found = True
            elif name_pattern is not None:
                fuzzy = name_pattern.search(text)
                if fuzzy:
                    # Pass the exact variants too: the re-recognised crop
                    # may read the name correctly.
                    result += find_boxes(fuzzy.group(0), layout, improve=True, image_path=temp_path,
                                         extra_content=name_variants)
                    name_found = True

            id_found = bool(id_card_num) and id_card_num in text
            if id_found:
                result += find_boxes(id_card_num, layout, improve=True, image_path=temp_path)

            # Fall back to label-based search for whatever was not found.
            if not name_found:
                result += _find_boxes_by_keys(NAME_KEYS, layout)
            if not id_found:
                result += _find_boxes_by_keys(ID_CARD_NUM_KEYS, layout)
        return result
    except Exception as e:
        logging.error("涂抹时出错!", exc_info=e)
        return result
    finally:
        util.delete_temp_file(temp_path)


def handle_image_for_mask(split_result):
    """Expand one split image and correct its offsets for the added margin.

    ``split_result["x_offset"]`` and ``split_result["y_offset"]`` are
    updated in place by subtracting the offsets reported by
    ``image_util.expand_to_a4_size``.

    Returns:
        A tuple ``(expanded_image, x_offset, y_offset)`` with the updated
        offsets.
    """
    expanded, pad_x, pad_y = image_util.expand_to_a4_size(split_result["img"], True)
    split_result["x_offset"] -= pad_x
    split_result["y_offset"] -= pad_y
    return expanded, split_result["x_offset"], split_result["y_offset"]


def mask_photo(img_url, name, id_card_num, color=(255, 255, 255)):
    """Paint over the name and ID-card number found in the image at *img_url*.

    The image is read, split into parts with ``image_util.split``, and each
    part is searched with ``get_mask_layout``. Every box found is shifted
    by that part's offsets and drawn as a filled rectangle on the full
    image.

    Args:
        img_url: Location passed to ``image_util.read``.
        name: Name to mask.
        id_card_num: ID-card number to mask.
        color: Fill colour passed to ``cv2.rectangle``.

    Returns:
        A tuple ``(is_masked, image)``; ``is_masked`` is true when at least
        one box was painted.
    """
    is_masked = False
    image = image_util.read(img_url)
    for part in image_util.split(image):
        part_image, x_offset, y_offset = handle_image_for_mask(part)
        found_boxes = get_mask_layout(part_image, name, id_card_num)
        if not found_boxes:
            continue
        is_masked = True
        for found in found_boxes:
            top_left = (int(found[0] + x_offset), int(found[1] + y_offset))
            bottom_right = (int(found[2] + x_offset), int(found[3] + y_offset))
            # Thickness -1 draws a filled rectangle.
            cv2.rectangle(image, top_left, bottom_right, color, -1, 0)
    return is_masked, image


def photo_mask(pk_phhd, name, id_card_num):
    """Mask every qualifying photo record that belongs to case *pk_phhd*.

    Records of ``ZxPhrec`` with ``cRectype`` ``"3"`` or ``"4"`` are loaded.
    Each photo that gets masked is first copied with ``ufile.copy_file``
    (the backup step, according to the original comment) and then
    overwritten by uploading the masked image under the same address.
    Upload errors are logged and do not stop the remaining photos.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        # Release the session even when the query fails.
        session.close()

    for phrec in phrecs:
        img_url = ufile.get_private_url(phrec.cfjaddress)
        if not img_url:
            continue
        is_masked, image = mask_photo(img_url, name, id_card_num)
        if not is_masked:
            continue
        # Back up the photo before replacing it.
        ufile.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress)
        temp_path = _write_temp_jpg(image)
        try:
            ufile.upload_file(phrec.cfjaddress, temp_path)
        except Exception as e:
            logging.error("上传图片出错", exc_info=e)
        finally:
            util.delete_temp_file(temp_path)


def main():
    """Poll for cases waiting to be masked and process them, forever.

    Each round loads up to ``PHHD_BATCH_SIZE`` cases whose ``paint_flag``
    is ``"1"`` and sets them to ``"2"`` (masking in progress). Every case
    is then masked and set to ``"8"`` with the current ``paint_date``.
    When no case is found, the loop sleeps ``SLEEP_MINUTES`` minutes
    before polling again.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
                ZxPhhd.paint_flag == "1"
            ).limit(PHHD_BATCH_SIZE).all()
            pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
            # Skip the UPDATE and commit when there is nothing to claim.
            if pk_phhd_values:
                # Mark the batch as "masking in progress".
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2")
                )
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new cases; wait before polling again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)
            # Masking finished: update the flag and timestamp.
            session = MysqlSession()
            try:
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                        paint_flag="8",
                        paint_date=util.get_default_datetime())
                )
                session.commit()
            finally:
                session.close()