87 lines
3.4 KiB
Python
87 lines
3.4 KiB
Python
import logging
import os
import re
import tempfile
from urllib.parse import parse_qs, urlparse

import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_random
|
||
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_random(1, 3), reraise=True,
       after=lambda x: logging.warning('获取江苏省财政票据idBase失败!'))
def get_jsczt_id_base(url, timeout=10):
    """Fetch a page and return the value of its ``idBase`` input element.

    The tenacity decorator allows up to three attempts with a random wait
    of 1-3 seconds between them, logs a warning through its ``after``
    callback and re-raises the last exception once attempts are exhausted.

    Args:
        url: Address of the page to fetch.
        timeout: Timeout in seconds handed to ``requests.get``. Without one
            a stalled server would block forever and the retry policy would
            never get a chance to run.

    Returns:
        The ``value`` attribute of the first ``<input name="idBase">``
        element, or ``None`` when the page has no such element (or the
        element carries no ``value`` attribute).

    Raises:
        Exception: If the response status code is not 200.
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f'请求江苏省财政票据失败!状态码: {response.status_code}')

    soup = BeautifulSoup(response.text, 'html.parser')
    hidden_input = soup.find('input', {'name': 'idBase'})
    if hidden_input is None:
        return None
    # Read the value carried by the input element.
    return hidden_input.get('value')
|
||
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_random(1, 3), reraise=True,
       after=lambda x: logging.warning('下载pdf失败!'))
def download_pdf(url, local_filename=None, timeout=30):
    """Stream the resource at ``url`` to disk and return the file path.

    The tenacity decorator allows up to three attempts with a random wait
    of 1-3 seconds between them, logs a warning through its ``after``
    callback and re-raises the last exception once attempts are exhausted.

    Args:
        url: Address of the file to download.
        local_filename: Destination path. When ``None`` a temporary file
            with a ``.pdf`` suffix is created; it is not deleted
            automatically, so the caller owns it after a successful return.
        timeout: Timeout in seconds handed to ``requests.get`` so a stalled
            server cannot block the call indefinitely.

    Returns:
        The path the content was written to.

    Raises:
        Exception: If the response status code is not 200.
        requests.RequestException: On connection errors or timeouts.
        OSError: If the destination file cannot be written.
    """
    # The context manager releases the streamed connection on every exit
    # path, including the non-200 branch.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            raise Exception(f'下载pdf失败!状态码: {response.status_code}')

        created_temp = local_filename is None
        if created_temp:
            # Reserve the temporary file only after the server answered 200,
            # so failed attempts (and their retries) leave nothing behind.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf:
                target = temp_pdf.name
        else:
            target = local_filename

        try:
            with open(target, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        file.write(chunk)
        except Exception:
            if created_temp:
                # The caller never learns the name of a temp file from a
                # failed attempt, so remove the partial file (best effort).
                try:
                    os.remove(target)
                except OSError:
                    pass
            raise
    return target
|
||
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_random(1, 3), reraise=True,
       after=lambda x: logging.warning('获取无锡医院票据失败!'))
def get_wx_pdf_url(url, timeout=10):
    """Fetch a page and return the target of its e-receipt link.

    The link is located by its exact text, ``点击查看电子票据``. The tenacity
    decorator allows up to three attempts with a random wait of 1-3 seconds
    between them, logs a warning through its ``after`` callback and
    re-raises the last exception once attempts are exhausted.

    Args:
        url: Address of the page to fetch.
        timeout: Timeout in seconds handed to ``requests.get`` so a stalled
            server cannot block the call indefinitely.

    Returns:
        The ``href`` attribute of the first matching ``<a>`` element, or
        ``None`` when no such link exists (or it has no ``href``).

    Raises:
        Exception: If the response status code is not 200.
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f'请求无锡医院票据失败!状态码: {response.status_code}')

    soup = BeautifulSoup(response.text, 'html.parser')
    pdf_link = soup.find('a', string='点击查看电子票据')
    if pdf_link is None:
        return None
    # The link target is the address of the receipt document.
    return pdf_link.get('href')
|
||
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_random(1, 3), reraise=True,
       after=lambda x: logging.warning('获取泰州三院电子发票失败!'))
def get_tz3y_pdf_url(url, timeout=10):
    """Resolve the e-invoice PDF address behind a landing-page URL.

    Steps:
      1. Fetch ``url`` and look through its inline ``<script>`` elements for
         a statement of the form ``var url="<prefix>"+fphm;``.
      2. Append the ``fphm`` query parameter of ``url`` to that prefix and
         fetch the resulting address.
      3. Extract the ``'dzfpUrl':'...'`` value from the second response.

    The tenacity decorator allows up to three attempts with a random wait
    of 1-3 seconds between them, logs a warning through its ``after``
    callback and re-raises the last exception once attempts are exhausted.

    Args:
        url: Landing-page address; expected to carry an ``fphm`` query
            parameter.
        timeout: Timeout in seconds handed to each ``requests.get`` call so
            a stalled server cannot block the call indefinitely.

    Returns:
        The captured ``dzfpUrl`` value, or ``None`` when the script
        statement, the ``fphm`` parameter or the ``dzfpUrl`` field cannot
        be found.

    Raises:
        Exception: If either response has a status code other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f'请求泰州三院电子发票失败!状态码: {response.status_code}')
    soup = BeautifulSoup(response.text, 'html.parser')

    # Scan every inline script (those without a ``src`` attribute) rather
    # than only the first one, so an unrelated leading script does not hide
    # the statement we are looking for.
    request_pdf_url = None
    for script_tag in soup.find_all('script', {'src': None}):
        # ``.string`` is None unless the tag holds exactly one text node;
        # fall back to '' so re.search does not raise TypeError.
        url_match = re.search(r'var url="(.*?)"\+fphm;', script_tag.string or '')
        if url_match:
            request_pdf_url = url_match.group(1)
            break
    if request_pdf_url is None:
        return None

    # parse_qs omits missing/blank parameters; without an invoice number
    # there is nothing to request, so report "not found" instead of
    # failing with TypeError on a None subscript.
    fphm_values = parse_qs(urlparse(url).query).get('fphm')
    if not fphm_values:
        return None
    request_pdf_url += fphm_values[0]

    response = requests.get(request_pdf_url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f'请求泰州三院电子发票失败!状态码: {response.status_code}')

    # Non-greedy capture: stop at the quote that closes the value instead of
    # swallowing any further quoted fields on the same line.
    pdf_match = re.search(r"'dzfpUrl':'(.*?)'", response.text)
    if pdf_match:
        return pdf_match.group(1)
    return None
|