Files
fcb_photo_review/ucloud/ufile.py
2024-07-25 13:46:25 +08:00

62 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# https://github.com/ucloud/ufile-sdk-python
import logging
from tenacity import retry, stop_after_attempt, wait_random, retry_if_exception_type, retry_if_result
from ufile import filemanager
from ucloud import PUBLIC_KEY, PRIVATE_KEY, UPLOAD_SUFFIX, DOWNLOAD_SUFFIX, BUCKET, PRIVATE_EXPIRES, TRY_TIMES, \
MIN_WAIT_TIME, MAX_WAIT_TIME
UFILE_HANDLER = filemanager.FileManager(PUBLIC_KEY, PRIVATE_KEY, UPLOAD_SUFFIX, DOWNLOAD_SUFFIX)
UCLOUD_LOGGER = logging.getLogger('ucloud')
@retry(stop=stop_after_attempt(TRY_TIMES), wait=wait_random(MIN_WAIT_TIME, MAX_WAIT_TIME),
retry=retry_if_exception_type(ConnectionError), reraise=True)
def get_private_url(key, bucket=BUCKET):
# 判断文件是否存在
_, resp = UFILE_HANDLER.head_file(bucket, key)
if resp.status_code == -1:
UCLOUD_LOGGER.warning(f"查询{key}时uCloud连接失败!")
raise ConnectionError("uCloud连接失败")
if resp.status_code != 200:
UCLOUD_LOGGER.warning(f"[{bucket}]中未找到({key})! status: {resp.status_code} error: {resp.error}")
return None
# 获取公有空间下载url
# url = get_ufile_handler.public_download_url(bucket, key)
# 获取私有空间下载url, expires为下载链接有效期单位为秒
url = UFILE_HANDLER.private_download_url(bucket, key, expires=PRIVATE_EXPIRES)
return url
@retry(stop=stop_after_attempt(TRY_TIMES), wait=wait_random(MIN_WAIT_TIME, MAX_WAIT_TIME),
retry=retry_if_result(lambda x: x is False))
def copy_file(source_bucket, source_key, target_bucket, target_key):
# 复制文件
_, resp = UFILE_HANDLER.copy(target_bucket, target_key, source_bucket, source_key)
if resp.status_code == -1:
UCLOUD_LOGGER.warning(f"复制{source_key}时uCloud连接失败!")
return False
if resp.status_code != 200:
UCLOUD_LOGGER.warning(
f"将({source_key})从[{source_bucket}]拷贝到[{target_bucket}]失败! status: {resp.status_code} error: {resp.error}"
)
return False
return True
@retry(stop=stop_after_attempt(TRY_TIMES), wait=wait_random(MIN_WAIT_TIME, MAX_WAIT_TIME),
retry=retry_if_result(lambda x: x is False))
def upload_file(key, file_path, bucket=BUCKET):
# 普通上传文件至云空间
_, resp = UFILE_HANDLER.putfile(bucket, key, file_path, header=None)
if resp.status_code == -1:
UCLOUD_LOGGER.warning(f"上传{key}时uCloud连接失败!")
return False
if resp.status_code != 200:
UCLOUD_LOGGER.warning(f"上传({key})失败! status: {resp.status_code} error: {resp.error}")
return False
return True