From 13575fb2aad1fd8599196db0b7d7b3a05a581269 Mon Sep 17 00:00:00 2001 From: liuyebo <1515783401@qq.com> Date: Mon, 15 Jul 2024 15:11:13 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"=E4=BC=98=E5=8C=96=E7=85=A7=E7=89=87?= =?UTF-8?q?=E6=B6=82=E6=8A=B9=E5=8A=9F=E8=83=BD=E6=9E=B6=E6=9E=84"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit a0997e0673b82ff8181ddd070535c54f87470b13. --- photo_mask.py | 443 ++++++++++++++++++++++++++++++++++++++- photo_mask/__init__.py | 28 --- photo_mask/photo_mask.py | 224 -------------------- requirements.txt | 4 +- util/image_util.py | 202 ------------------ util/util.py | 72 ------- 6 files changed, 438 insertions(+), 535 deletions(-) delete mode 100644 photo_mask/__init__.py delete mode 100644 photo_mask/photo_mask.py delete mode 100644 util/image_util.py diff --git a/photo_mask.py b/photo_mask.py index d104ff0..013f252 100644 --- a/photo_mask.py +++ b/photo_mask.py @@ -1,17 +1,448 @@ import logging.config +import math +import os +import tempfile import traceback +import urllib.request +from time import sleep + +import cv2 +import numpy as np +import paddleclas +from paddlenlp.utils.doc_parser import DocParser +from sqlalchemy import update from auto_email.error_email import send_error_email +from db import MysqlSession +from db.mysql import ZxPhrec, ZxPhhd from log import LOGGING_CONFIG -from photo_mask import photo_mask, SEND_ERROR_EMAIL +from photo_review import PHHD_BATCH_SIZE, SLEEP_MINUTES +from ucloud import BUCKET, ucloud + +DOC_PARSER = DocParser(use_gpu=True, device_id=1) + + +def open_image(img_path): + if img_path.startswith("http"): + # 发送HTTP请求并获取图像数据 + resp = urllib.request.urlopen(img_path) + # 将数据读取为字节流 + image_data = resp.read() + # 将字节流转换为NumPy数组 + image_np = np.frombuffer(image_data, np.uint8) + # 解码NumPy数组为OpenCV图像格式 + image = cv2.imdecode(image_np, cv2.IMREAD_COLOR) + else: + image = cv2.imread(img_path) + return image + + +def split_image(img, max_ratio=1.41, best_ration=1.41, overlap=0.05): + split_result = [] + # 获取图片的宽度和高度 + height, width = img.shape[:2] + # 计算宽高比 + ratio = max(width, height) / min(width, height) + # 检查是否需要裁剪 + if ratio > max_ratio: + # 确定裁剪的尺寸,保持长宽比,以较短边为基准 + new_ratio = best_ration - overlap + if width < height: + # 高度是较长边 + cropped_width = width * best_ration + for i in range(math.ceil(height / (width * new_ratio))): + offset = round(width * new_ratio * i) + # 参数形式为[y1:y2, x1:x2] + cropped_img = img[offset:round(offset + cropped_width), 0:width] + split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset}) + # 最后一次裁剪时不足的部分填充黑色 + last_img = split_result[-1]["img"] + split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, round(cropped_width - last_img.shape[0]), 0, 0, + cv2.BORDER_CONSTANT, value=(0, 0, 0)) + else: + # 宽度是较长边 + cropped_height = height * best_ration + for i in range(math.ceil(width / (height * new_ratio))): + offset = round(height * new_ratio * i) + cropped_img = img[0:height, offset:round(offset + cropped_height)] + split_result.append({"img": cropped_img, "x_offset": offset, "y_offset": 0}) + # 最后一次裁剪时不足的部分填充黑色 + last_img = split_result[-1]["img"] + split_result[-1]["img"] = cv2.copyMakeBorder(last_img, 0, 0, 0, round(cropped_height - last_img.shape[1]), + cv2.BORDER_CONSTANT, value=(0, 0, 0)) + else: + split_result.append({"img": img, "x_offset": 0, "y_offset": 0}) + return split_result + + +def capture_image(img, layout): + x1, y1, x2, y2 = layout + return img[int(y1):int(y2), int(x1):int(x2)] + + +# 获取图片旋转角度 +def get_image_rotation_angles(img): + angles = ['0', '90'] + model = paddleclas.PaddleClas(model_name="text_image_orientation") + result = model.predict(input_data=img) + try: + result = next(result)[0] + if result["scores"][0] < 0.5: + return angles + angles = result["label_names"] + except Exception as e: + logging.error("获取图片旋转角度失败", exc_info=e) + return angles + + +def rotate_image(img, angle): + if angle == 0: + return img + height, width, _ = img.shape + if angle == 180: + new_width = width + new_height = height + else: + new_width = height + new_height = width + # 绕图像的中心旋转 + # 参数:旋转中心 旋转度数 scale + matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1) + # 旋转后平移 + matrix[0, 2] += (new_width - width) / 2 + matrix[1, 2] += (new_height - height) / 2 + # 参数:原始图像 旋转参数 元素图像宽高 + rotated = cv2.warpAffine(img, matrix, (new_width, new_height)) + return rotated + + +def rotate_rectangle(rectangle, center, angle): + def rotate_point(pt, angle, center): + matrix = cv2.getRotationMatrix2D(center, angle, 1) + if angle != 180: + # 旋转后平移 + matrix[0, 2] += center[1] - center[0] + matrix[1, 2] += center[0] - center[1] + + reverse_matrix = cv2.invertAffineTransform(matrix) + + pt = np.array([[pt[0]], [pt[1]], [1]]) + return np.dot(reverse_matrix, pt) + + if angle == 0: + return list(rectangle) + + x1, y1, x2, y2 = rectangle + + # 计算矩形的四个顶点 + top_left = (x1, y1) + bot_left = (x1, y2) + top_right = (x2, y1) + bot_right = (x2, y2) + + # 旋转矩形的四个顶点 + rot_top_left = rotate_point(top_left, angle, center).astype(int) + rot_bot_left = rotate_point(bot_left, angle, center).astype(int) + rot_bot_right = rotate_point(bot_right, angle, center).astype(int) + rot_top_right = rotate_point(top_right, angle, center).astype(int) + + # 找出旋转后矩形的新左上角和右下角坐标 + new_top_left = (min(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]), + min(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1])) + new_bot_right = (max(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]), + max(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1])) + + return [new_top_left[0], new_top_left[1], new_bot_right[0], new_bot_right[1]] + + +def get_ocr_layout(ocr, img_path): + def _get_box(old_box): + new_box = [ + min(old_box[0][0], old_box[3][0]), # x1 + min(old_box[0][1], old_box[1][1]), # y1 + max(old_box[1][0], old_box[2][0]), # x2 + max(old_box[2][1], old_box[3][1]), # y2 + ] + return new_box + + def _normal_box(box_data): + # Ensure the height and width of bbox are greater than zero + if box_data[3] - box_data[1] < 0 or box_data[2] - box_data[0] < 0: + return False + return True + + layout = [] + ocr_result = ocr.ocr(img_path, cls=False) + ocr_result = ocr_result[0] + if not ocr_result: + return layout + for segment in ocr_result: + box = segment[0] + box = _get_box(box) + if not _normal_box(box): + continue + text = segment[1][0] + layout.append((box, text)) + return layout + + +def zoom_box(box, ratio): + x1, y1, x2, y2 = box + x1 = round(x1 - x1 * ratio) + y1 = round(y1 - y1 * ratio) + x2 = round(x2 + x2 * ratio) + y2 = round(y2 + y2 * ratio) + return [x1, y1, x2, y2] + + +def find_box_of_content(content, layout, img_path, improve=True): + full_box = layout[0] + x_len = full_box[2] - full_box[0] + y_len = full_box[3] - full_box[1] + if x_len >= y_len: + # 横向排布 + box_len = x_len + direction = "x" + else: + # 纵向排布 + box_len = y_len + direction = "y" + text = layout[1] + text_len = len(text) + char_len = box_len / text_len + index = text.index(content) + + if direction == "x": + # 横向排布 + box = [ + full_box[0] + index * char_len, + full_box[1], + full_box[0] + (index + len(content) + 1) * char_len, + full_box[3], + ] + is_abnormal = box[2] - box[0] < (box[3] - box[1]) * len(content) / 2 + else: + # 纵向排布 + box = [ + full_box[0], + full_box[1] + index * char_len, + full_box[2], + full_box[1] + (index + len(content) + 1) * char_len, + ] + is_abnormal = box[3] - box[1] < (box[2] - box[0]) * len(content) / 2 + + if is_abnormal and improve: + # 比例异常,再次识别 + image = cv2.imread(img_path) + # 截图时偏大一点 + capture_box = zoom_box(box, 0.2) + captured_image = capture_image(image, capture_box) + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file: + cv2.imwrite(temp_file.name, captured_image) + try: + layouts = DOC_PARSER.parse({"doc": temp_file.name})["layout"] + except TypeError: + # 如果是类型错误,大概率是没识别到文字 + layouts = [] + except Exception as e: + # 如果出现其他错误,抛出 + raise e + for layout in layouts: + if content in layout[1]: + temp_box = find_box_of_content(content, layout, temp_file.name, False) + if temp_box: + box = [ + temp_box[0] + capture_box[0], + temp_box[1] + capture_box[1], + temp_box[2] + capture_box[0], + temp_box[3] + capture_box[1], + ] + break + try: + os.remove(temp_file.name) + except Exception as e: + logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e) + return box + + +def find_box_of_value(key, layout, length): + full_box = layout[0] + x_len = full_box[2] - full_box[0] + y_len = full_box[3] - full_box[1] + if x_len >= y_len: + # 横向排布 + box_len = x_len + direction = "x" + else: + # 纵向排布 + box_len = y_len + direction = "y" + text = layout[1] + text_len = len(text) + char_len = box_len / text_len + index = text.index(key) + + if direction == "x": + # 横向排布 + return ( + full_box[0] + (index + len(key)) * char_len, + full_box[1], + full_box[0] + (index + len(key) + length) * char_len, + full_box[3], + ) + else: + # 纵向排布 + return ( + full_box[0], + full_box[1] + (index + len(key)) * char_len, + full_box[2], + full_box[1] + (index + len(key) + length) * char_len, + ) + + +def get_mask_layout(image, contents): + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: + cv2.imwrite(temp_file.name, image) + + result = [] + try: + # layouts = get_ocr_layout(OCR, temp_file.name) + try: + layouts = DOC_PARSER.parse({"doc": temp_file.name}, True)["layout"] + except TypeError as te: + # 如果是类型错误,大概率是没识别到文字 + layouts = [] + except Exception as e: + # 如果出现其他错误,抛出 + raise e + if not layouts: + # 无识别结果 + return result + else: + # 涂抹 + for layout in layouts: + for content in contents: + if content in layout[1]: + result.append(find_box_of_content(content, layout, temp_file.name)) + if "姓名" in layout[1]: + result.append(find_box_of_value("姓名", layout, 4)) + if "交款人" in layout[1]: + result.append(find_box_of_value("交款人", layout, 4)) + if "文款人" in layout[1]: + result.append(find_box_of_value("文款人", layout, 4)) + if "购买方名称" in layout[1]: + result.append(find_box_of_value("购买方名称", layout, 4)) + if "身份证号" in layout[1]: + result.append(find_box_of_value("身份证号", layout, 19)) + return result + except Exception as e: + logging.error("涂抹时出错", exc_info=e) + finally: + try: + os.remove(temp_file.name) + except Exception as e: + logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e) + + +def photo_mask(pk_phhd, contents): + session = MysqlSession() + phrecs = session.query(ZxPhrec.pk_phrec, ZxPhrec.pk_phhd, ZxPhrec.cfjaddress) \ + .filter(ZxPhrec.pk_phhd == pk_phhd) \ + .filter(ZxPhrec.cRectype.in_(["3", "4"])) \ + .all() + session.close() + for phrec in phrecs: + img_url = ucloud.get_private_url(phrec.cfjaddress) + if not img_url: + continue + # 是否有涂抹 + is_masked = False + # 打开图片 + image = open_image(img_url) + split_result = split_image(image) + for img in split_result: + angles = get_image_rotation_angles(img["img"]) + angle = int(angles[0]) + rotated_img = rotate_image(img["img"], angle) + results = get_mask_layout(rotated_img, contents) + if not results: + angle = int(angles[1]) + rotated_img = rotate_image(img["img"], angle) + results = get_mask_layout(rotated_img, contents) + if not results and "0" not in angles: + angle = 0 + results = get_mask_layout(img["img"], contents) + + if results: + is_masked = True + + for result in results: + height, width = img["img"].shape[:2] + center = (width / 2, height / 2) + result = rotate_rectangle(result, center, angle) + result = ( + result[0] + img["x_offset"], + result[1] + img["y_offset"], + result[2] + img["x_offset"], + result[3] + img["y_offset"], + ) + cv2.rectangle(image, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])), + (255, 255, 255), -1, 0) + + # 如果涂抹了要备份以及更新 + if is_masked: + for i in range(3): + is_copy_success = ucloud.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress) + if is_copy_success: + break + + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: + cv2.imwrite(temp_file.name, image) + try: + for i in range(3): + is_upload_success = ucloud.upload_file(phrec.cfjaddress, temp_file.name) + if is_upload_success: + break + except Exception as e: + logging.error("上传图片出错", exc_info=e) + finally: + try: + os.remove(temp_file.name) + except Exception as e: + logging.info(f"删除临时文件 {temp_file.name} 时出错", exc_info=e) + if __name__ == '__main__': - program_name = "自动照片涂抹脱敏脚本" logging.config.dictConfig(LOGGING_CONFIG) try: - logging.info(f"【{program_name}】开始运行") - photo_mask.main() + while 1: + session = MysqlSession() + phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter( + ZxPhhd.paint_flag == "1" + ).limit(PHHD_BATCH_SIZE).all() + # 将状态改为正在涂抹中 + pk_phhd_values = [phhd.pk_phhd for phhd in phhds] + update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2")) + session.execute(update_flag) + session.commit() + session.close() + if phhds: + for phhd in phhds: + pk_phhd = phhd.pk_phhd + logging.info(f"开始涂抹:{pk_phhd}") + photo_mask(pk_phhd, [phhd.cXm, phhd.cSfzh]) + + # 识别完成更新标识 + session = MysqlSession() + update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8")) + session.execute(update_flag) + session.commit() + session.close() + else: + # 没有查询到新案子,等待一段时间后再查 + log = logging.getLogger() + log.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...") + sleep(SLEEP_MINUTES * 60) except Exception as e: logging.error(traceback.format_exc()) - if SEND_ERROR_EMAIL: - send_error_email(program_name=program_name, error_name=repr(e), error_detail=traceback.format_exc()) + send_error_email(program_name='照片涂抹脚本', error_name=repr(e), error_detail=traceback.format_exc()) diff --git a/photo_mask/__init__.py b/photo_mask/__init__.py deleted file mode 100644 index 0e67900..0000000 --- a/photo_mask/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -from paddleocr import PaddleOCR - -""" -项目配置 -""" -# 每次从数据库获取的案子数量 -PHHD_BATCH_SIZE = 20 -# 没有查询到案子的等待时间(分钟) -SLEEP_MINUTES = 5 -# 是否发送异常提醒邮件 -SEND_ERROR_EMAIL = True -# 备份原图的尝试次数 -COPY_TRY_TIMES = 3 -# 上传新图的尝试次数 -UPLOAD_TRY_TIMES = 3 - -""" -关键词配置 -""" -NAME_KEYS = [ - {"key": "姓名", "length": 4}, - {"key": "交款人", "length": 4}, - {"key": "文款人", "length": 4}, - {"key": "购买方名称", "length": 4}, -] -ID_CARD_NUM_KEYS = [{"key": "身份证号", "length": 19}, ] - -OCR = PaddleOCR(use_angle_cls=False, show_log=False, gpu_id=1) diff --git a/photo_mask/photo_mask.py b/photo_mask/photo_mask.py deleted file mode 100644 index 63e1a3b..0000000 --- a/photo_mask/photo_mask.py +++ /dev/null @@ -1,224 +0,0 @@ -import logging.config -import tempfile -from time import sleep - -import cv2 -from sqlalchemy import update, and_ - -from db import MysqlSession -from db.mysql import ZxPhrec, ZxPhhd -from photo_mask import OCR, PHHD_BATCH_SIZE, SLEEP_MINUTES, COPY_TRY_TIMES, UPLOAD_TRY_TIMES, NAME_KEYS, \ - ID_CARD_NUM_KEYS -from ucloud import BUCKET, ucloud -from util import image_util, util - - -def find_box(content, layout, offset=0, length=None, improve=False, image_path=None): - full_box = layout[0] - x_len = full_box[2] - full_box[0] - y_len = full_box[3] - full_box[1] - if x_len >= y_len: - # 横向排布 - box_len = x_len - direction = "x" - else: - # 纵向排布 - box_len = y_len - direction = "y" - text = layout[1] - text_len = len(text) - char_len = box_len / text_len - index = text.index(content) - - if not length: - length = len(content) + 1 - if direction == "x": - # 横向排布 - box = [ - full_box[0] + (index + offset) * char_len, - full_box[1], - full_box[0] + (index + offset + length) * char_len, - full_box[3], - ] - else: - # 纵向排布 - box = [ - full_box[0], - full_box[1] + (index + offset) * char_len, - full_box[2], - full_box[1] + (index + offset + length) * char_len, - ] - - if improve: - # 再次识别,提高精度 - image = cv2.imread(image_path) - # 截图时偏大一点 - capture_box = util.zoom_rectangle(box, 0.2) - captured_image = image_util.capture(image, capture_box) - with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file: - captured_image, offset_x, offset_y = image_util.expand_to_a4_size(captured_image, True) - cv2.imwrite(temp_file.name, captured_image) - try: - layouts = util.get_ocr_layout(OCR, temp_file.name) - except TypeError: - # 如果是类型错误,大概率是没识别到文字 - layouts = [] - except Exception as e: - # 如果出现其他错误,抛出 - raise e - for layout in layouts: - if content in layout[1]: - temp_box = find_box(content, layout) - if temp_box: - box = [ - temp_box[0] + capture_box[0] - offset_x, - temp_box[1] + capture_box[1] - offset_y, - temp_box[2] + capture_box[0] - offset_x, - temp_box[3] + capture_box[1] - offset_y, - ] - break - util.delete_temp_file(temp_file.name) - return box - - -def get_mask_layout(image, name, id_card_num): - with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: - cv2.imwrite(temp_file.name, image) - - result = [] - try: - try: - layouts = util.get_ocr_layout(OCR, temp_file.name) - except TypeError: - # 如果是类型错误,大概率是没识别到文字 - layouts = [] - except Exception as e: - # 如果出现其他错误,抛出 - raise e - - if not layouts: - # 无识别结果 - return result - else: - # 涂抹 - for layout in layouts: - find_name_by_key = True - find_id_card_num_by_key = True - if name in layout[1]: - result.append(find_box(name, layout, improve=True, image_path=temp_file.name)) - find_name_by_key = False - if id_card_num in layout[1]: - result.append(find_box(id_card_num, layout, improve=True, image_path=temp_file.name)) - find_id_card_num_by_key = False - - keys = [] - if find_name_by_key: - keys += NAME_KEYS - if find_id_card_num_by_key: - keys += ID_CARD_NUM_KEYS - for key in keys: - if key["key"] in layout[1]: - result.append(find_box(key["key"], layout, offset=len(key["key"]), length=key["length"])) - return result - except Exception as e: - logging.error("涂抹时出错!", exc_info=e) - finally: - util.delete_temp_file(temp_file.name) - - -def photo_mask(pk_phhd, name, id_card_num): - session = MysqlSession() - phrecs = session.query(ZxPhrec.cfjaddress).filter(and_( - ZxPhrec.pk_phhd == pk_phhd, - ZxPhrec.cRectype.in_(["3", "4"]) - )).all() - session.close() - for phrec in phrecs: - img_url = ucloud.get_private_url(phrec.cfjaddress) - if not img_url: - continue - # 是否有涂抹 - is_masked = False - # 打开图片 - image = image_util.read(img_url) - split_results = image_util.split(image) - for split_result in split_results: - angles = image_util.parse_rotation_angles(split_result["img"]) - angle = int(angles[0]) - rotated_img = image_util.rotate(split_result["img"], angle) - rotated_img, offset_x, offset_y = image_util.expand_to_a4_size(rotated_img, True) - split_result["x_offset"] -= offset_x - split_result["y_offset"] -= offset_y - results = get_mask_layout(rotated_img, name, id_card_num) - if not results: - angle = int(angles[1]) - rotated_img = image_util.rotate(split_result["img"], angle) - results = get_mask_layout(rotated_img, name, id_card_num) - if not results and "0" not in angles: - angle = 0 - results = get_mask_layout(split_result["img"], name, id_card_num) - - if results: - is_masked = True - - for result in results: - height, width = split_result["img"].shape[:2] - center = (width / 2, height / 2) - result = image_util.invert_rotate_rectangle(result, center, angle) - result = ( - result[0] + split_result["x_offset"], - result[1] + split_result["y_offset"], - result[2] + split_result["x_offset"], - result[3] + split_result["y_offset"], - ) - cv2.rectangle(image, (int(result[0]), int(result[1])), (int(result[2]), int(result[3])), - (255, 255, 255), -1, 0) - - # 如果涂抹了要备份以及更新 - if is_masked: - for i in range(COPY_TRY_TIMES): - is_copy_success = ucloud.copy_file(BUCKET, phrec.cfjaddress, "drg2015", phrec.cfjaddress) - if is_copy_success: - break - - with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: - cv2.imwrite(temp_file.name, image) - try: - for i in range(UPLOAD_TRY_TIMES): - is_upload_success = ucloud.upload_file(phrec.cfjaddress, temp_file.name) - if is_upload_success: - break - except Exception as e: - logging.error("上传图片出错", exc_info=e) - finally: - util.delete_temp_file(temp_file.name) - - -def main(): - while 1: - session = MysqlSession() - phhds = session.query(ZxPhhd.pk_phhd, ZxPhhd.cXm, ZxPhhd.cSfzh).filter( - ZxPhhd.paint_flag == "1" - ).limit(PHHD_BATCH_SIZE).all() - # 将状态改为正在涂抹中 - pk_phhd_values = [phhd.pk_phhd for phhd in phhds] - update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd.in_(pk_phhd_values)).values(paint_flag="2")) - session.execute(update_flag) - session.commit() - session.close() - if phhds: - for phhd in phhds: - pk_phhd = phhd.pk_phhd - logging.info(f"开始涂抹:{pk_phhd}") - photo_mask(pk_phhd, phhd.cXm, phhd.cSfzh) - - # 识别完成更新标识 - session = MysqlSession() - update_flag = (update(ZxPhhd).where(ZxPhhd.pk_phhd == pk_phhd).values(paint_flag="8")) - session.execute(update_flag) - session.commit() - session.close() - else: - # 没有查询到新案子,等待一段时间后再查 - logging.info(f"暂未查询到需要涂抹的案子,等待{SLEEP_MINUTES}分钟...") - sleep(SLEEP_MINUTES * 60) diff --git a/requirements.txt b/requirements.txt index a8a72e4..e83d8f6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,4 @@ paddleocr==2.7.3 pymysql==1.1.0 sqlacodegen==2.3.0.post1 sqlalchemy==1.4.52 -ufile==3.2.9 -opencv-python==4.6.0.66 -numpy==1.26.2 \ No newline at end of file +ufile==3.2.9 \ No newline at end of file diff --git a/util/image_util.py b/util/image_util.py deleted file mode 100644 index 45f13bc..0000000 --- a/util/image_util.py +++ /dev/null @@ -1,202 +0,0 @@ -import logging -import math -import urllib.request - -import cv2 -import numpy -from paddleclas import PaddleClas - - -def read(image_path): - """ - 从网络或本地读取图片 - :param image_path: 网络或本地路径 - :return: NumPy数组形式的图片 - """ - if image_path.startswith("http"): - # 发送HTTP请求并获取图像数据 - resp = urllib.request.urlopen(image_path) - # 将数据读取为字节流 - image_data = resp.read() - # 将字节流转换为NumPy数组 - image_np = numpy.frombuffer(image_data, numpy.uint8) - # 解码NumPy数组为OpenCV图像格式 - image = cv2.imdecode(image_np, cv2.IMREAD_COLOR) - else: - image = cv2.imread(image_path) - return image - - -def capture(image, rectangle): - """ - 截取图片 - :param image: 图片NumPy数组 - :param rectangle: 要截取的矩形 - :return: 截取之后的图片NumPy - """ - x1, y1, x2, y2 = rectangle - return image[int(y1):int(y2), int(x1):int(x2)] - - -def split(image, ratio=1.414, overlap=0.05): - """ - 分割图片,只分割过长的图片,暂不处理过宽的图片 - :param image:图片,可以是NumPy数组或文件路径 - :param ratio: 分割后的比例 - :param overlap: 图片之间的覆盖比例 - :return: 分割后的图片组(NumPy数组形式) - """ - split_result = [] - if isinstance(image, str): - image = read(image) - # 获取图片的宽度和高度 - height, width = image.shape[:2] - # 计算宽高比 - img_ratio = height / width - # 检查是否需要裁剪 - if img_ratio > ratio: - split_ratio = ratio - overlap - # 分割后的高度 - new_img_height = width * ratio - for i in range(math.ceil(height / (width * split_ratio))): - offset = round(width * split_ratio * i) - # 参数形式为[y1:y2, x1:x2] - cropped_img = capture(image, [0, offset, width, offset + new_img_height]) - split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset}) - else: - split_result.append({"img": image, "x_offset": 0, "y_offset": 0}) - return split_result - - -def parse_rotation_angles(image): - """ - 判断图片旋转角度,逆时针旋转该角度后为正。可能值["0", "90", "180", "270"] - :param image: 图片NumPy数组或文件路径 - :return: 最有可能的两个角度 - """ - angles = ['0', '90'] - model = PaddleClas(model_name="text_image_orientation") - clas_result = model.predict(input_data=image) - try: - clas_result = next(clas_result)[0] - if clas_result["scores"][0] < 0.5: - return angles - angles = clas_result["label_names"] - except Exception as e: - logging.error("获取图片旋转角度失败", exc_info=e) - return angles - - -def rotate(image, angle): - """ - 旋转图片 - :param image: 图片NumPy数组 - :param angle: 逆时针旋转角度 - :return: 旋转后的图片NumPy数组 - """ - if angle == 0: - return image - height, width = image.shape[:2] - if angle == 180: - new_width = width - new_height = height - else: - new_width = height - new_height = width - # 绕图像的中心旋转 - # 参数:旋转中心 旋转度数 scale - matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1) - # 旋转后平移 - matrix[0, 2] += (new_width - width) / 2 - matrix[1, 2] += (new_height - height) / 2 - # 参数:原始图像 旋转参数 元素图像宽高 - rotated = cv2.warpAffine(image, matrix, (new_width, new_height)) - return rotated - - -def invert_rotate_point(point, center, angle): - """ - 反向旋转图片上的点 - :param point: 点 - :param center: 旋转中心 - :param angle: 旋转角度 - :return: 旋转后的点坐标 - """ - matrix = cv2.getRotationMatrix2D(center, angle, 1) - if angle != 180: - # 旋转后平移 - matrix[0, 2] += center[1] - center[0] - matrix[1, 2] += center[0] - center[1] - - reverse_matrix = cv2.invertAffineTransform(matrix) - - point = numpy.array([[point[0]], [point[1]], [1]]) - return numpy.dot(reverse_matrix, point) - - -def invert_rotate_rectangle(rectangle, center, angle): - """ - 反向旋转图片上的矩形 - :param rectangle: 矩形 - :param center: 旋转中心 - :param angle: 旋转角度 - :return: 旋转后的矩形坐标 - """ - if angle == 0: - return list(rectangle) - - x1, y1, x2, y2 = rectangle - - # 计算矩形的四个顶点 - top_left = (x1, y1) - bot_left = (x1, y2) - top_right = (x2, y1) - bot_right = (x2, y2) - - # 旋转矩形的四个顶点 - rot_top_left = invert_rotate_point(top_left, center, angle).astype(int) - rot_bot_left = invert_rotate_point(bot_left, center, angle).astype(int) - rot_bot_right = invert_rotate_point(bot_right, center, angle).astype(int) - rot_top_right = invert_rotate_point(top_right, center, angle).astype(int) - - # 找出旋转后矩形的新左上角和右下角坐标 - new_top_left = (min(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]), - min(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1])) - new_bot_right = (max(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]), - max(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1])) - - return [new_top_left[0], new_top_left[1], new_bot_right[0], new_bot_right[1]] - - -def expand_to_a4_size(image, center=False): - """ - 将图片扩充到a4大小 - :param image: 图片NumPy数组 - :param center: 是否将原图置于中间 - :return: 扩充后的图片NumPy数组和偏移量 - """ - h, w = image.shape[:2] - offset_x, offset_y = 0, 0 - if h * 1.0 / w >= 1.42: - exp_w = int(h / 1.414 - w) - if center: - offset_x = int(exp_w / 2) - exp_img = numpy.zeros((h, offset_x, 3), dtype="uint8") - exp_img.fill(255) - image = numpy.hstack([exp_img, image, exp_img]) - else: - exp_img = numpy.zeros((h, exp_w, 3), dtype="uint8") - exp_img.fill(255) - image = numpy.hstack([image, exp_img]) - elif h * 1.0 / w <= 1.40: - exp_h = int(w * 1.414 - h) - if center: - offset_y = int(exp_h / 2) - exp_img = numpy.zeros((offset_y, w, 3), dtype="uint8") - exp_img.fill(255) - image = numpy.vstack([exp_img, image, exp_img]) - else: - exp_img = numpy.zeros((exp_h, w, 3), dtype="uint8") - exp_img.fill(255) - image = numpy.vstack([image, exp_img]) - return image, offset_x, offset_y diff --git a/util/util.py b/util/util.py index 540994e..6d1c2b2 100644 --- a/util/util.py +++ b/util/util.py @@ -1,78 +1,6 @@ -import logging -import os from datetime import datetime # 获取yyyy-MM-dd HH:mm:ss格式的当前时间 def get_default_datetime(): return datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - -def get_ocr_layout(ocr, img_path): - """ - 获取ocr识别的结果,转为合适的layout形式 - :param ocr: ocr模型 - :param img_path: 图片本地路径 - :return: - """ - - def _get_box(old_box): - new_box = [ - min(old_box[0][0], old_box[3][0]), # x1 - min(old_box[0][1], old_box[1][1]), # y1 - max(old_box[1][0], old_box[2][0]), # x2 - max(old_box[2][1], old_box[3][1]), # y2 - ] - return new_box - - def _normal_box(box_data): - # Ensure the height and width of bbox are greater than zero - if box_data[3] - box_data[1] < 0 or box_data[2] - box_data[0] < 0: - return False - return True - - layout = [] - ocr_result = ocr.ocr(img_path, cls=False) - ocr_result = ocr_result[0] - if not ocr_result: - return layout - for segment in ocr_result: - box = segment[0] - box = _get_box(box) - if not _normal_box(box): - continue - text = segment[1][0] - layout.append((box, text)) - return layout - - -def delete_temp_file(temp_files): - """ - 删除临时文件,可以批量 - :param temp_files: 临时文件路径 - """ - if not temp_files: - return - if isinstance(temp_files, str): - temp_files = [temp_files] - for file in temp_files: - try: - os.remove(file) - logging.info(f"临时文件 {file} 已删除") - except Exception as e: - logging.warning(f"删除临时文件 {file} 时出错: {e}") - - -def zoom_rectangle(rectangle, ratio): - """ - 缩放矩形 - :param rectangle: 原矩形坐标 - :param ratio: 缩放比率 - :return: 缩放后的矩形坐标 - """ - x1, y1, x2, y2 = rectangle - x1 = round(x1 - x1 * ratio) - y1 = round(y1 - y1 * ratio) - x2 = round(x2 + x2 * ratio) - y2 = round(y2 + y2 * ratio) - return [x1, y1, x2, y2]