Files
fcb_photo_review/util/model_util.py

210 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import os.path
import requests
from tenacity import retry, stop_after_attempt, wait_random
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('OCR识别失败'),
)
def ocr(img_path):
    """
    Ask the OCR service to recognise an image.

    The image path is posted as form data. An exception raised by the call is
    handled by the ``retry`` decorator (at most three attempts, random 1-3 s
    wait, last error re-raised); a non-200 reply is not an exception and
    simply yields ``None``.

    :param img_path: path of the image to recognise
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    response = requests.post('http://ocr:5001', data={'img_path': img_path})
    if response.status_code != 200:
        return None
    results = response.json()
    return results[0] if results else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('抽取基本医保结算单失败!'),
)
def ie_settlement(img_path, layout):
    """
    Ask the basic medical-insurance settlement extraction service to process
    an image.

    :param img_path: path of the image to extract from
    :param layout: OCR information of the image; sent JSON-encoded
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    form = {'img_path': img_path, 'layout': json.dumps(layout)}
    response = requests.post('http://ie_settlement:5002', data=form)
    if response.status_code != 200:
        return None
    extracted = response.json()
    return extracted[0] if extracted else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('从文本抽取基本医保结算单失败!'),
)
def ie_settlement_text(text):
    """
    Ask the basic medical-insurance settlement extraction service to process
    plain text (the ``/text`` endpoint).

    :param text: text to extract from
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    response = requests.post('http://ie_settlement:5002/text', data={'text': text})
    if response.status_code != 200:
        return None
    extracted = response.json()
    return extracted[0] if extracted else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('抽取出院记录失败!'),
)
def ie_discharge(img_path, layout):
    """
    Ask the discharge-record extraction service to process an image.

    :param img_path: path of the image to extract from
    :param layout: OCR information of the image; sent JSON-encoded
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    form = {'img_path': img_path, 'layout': json.dumps(layout)}
    response = requests.post('http://ie_discharge:5003', data=form)
    if response.status_code != 200:
        return None
    extracted = response.json()
    return extracted[0] if extracted else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('从文本抽取出院记录失败!'),
)
def ie_discharge_text(text):
    """
    Ask the discharge-record extraction service to process plain text (the
    ``/text`` endpoint).

    :param text: text to extract from
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    response = requests.post('http://ie_discharge:5003/text', data={'text': text})
    if response.status_code != 200:
        return None
    extracted = response.json()
    return extracted[0] if extracted else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('抽取费用清单失败!'),
)
def ie_cost(img_path, layout):
    """
    Ask the cost-list extraction service to process an image.

    :param img_path: path of the image to extract from
    :param layout: OCR information of the image; sent JSON-encoded
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    form = {'img_path': img_path, 'layout': json.dumps(layout)}
    response = requests.post('http://ie_cost:5004', data=form)
    if response.status_code != 200:
        return None
    extracted = response.json()
    return extracted[0] if extracted else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('从文本抽取费用清单失败!'),
)
def ie_cost_text(text):
    """
    Ask the cost-list extraction service to process plain text (the ``/text``
    endpoint).

    :param text: text to extract from
    :return: first element of the JSON reply, or ``None`` when the status is
        not 200 or the reply is empty/falsy
    """
    response = requests.post('http://ie_cost:5004/text', data={'text': text})
    if response.status_code != 200:
        return None
    extracted = response.json()
    return extracted[0] if extracted else None
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('获取图片方向失败!'),
)
def clas_orientation(img_path):
    """
    Ask the image-orientation classification service about an image.

    :param img_path: path of the image to classify
    :return: the decoded JSON reply (per the original documentation, the two
        most likely orientations); ``['0', '90']`` when the status is not 200
    """
    response = requests.post('http://clas_orientation:5005', data={'img_path': img_path})
    if response.status_code != 200:
        # Fixed fallback used whenever the service does not answer with 200.
        return ['0', '90']
    return response.json()
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('获取文档区域失败!'),
)
def det_book(img_path):
    """
    Ask the document-region detection service about an image and pick one
    resulting image path.

    :param img_path: path of the image to analyse
    :return: a single path - ``img_path`` itself when the status is not 200
        or the reply is empty; the only entry when the reply has exactly one;
        otherwise the entry whose file is largest on disk
    """
    response = requests.post('http://det_book:5006', data={'img_path': img_path})
    if response.status_code != 200:
        return img_path

    candidates = response.json()
    count = len(candidates)
    if count == 0:
        return img_path
    if count == 1:
        return candidates[0]

    # Several regions were returned: keep the one with the biggest file.
    # Starting from (img_path, 0) with a strict comparison means the input
    # path is returned if no candidate file is larger than zero bytes, and
    # the first of several equally large candidates wins.
    largest_path, largest_size = img_path, 0
    for candidate in candidates:
        candidate_size = os.path.getsize(candidate)
        if candidate_size > largest_size:
            largest_path, largest_size = candidate, candidate_size
    return largest_path
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('矫正扭曲图片失败!'),
)
def dewarp(img_path):
    """
    Ask the image-correction (dewarp) service to process an image.

    :param img_path: path of the image to correct
    :return: the decoded JSON reply (per the original documentation, the path
        of the corrected image); ``img_path`` unchanged when the status is
        not 200
    """
    response = requests.post('http://dewarp:5007', data={'img_path': img_path})
    if response.status_code != 200:
        return img_path
    return response.json()
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(1, 3),
    reraise=True,
    after=lambda _: logging.warning('文本分类失败!'),
)
def clas_text(text):
    """
    Ask the text classification service to classify a piece of text.

    Empty/falsy input returns ``None`` without contacting the service. Input
    longer than 2048 characters is cut to its first 2048 before being sent.

    :param text: text to classify
    :return: the decoded JSON reply, or ``None`` when the input is empty or
        the status is not 200
    """
    if not text:
        return None

    clipped = text[:2048] if len(text) > 2048 else text
    response = requests.post('http://clas_text:5008', data={'text': clipped})
    if response.status_code != 200:
        return None
    return response.json()