"""Mask names and ID-card numbers found by OCR in case images, then re-upload them."""
import logging.config
import tempfile
from time import sleep

import cv2
from sqlalchemy import update, and_

from db import MysqlSession
from db.mysql import ZxPhrec, ZxPhhd
from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, COPY_TRY_TIMES, UPLOAD_TRY_TIMES, NAME_KEYS, \
    ID_CARD_NUM_KEYS
from ucloud import BUCKET, ucloud
from util import image_util, util


def _parse_layouts(image_path):
    """Run OCR on the image at *image_path* and return its ``"layout"`` list.

    A ``TypeError`` while reading the OCR response is treated as "no text
    recognised" and yields an empty list; any other exception propagates.
    A missing/empty layout value is normalised to an empty list as well.
    """
    try:
        return OCR.parse({"doc": image_path})["layout"] or []
    except TypeError:
        # Most likely nothing was recognised, so the response is not subscriptable.
        return []


def _retry(times, action):
    """Call *action* up to *times* times; return True as soon as it returns truthy."""
    for _ in range(times):
        if action():
            return True
    return False


def _refine_box(content, box, image_path):
    """Re-run OCR on a crop around *box* to locate *content* more precisely.

    The crop is taken from the image at *image_path* using *box* enlarged via
    ``util.zoom_rectangle(box, 0.2)``. If *content* is found again inside the
    crop, the refined box is translated back into the coordinates of the
    original image; otherwise *box* is returned unchanged. The temporary crop
    file is always removed, even when OCR raises.
    """
    image = cv2.imread(image_path)
    # Capture a slightly larger region than the estimated box.
    capture_box = util.zoom_rectangle(box, 0.2)
    captured_image = image_util.capture(image, capture_box)
    captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image, True)

    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
        temp_path = temp_file.name
    try:
        cv2.imwrite(temp_path, captured_image)
        for candidate in _parse_layouts(temp_path):
            if content not in candidate[1]:
                continue
            temp_box = find_box(content, candidate)
            if temp_box:
                # Undo the A4 padding offset and the crop origin shift.
                shift_x = capture_box[0] - offset_x
                shift_y = capture_box[1] - offset_y
                return [
                    temp_box[0] + shift_x,
                    temp_box[1] + shift_y,
                    temp_box[2] + shift_x,
                    temp_box[3] + shift_y,
                ]
        return box
    finally:
        util.delete_temp_file(temp_path)


def find_box(content, layout, offset=0, length=None, improve=False, image_path=None):
    """Estimate the bounding box of a run of characters inside one OCR layout.

    The layout's box is divided evenly by the number of characters in its
    text, and the box of the requested run is interpolated along the longer
    side (horizontal text when width >= height, vertical otherwise).

    Args:
        content: Text to locate; must occur in ``layout[1]``.
        layout: OCR layout entry where ``layout[0]`` is ``[x1, y1, x2, y2]``
            and ``layout[1]`` is the recognised text.
        offset: Number of characters to skip after the start of *content*
            before the box begins.
        length: Number of characters the box covers. Defaults to
            ``len(content) + 1`` when falsy.
        improve: When true, re-run OCR on a crop around the estimate to
            tighten the box (requires *image_path*).
        image_path: Path of the image the layout came from; only read when
            *improve* is true.

    Returns:
        ``[x1, y1, x2, y2]`` in the coordinate system of the layout's image.

    Raises:
        ValueError: If *content* does not occur in the layout text.
        ZeroDivisionError: If the layout text is empty.
    """
    full_box = layout[0]
    text = layout[1]
    x_len = full_box[2] - full_box[0]
    y_len = full_box[3] - full_box[1]
    horizontal = x_len >= y_len

    char_len = (x_len if horizontal else y_len) / len(text)
    index = text.index(content)
    if not length:
        length = len(content) + 1

    start = (index + offset) * char_len
    end = (index + offset + length) * char_len
    if horizontal:
        box = [full_box[0] + start, full_box[1], full_box[0] + end, full_box[3]]
    else:
        box = [full_box[0], full_box[1] + start, full_box[2], full_box[1] + end]

    if improve:
        # Second OCR pass on a crop to improve precision.
        box = _refine_box(content, box, image_path)
    return box


