import logging
import os
from datetime import datetime

import requests
from opencc import OpenCC
from tenacity import retry, stop_after_attempt, wait_random

from log import PROJECT_ROOT
from photo_review import get_batch_id
from util import string_util, model_util

# Maximum number of characters kept by ocr_result_to_text.
_MAX_OCR_TEXT_LENGTH = 2048

# Chinese unit character -> (multiplier, is_section_unit).
# Section units (万, 亿) close the running section and scale all of it.
_MONEY_UNITS = {
    '拾': (10, False),
    '十': (10, False),
    '佰': (100, False),
    '百': (100, False),
    '仟': (1000, False),
    '千': (1000, False),
    '万': (10000, True),
    '亿': (100000000, True),
}

# Chinese digit character (plain and financial forms) -> integer value.
_CHINESE_DIGITS = {
    '零': 0,
    '一': 1, '壹': 1,
    '二': 2, '贰': 2,
    '三': 3, '叁': 3,
    '四': 4, '肆': 4,
    '五': 5, '伍': 5,
    '六': 6, '陆': 6,
    '七': 7, '柒': 7,
    '八': 8, '捌': 8,
    '九': 9, '玖': 9,
}


def get_default_datetime():
    """Return the current local time formatted as ``yyyy-MM-dd HH:mm:ss``."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def _quad_to_bbox(quad):
    """Convert a four-point box into an axis-aligned ``[x1, y1, x2, y2]`` list.

    Points 0 and 3 supply the left edge, 0 and 1 the top edge, 1 and 2 the
    right edge, and 2 and 3 the bottom edge (presumably the points run
    top-left, top-right, bottom-right, bottom-left -- confirm against the
    OCR model's output format).
    """
    return [
        min(quad[0][0], quad[3][0]),  # x1
        min(quad[0][1], quad[1][1]),  # y1
        max(quad[1][0], quad[2][0]),  # x2
        max(quad[2][1], quad[3][1]),  # y2
    ]


def _is_valid_bbox(bbox):
    """Return False when the box has a negative width or height.

    Zero-sized boxes are accepted; only inverted boxes are rejected.
    """
    return not (bbox[3] - bbox[1] < 0 or bbox[2] - bbox[0] < 0)


def ocr_result_to_layout(ocr_result):
    """Convert OCR segments into a list of ``(bbox, text)`` tuples.

    :param ocr_result: iterable of segments where ``segment[0]`` is a
        four-point box and ``segment[1][0]`` is the recognised text
    :return: list of ``([x1, y1, x2, y2], text)``; empty when ``ocr_result``
        is empty or None. Segments with an inverted box are skipped.
    """
    if not ocr_result:
        return []

    layout = []
    for segment in ocr_result:
        bbox = _quad_to_bbox(segment[0])
        if not _is_valid_bbox(bbox):
            continue
        layout.append((bbox, segment[1][0]))
    return layout


def ocr_result_to_text(ocr_results):
    """Concatenate the text of OCR segments, capped at 2048 characters.

    :param ocr_results: iterable of segments where ``segment[1][0]`` is text
    :return: the joined text, truncated to at most 2048 characters
    """
    pieces = []
    total_length = 0
    for ocr_result in ocr_results:
        piece = ocr_result[1][0]
        pieces.append(piece)
        total_length += len(piece)
        # Stop early: anything past the cap would be cut off anyway.
        if total_length >= _MAX_OCR_TEXT_LENGTH:
            break
    return ''.join(pieces)[:_MAX_OCR_TEXT_LENGTH]


def get_ocr_layout(ocr, img_path):
    """Run OCR on an image and return the result in layout form.

    :param ocr: OCR model; not used by this function (recognition goes
        through ``model_util.ocr``), kept for interface compatibility
    :param img_path: local path of the image
    :return: list of ``(bbox, text)`` tuples, see ``ocr_result_to_layout``;
        empty when OCR produced nothing
    """
    ocr_result = model_util.ocr(img_path)
    # Guard before indexing: an empty/None result would otherwise raise.
    if not ocr_result:
        return []
    # Only the first element is used, as in the original implementation.
    return ocr_result_to_layout(ocr_result[0])


def delete_temp_file(temp_files):
    """Delete one or more temporary files, logging instead of raising.

    :param temp_files: a single path string or an iterable of paths;
        falsy values are ignored
    """
    if not temp_files:
        return
    if isinstance(temp_files, str):
        temp_files = [temp_files]
    for file in temp_files:
        # Best-effort cleanup: a failure on one file must not stop the rest.
        try:
            os.remove(file)
            logging.info(f'临时文件 {file} 已删除')
        except Exception as e:
            logging.warning(f'删除临时文件 {file} 时出错: {e}')


def zoom_rectangle(rectangle, ratio):
    """Scale a rectangle outward by ``ratio``.

    Each coordinate is moved by ``coordinate * ratio`` (the first corner is
    decreased, the second corner increased) and rounded.

    :param rectangle: ``(x1, y1, x2, y2)`` coordinates
    :param ratio: scaling ratio applied to every coordinate
    :return: ``[x1, y1, x2, y2]`` of the scaled rectangle
    """
    x1, y1, x2, y2 = rectangle
    x1 = round(x1 - x1 * ratio)
    y1 = round(y1 - y1 * ratio)
    x2 = round(x2 + x2 * ratio)
    y2 = round(y2 + y2 * ratio)
    return [x1, y1, x2, y2]


def chinese_to_money_unit(chinese):
    """Map a Chinese unit character to ``(multiplier, is_section_unit)``.

    :param chinese: a single character such as '十', '佰', '万'
    :return: e.g. ``(10, False)`` or ``(10000, True)``; ``(None, False)``
        when the character is not a known unit
    """
    return _MONEY_UNITS.get(chinese, (None, False))


def chinese_char_to_number(chinese):
    """Map a Chinese digit character to its value.

    :param chinese: a single character such as '三' or '叁'
    :return: 0-9, or -1 when the character is not a digit
    """
    return _CHINESE_DIGITS.get(chinese, -1)


def chinese_to_number(chinese):
    """Parse a Chinese numeral string into an integer.

    Supports the shorthand where a trailing digit takes one tenth of the
    preceding unit (e.g. '一百五' is 150) and a leading unit with an implied
    one (e.g. '十二' is 12).

    :param chinese: numeral made of Chinese digits and units
    :return: the integer value (0 for an empty string)
    :raises ValueError: on two adjacent non-zero digits or an unknown character
    """
    result = 0   # total of the sections already closed by 万 / 亿
    section = 0  # running total of the current section
    number = 0   # last digit seen that has not been given a unit yet
    unit_value = None  # multiplier of the last unit seen, None after '零'

    for i, c in enumerate(chinese):
        num = chinese_char_to_number(c)
        if num >= 0:
            if num == 0:
                # A zero ends the shorthand: flush the pending digit at one
                # tenth of the previous unit, then forget that unit.
                if number > 0 and unit_value is not None:
                    section += number * (unit_value // 10)
                unit_value = None
            elif number > 0:
                # Two non-zero digits in a row are not a valid numeral.
                raise ValueError(f"'{chinese}' has bad number '{chinese[i - 1]}{c}' at: {i}")
            number = num
        else:
            unit_value, is_section_unit = chinese_to_money_unit(c)
            if unit_value is None:
                raise ValueError(f"'{chinese}' has unknown unit '{c}' at: {i}")
            if is_section_unit:
                # Section unit: scale the whole section and close it.
                section = (section + number) * unit_value
                result += section
                section = 0
            else:
                # A numeral starting with a unit has an implied leading one.
                unit_number = 1 if number == 0 and i == 0 else number
                section += unit_number * unit_value
            number = 0

    # Trailing digit without a unit: one tenth of the previous unit.
    # Integer division keeps the result an int (units are multiples of 10).
    if number > 0 and unit_value is not None:
        number *= unit_value // 10
    return result + section + number


def chinese_money_to_number(chinese_money_amount):
    """Parse a Chinese money amount (元/圆, 角, 分) into a number.

    :param chinese_money_amount: text such as '壹佰元贰角叁分'
    :return: the amount rounded to two decimals, or None for blank input
    :raises ValueError: when none of 元/圆, 角, 分 is present, or when a
        numeral part cannot be parsed
    """
    if string_util.blank(chinese_money_amount):
        return None

    yuan_idx = chinese_money_amount.find('元')
    if yuan_idx == -1:
        yuan_idx = chinese_money_amount.find('圆')
    jiao_idx = chinese_money_amount.find('角')
    fen_idx = chinese_money_amount.find('分')
    if yuan_idx == -1 and jiao_idx == -1 and fen_idx == -1:
        raise ValueError(f'无法解析: {chinese_money_amount}')

    # Yuan part: everything before the yuan marker.
    yuan_str = None
    if yuan_idx > 0:
        yuan_str = chinese_money_amount[0:yuan_idx]

    # Jiao part: between yuan and jiao markers, or from the start if no yuan.
    jiao_str = None
    if jiao_idx > 0:
        if yuan_idx >= 0:
            if jiao_idx > yuan_idx:
                jiao_str = chinese_money_amount[yuan_idx + 1:jiao_idx]
        else:
            jiao_str = chinese_money_amount[0:jiao_idx]

    # Fen part: after jiao if present, else after yuan, else from the start.
    fen_str = None
    if fen_idx > 0:
        if jiao_idx >= 0:
            if fen_idx > jiao_idx:
                fen_str = chinese_money_amount[jiao_idx + 1:fen_idx]
        elif yuan_idx > 0:
            if fen_idx > yuan_idx:
                fen_str = chinese_money_amount[yuan_idx + 1:fen_idx]
        else:
            fen_str = chinese_money_amount[0:fen_idx]

    yuan = 0 if string_util.blank(yuan_str) else chinese_to_number(yuan_str)
    jiao = 0 if string_util.blank(jiao_str) else chinese_to_number(jiao_str)
    fen = 0 if string_util.blank(fen_str) else chinese_to_number(fen_str)

    amount = yuan
    amount += jiao / 10
    amount += fen / 100
    return round(amount, 2)


def traditional_to_simple_chinese(traditional_chinese):
    """Convert Traditional Chinese text to Simplified Chinese.

    :param traditional_chinese: text to convert
    :return: the text converted with OpenCC's 't2s' configuration
    """
    converter = OpenCC('t2s')
    return converter.convert(traditional_chinese)


def parse_img_url(url):
    """Extract the file name from an image URL.

    :param url: image URL; anything from the first '?' onward is dropped
    :return: the last path component, including its extension
    """
    url = url.split('?')[0]
    return os.path.basename(url)


@retry(stop=stop_after_attempt(3), wait=wait_random(1, 3), reraise=True,
       after=lambda x: logging.warning('保存图片失败!'))
def save_to_local(img_url, *, timeout=30):
    """Download an image and store it in the temporary image directory.

    Retried up to three times with a random 1-3 second wait; the last
    error is re-raised.

    :param img_url: image URL
    :param timeout: seconds passed to ``requests.get`` so that a stalled
        server cannot block forever
    :return: local path the image was saved to
    :raises requests.RequestException: on network failure or an HTTP error
    """
    response = requests.get(img_url, timeout=timeout)
    response.raise_for_status()  # fail on non-success HTTP status codes

    save_path = get_tmp_img_path(parse_img_url(img_url))
    # Make sure the target directory exists before writing.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    with open(save_path, 'wb') as file:
        file.write(response.content)
    return save_path


def get_img_path(img_full_name):
    """Return the temporary path of an image if it exists on disk.

    :param img_full_name: image file name including extension
    :return: the path under the temporary image directory, or None
    """
    save_path = get_tmp_img_path(img_full_name)
    if os.path.exists(save_path):
        return save_path
    return None


def get_tmp_img_path(img_full_name):
    """Build the path of a file inside ``<PROJECT_ROOT>/tmp_img``.

    :param img_full_name: file (or sub-directory) name
    :return: the joined path; existence is not checked
    """
    return os.path.join(PROJECT_ROOT, 'tmp_img', img_full_name)


def get_processed_img_path(img_full_name):
    """Build the path of an image inside the current batch's directory.

    The directory is ``<PROJECT_ROOT>/tmp_img/<get_batch_id()>``.

    :param img_full_name: image file name including extension
    :return: the joined path; existence is not checked
    """
    return os.path.join(str(get_tmp_img_path(get_batch_id())), img_full_name)


def parse_save_path(img_path):
    """Split an image path into file name and extension.

    :param img_path: path of the image
    :return: ``(name, extension)`` split on the last '.', extension without
        the dot; ``(name, '')`` when the file name contains no '.'
    """
    img_full_name = os.path.basename(img_path)
    img_name, dot, img_ext = img_full_name.rpartition('.')
    if not dot:
        # No extension: return the whole name instead of failing to unpack.
        return img_full_name, ''
    return img_name, img_ext