import logging import math import urllib.request from io import BytesIO import cv2 import numpy from PIL import Image from PIL.ExifTags import TAGS from paddleclas import PaddleClas from tenacity import retry, stop_after_attempt, wait_random @retry(stop=stop_after_attempt(3), wait=wait_random(1, 3), reraise=True, after=lambda x: logging.warning("获取图片失败!")) def read(image_path): """ 从网络或本地读取图片 :param image_path: 网络或本地路径 :return: NumPy数组形式的图片, EXIF数据 """ if image_path.startswith("http"): # 发送HTTP请求并获取图像数据 resp = urllib.request.urlopen(image_path, timeout=60) # 将数据读取为字节流 image_data = resp.read() else: with open(image_path, "rb") as f: image_data = f.read() # 解析EXIF信息(基于原始字节流) exif_data = {} try: # 用PIL打开原始字节流 with Image.open(BytesIO(image_data)) as img: # 获取EXIF字典 exif_info = img._getexif() if exif_info: # 将EXIF标签的数字ID转换为可读名称(如36867对应"DateTimeOriginal") for tag_id, value in exif_info.items(): tag_name = TAGS.get(tag_id, tag_id) exif_data[tag_name] = value except Exception as e: logging.error("解析EXIF信息失败", exc_info=e) # 将字节流转换为NumPy数组 image_np = numpy.frombuffer(image_data, numpy.uint8) # 解码NumPy数组为OpenCV图像格式 image = cv2.imdecode(image_np, cv2.IMREAD_COLOR) return image, exif_data def capture(image, rectangle): """ 截取图片 :param image: 图片NumPy数组 :param rectangle: 要截取的矩形 :return: 截取之后的图片NumPy """ x1, y1, x2, y2 = rectangle height, width = image.shape[:2] if x1 < 0: x1 = 0 if y1 < 0: y1 = 0 if x2 > width: x2 = width if y2 > height: y2 = height return image[int(y1):int(y2), int(x1):int(x2)] def split(image, ratio=1.414, overlap=0.05, x_compensation=3): """ 分割图片 :param image:图片,可以是NumPy数组或文件路径 :param ratio: 分割后的比例 :param overlap: 图片之间的覆盖比例 :param x_compensation: 横向补偿倍率 :return: 分割后的图片组(NumPy数组形式) """ split_result = [] if isinstance(image, str): image, _ = read(image) height, width = image.shape[:2] hw_ratio = height / width wh_ratio = width / height if hw_ratio > ratio: # 纵向过长 new_img_height = width * ratio step = width * (ratio - overlap) # 偏移步长 for i in range(math.ceil(height / step)): offset = round(step * i) cropped_img = capture(image, [0, offset, width, offset + new_img_height]) if cropped_img.shape[0] > 0: # 计算误差可能导致图片高度为0,此时不添加 split_result.append({"img": cropped_img, "x_offset": 0, "y_offset": offset}) elif wh_ratio > ratio: # 横向过长 new_img_width = height * ratio step = height * (ratio - overlap * x_compensation) # 一般文字是横向的,所以横向截取时增大重叠部分 for i in range(math.ceil(width / step)): offset = round(step * i) cropped_img = capture(image, [offset, 0, offset + new_img_width, width]) if cropped_img.shape[1] > 0: # 计算误差可能导致图片宽度为0,此时不添加 split_result.append({"img": cropped_img, "x_offset": offset, "y_offset": 0}) else: split_result.append({"img": image, "x_offset": 0, "y_offset": 0}) return split_result def parse_rotation_angles(image): """ 判断图片旋转角度,逆时针旋转该角度后为正。可能值["0", "90", "180", "270"] :param image: 图片NumPy数组或文件路径 :return: 最有可能的两个角度 """ angles = ['0', '90'] model = PaddleClas(model_name="text_image_orientation") clas_result = model.predict(input_data=image) try: clas_result = next(clas_result)[0] if clas_result["scores"][0] < 0.5: return angles angles = clas_result["label_names"] except Exception as e: logging.error("获取图片旋转角度失败", exc_info=e) return angles def rotate(image, angle): """ 旋转图片 :param image: 图片NumPy数组 :param angle: 逆时针旋转角度 :return: 旋转后的图片NumPy数组 """ if angle == 0: return image height, width = image.shape[:2] if angle == 180: new_width = width new_height = height else: new_width = height new_height = width # 绕图像的中心旋转 # 参数:旋转中心 旋转度数 scale matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1) # 旋转后平移 matrix[0, 2] += (new_width - width) / 2 matrix[1, 2] += (new_height - height) / 2 # 参数:原始图像 旋转参数 元素图像宽高 rotated = cv2.warpAffine(image, matrix, (new_width, new_height)) return rotated def invert_rotate_point(point, center, angle): """ 反向旋转图片上的点 :param point: 点 :param center: 旋转中心 :param angle: 旋转角度 :return: 旋转后的点坐标 """ matrix = cv2.getRotationMatrix2D(center, angle, 1) if angle != 180: # 旋转后平移 matrix[0, 2] += center[1] - center[0] matrix[1, 2] += center[0] - center[1] reverse_matrix = cv2.invertAffineTransform(matrix) point = numpy.array([[point[0]], [point[1]], [1]]) return numpy.dot(reverse_matrix, point) def invert_rotate_rectangle(rectangle, center, angle): """ 反向旋转图片上的矩形 :param rectangle: 矩形 :param center: 旋转中心 :param angle: 旋转角度 :return: 旋转后的矩形坐标 """ if angle == 0: return list(rectangle) x1, y1, x2, y2 = rectangle # 计算矩形的四个顶点 top_left = (x1, y1) bot_left = (x1, y2) top_right = (x2, y1) bot_right = (x2, y2) # 旋转矩形的四个顶点 rot_top_left = invert_rotate_point(top_left, center, angle).astype(int) rot_bot_left = invert_rotate_point(bot_left, center, angle).astype(int) rot_bot_right = invert_rotate_point(bot_right, center, angle).astype(int) rot_top_right = invert_rotate_point(top_right, center, angle).astype(int) # 找出旋转后矩形的新左上角和右下角坐标 new_top_left = (min(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]), min(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1])) new_bot_right = (max(rot_top_left[0], rot_bot_left[0], rot_bot_right[0], rot_top_right[0]), max(rot_top_left[1], rot_bot_left[1], rot_bot_right[1], rot_top_right[1])) return [new_top_left[0], new_top_left[1], new_bot_right[0], new_bot_right[1]] def expand_to_a4_size(image): """ 以尽量少的方式将图片扩充到a4大小 :param image: 图片NumPy数组 :return: 扩充后的图片NumPy数组和偏移量 """ height, width = image.shape[:2] x_offset, y_offset = 0, 0 hw_ratio = height / width if hw_ratio >= 1.42: exp_w = int(height / 1.414 - width) x_offset = int(exp_w / 2) exp_img = numpy.zeros((height, x_offset, 3), dtype="uint8") exp_img.fill(255) image = numpy.hstack([exp_img, image, exp_img]) elif 1 <= hw_ratio <= 1.40: exp_h = int(width * 1.414 - height) y_offset = int(exp_h / 2) exp_img = numpy.zeros((y_offset, width, 3), dtype="uint8") exp_img.fill(255) image = numpy.vstack([exp_img, image, exp_img]) elif 0.72 <= hw_ratio < 1: exp_w = int(height * 1.414 - width) x_offset = int(exp_w / 2) exp_img = numpy.zeros((height, x_offset, 3), dtype="uint8") exp_img.fill(255) image = numpy.hstack([exp_img, image, exp_img]) elif hw_ratio <= 0.7: exp_h = int(width / 1.414 - height) y_offset = int(exp_h / 2) exp_img = numpy.zeros((y_offset, width, 3), dtype="uint8") exp_img.fill(255) image = numpy.vstack([exp_img, image, exp_img]) return image, x_offset, y_offset def combined(img1, img2): # 获取两张图片的高度和宽度 height1, width1 = img1.shape[:2] height2, width2 = img2.shape[:2] # 确保两张图片的高度相同 if height1 != height2: # 如果高度不同,调整较小高度的图片 if height1 < height2: img1 = cv2.resize(img1, (int(width1 * height2 / height1), height2)) else: img2 = cv2.resize(img2, (int(width2 * height1 / height2), height1)) # 再次获取调整后的图片尺寸 height1, width1 = img1.shape[:2] height2, width2 = img2.shape[:2] # 创建一个空白的图像,宽度等于两张图片的宽度之和,高度等于它们共同的高度 total_width = width1 + width2 max_height = max(height1, height2) combined_img = numpy.zeros((max_height, total_width, 3), dtype=numpy.uint8) # 将img1和img2复制到新的图像中 combined_img[:height1, :width1] = img1 combined_img[:height2, width1:width1 + width2] = img2 return combined_img def parse_clarity(image): """ 判断图片清晰度 :param image: 图片NumPy数组或文件路径 :return: 判断结果及置信度 """ clarity_result = [1, 0] model = PaddleClas(inference_model_dir=r"model/clas/clarity_assessment", use_gpu=True) clas_result = model.predict(input_data=image) try: clas_result = next(clas_result)[0] clarity_result = [clas_result["class_ids"][0], clas_result["scores"][0]] except Exception as e: logging.error("获取图片清晰度失败", exc_info=e) return clarity_result def is_photo_by_exif(exif_tags): """分析EXIF数据判断是否为照片""" # 照片通常包含的EXIF标签 photo_tags = [ 'FNumber', # 光圈 'ExposureTime', # 曝光时间 'ISOSpeedRatings', # ISO 'FocalLength', # 焦距 'LensModel', # 镜头型号 'GPSLatitude' # GPS位置信息 ] # 统计照片相关的EXIF标签数量 photo_tag_count = 0 if exif_tags: for tag in photo_tags: if tag in exif_tags: photo_tag_count += 1 # 如果有2个以上照片相关的EXIF标签,倾向于是照片 if photo_tag_count >= 2: return True # 不确定是照片返回False return False def is_screenshot_by_image_features(image): """分析图像特征判断是否为截图""" # 定义边缘像素标准差阈值,小于此阈值则认为图片是截图 edge_std_threshold = 20.0 try: # 检查边缘像素的一致性(截图边缘通常更整齐) edge_pixels = [] # 取图像边缘10像素 edge_pixels.extend(image[:10, :].flatten()) # 顶部边缘 edge_pixels.extend(image[-10:, :].flatten()) # 底部边缘 edge_pixels.extend(image[:, :10].flatten()) # 左侧边缘 edge_pixels.extend(image[:, -10:].flatten()) # 右侧边缘 # 计算边缘像素的标准差(值越小说明越一致) edge_std = numpy.std(edge_pixels) logging.info(f"边缘像素标准差: {edge_std}") return edge_std < edge_std_threshold except Exception as e: logging.error("图像特征分析失败", exc_info=e) return False def is_screenshot(image, exif_tags): """综合判断是否是截图""" # 先检查EXIF数据 result_of_exif = is_photo_by_exif(exif_tags) # 如果有明显的照片EXIF信息,直接判断为照片 if result_of_exif: return False # 分析图像特征 return is_screenshot_by_image_features(image)