def get_mask_layout(image, name, id_card_num):
    """Return the boxes that should be masked in *image*.

    For every OCR layout line: if *name* / *id_card_num* appears literally,
    its (refined) box is added. For whichever of the two was not found in
    that line, the line is searched for the corresponding keywords
    (``NAME_KEYS`` / ``ID_CARD_NUM_KEYS``) and the ``key["length"]``
    characters following the keyword are added instead.

    Empty or ``None`` *name* / *id_card_num* values are never matched
    literally; only the keyword fallback is used for them.

    Args:
        image: Image array to analyse (written to a temporary JPEG for OCR).
        name: Person name to mask, may be empty/None.
        id_card_num: ID-card number to mask, may be empty/None.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes; empty when nothing was found or
        when an error occurred (the error is logged).
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        cv2.imwrite(temp_file.name, image)

    result = []
    try:
        for layout in _parse_layouts(temp_file.name):
            text = layout[1]
            keys = []

            if name and name in text:
                result.append(find_box(name, layout, improve=True, image_path=temp_file.name))
            else:
                keys.extend(NAME_KEYS)

            if id_card_num and id_card_num in text:
                result.append(find_box(id_card_num, layout, improve=True, image_path=temp_file.name))
            else:
                keys.extend(ID_CARD_NUM_KEYS)

            for key in keys:
                if key["key"] in text:
                    # Mask the characters that follow the keyword, not the keyword itself.
                    result.append(find_box(key["key"], layout, offset=len(key["key"]), length=key["length"]))
        return result
    except Exception as e:
        logging.error("涂抹时出错!", exc_info=e)
        return []
    finally:
        util.delete_temp_file(temp_file.name)


def handle_image_for_mask(split_result):
    """Prepare one split tile for OCR.

    The tile image is expanded via ``image_util.expand_to_a4_size`` and
    converted to grayscale. The expansion offsets are subtracted from the
    tile's ``x_offset`` / ``y_offset`` **in place** on *split_result*.

    Args:
        split_result: Dict with keys ``"img"``, ``"x_offset"`` and
            ``"y_offset"``.

    Returns:
        ``(gray_image, x_offset, y_offset)`` with the adjusted offsets.
    """
    expanded, pad_x, pad_y = image_util.expand_to_a4_size(split_result["img"], True)
    split_result["x_offset"] -= pad_x
    split_result["y_offset"] -= pad_y
    gray_image = cv2.cvtColor(expanded, cv2.COLOR_BGR2GRAY)
    return gray_image, split_result["x_offset"], split_result["y_offset"]


def _mask_image(image, name, id_card_num):
    """Paint white rectangles over every detected box in *image* (in place).

    Returns True when at least one rectangle was drawn.
    """
    is_masked = False
    for split_result in image_util.split(image):
        to_mask_img, x_offset, y_offset = handle_image_for_mask(split_result)
        boxes = get_mask_layout(to_mask_img, name, id_card_num)
        if not boxes:
            continue
        is_masked = True
        for box in boxes:
            # Translate from tile coordinates back to the full image.
            top_left = (int(box[0] + x_offset), int(box[1] + y_offset))
            bottom_right = (int(box[2] + x_offset), int(box[3] + y_offset))
            # thickness=-1 draws a filled rectangle.
            cv2.rectangle(image, top_left, bottom_right, (255, 255, 255), -1, 0)
    return is_masked


def _backup_and_upload(address, image):
    """Copy the stored file at *address* to the backup bucket, then overwrite it with *image*.

    Both steps are retried (``COPY_TRY_TIMES`` / ``UPLOAD_TRY_TIMES``).
    Exhausted retries are logged. As before, the upload is still attempted
    when the backup copy did not succeed.
    """
    # "drg2015" is presumably the backup bucket -- TODO confirm.
    copied = _retry(COPY_TRY_TIMES, lambda: ucloud.copy_file(BUCKET, address, "drg2015", address))
    if not copied:
        logging.warning("Backup copy of %s failed after %s attempts", address, COPY_TRY_TIMES)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        cv2.imwrite(temp_file.name, image)
    try:
        uploaded = _retry(UPLOAD_TRY_TIMES, lambda: ucloud.upload_file(address, temp_file.name))
        if not uploaded:
            logging.error("Upload of masked image %s failed after %s attempts", address, UPLOAD_TRY_TIMES)
    except Exception as e:
        logging.error("上传图片出错", exc_info=e)
    finally:
        util.delete_temp_file(temp_file.name)


def photo_mask(pk_phhd, name, id_card_num):
    """Mask *name* and *id_card_num* in every type "3"/"4" image of one case.

    Args:
        pk_phhd: Primary key of the case whose ``ZxPhrec`` images are processed.
        name: Person name to mask.
        id_card_num: ID-card number to mask.

    Images for which no private URL can be obtained are skipped. Images in
    which something was masked are backed up and then overwritten in storage.
    """
    session = MysqlSession()
    try:
        phrecs = session.query(ZxPhrec.cfjaddress).filter(and_(
            ZxPhrec.pk_phhd == pk_phhd,
            ZxPhrec.cRectype.in_(["3", "4"])
        )).all()
    finally:
        session.close()

    for phrec in phrecs:
        img_url = ucloud.get_private_url(phrec.cfjaddress)
        if not img_url:
            continue

        image = image_util.read(img_url)
        # Only back up and re-upload when something was actually masked.
        if _mask_image(image, name, id_card_num):
            _backup_and_upload(phrec.cfjaddress, image)


def main():
    """Poll forever for cases awaiting masking and process them in batches.

    Cases with ``paint_flag == "1"`` are fetched (up to ``PHHD_BATCH_SIZE``),
    flagged ``"2"`` (masking in progress), processed one by one, and flagged
    ``"8"`` with a ``paint_date`` when done. When no case is found the loop
    sleeps ``SLEEP_MINUTES`` minutes before polling again.
    """
    while True:
        session = MysqlSession()
        try:
            phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter(
                ZxPhhd.paint_flag == "1"
            ).limit(PHHD_BATCH_SIZE).all()
            if phhds:
                # Mark the batch as "masking in progress"; skipped when empty
                # to avoid issuing an UPDATE with an empty IN list.
                pk_phhd_values = [phhd.pk_phhd for phhd in phhds]
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2")
                )
                session.commit()
        finally:
            session.close()

        if not phhds:
            # No new cases; wait before polling again.
            logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...")
            sleep(SLEEP_MINUTES * 60)
            continue

        for phhd in phhds:
            pk_phhd = phhd.pk_phhd
            logging.info(f"开始涂抹:{pk_phhd}")
            photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh)

            # Masking finished: update the flag and completion time.
            session = MysqlSession()
            try:
                session.execute(
                    update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(
                        paint_flag="8", paint_date=util.get_default_datetime())
                )
                session.commit()
            finally:
                session.close()