# https://github.com/ucloud/ufile-sdk-python
"""Thin retrying wrappers around the uCloud UFile SDK ``FileManager``.

Each public helper issues one SDK request, retries it when the response
carries ``status_code == -1`` (which this module logs as a connection
failure), and reports any other non-200 status as a failure.
"""
import logging
from time import sleep

from ufile import filemanager

from ucloud import PUBLIC_KEY, PRIVATE_KEY, UPLOAD_SUFFIX, DOWNLOAD_SUFFIX, BUCKET, PRIVATE_EXPIRES

# Shared SDK client used by every helper in this module.
UFILE_HANDLER = filemanager.FileManager(PUBLIC_KEY, PRIVATE_KEY, UPLOAD_SUFFIX, DOWNLOAD_SUFFIX)

# Retry policy shared by all helpers.
_MAX_ATTEMPTS = 3
_RETRY_DELAY_SECONDS = 1
# status_code value that this module treats as "could not reach uCloud".
_CONNECTION_FAILED_STATUS = -1
_OK_STATUS = 200


def _request_with_retry(operation):
    """Call *operation* until it yields a response that is not a connection failure.

    Args:
        operation: Zero-argument callable returning a ``(ret, resp)`` pair as
            the ``FileManager`` methods used in this module do.

    Returns:
        The first response whose ``status_code`` is not ``-1``, or ``None``
        when every one of the ``_MAX_ATTEMPTS`` attempts failed to connect.
    """
    for _ in range(_MAX_ATTEMPTS):
        _, resp = operation()
        if resp.status_code != _CONNECTION_FAILED_STATUS:
            return resp
        logging.warning("uCloud连接失败!即将重试...")
        sleep(_RETRY_DELAY_SECONDS)
    # All attempts were connection failures; let the caller decide what to return.
    return None


def get_private_url(key, bucket=BUCKET):
    """Return a private download URL for *key*, or ``None`` if it is unavailable.

    The object's existence is checked with ``head_file`` first; the URL is
    only generated when that check answers with status 200.

    Args:
        key: Object key inside the bucket.
        bucket: Bucket name; defaults to the configured ``BUCKET``.

    Returns:
        The value of ``private_download_url`` for the object, or ``None`` when
        the head request returns a non-200 status or the connection kept
        failing for every attempt.
    """
    # Check whether the file exists before handing out a link to it.
    resp = _request_with_retry(lambda: UFILE_HANDLER.head_file(bucket, key))
    if resp is None:
        return None
    if resp.status_code != _OK_STATUS:
        logging.warning(f"uCloud中未找到({key})! status: {resp.status_code} error: {resp.error}")
        return None
    # Private-bucket download URL; ``expires`` is the link lifetime in seconds.
    # (A public bucket would use the SDK's public_download_url instead.)
    return UFILE_HANDLER.private_download_url(bucket, key, expires=PRIVATE_EXPIRES)


def copy_file(source_bucket, source_key, target_bucket, target_key) -> bool:
    """Copy an object from one bucket/key to another.

    Args:
        source_bucket: Bucket holding the existing object.
        source_key: Key of the existing object.
        target_bucket: Bucket to copy into.
        target_key: Key to create in the target bucket.

    Returns:
        ``True`` when the copy request answers with status 200, ``False`` on
        any other status or when the connection kept failing for every attempt.
    """
    # Note the SDK argument order: destination first, then source.
    resp = _request_with_retry(
        lambda: UFILE_HANDLER.copy(target_bucket, target_key, source_bucket, source_key)
    )
    if resp is None:
        # Previously fell through and returned None; be explicit about failure.
        return False
    if resp.status_code != _OK_STATUS:
        logging.warning(
            f"将({source_key})从({source_bucket})拷贝到({target_bucket})失败! status: {resp.status_code} error: {resp.error}")
        return False
    return True


def upload_file(key, file_path, bucket=BUCKET) -> bool:
    """Upload a local file to the bucket with a plain (single-request) put.

    Args:
        key: Object key to store the file under.
        file_path: Path of the local file to upload.
        bucket: Bucket name; defaults to the configured ``BUCKET``.

    Returns:
        ``True`` when the upload answers with status 200, ``False`` on any
        other status or when the connection kept failing for every attempt.
    """
    resp = _request_with_retry(
        lambda: UFILE_HANDLER.putfile(bucket, key, file_path, header=None)
    )
    if resp is None:
        # Previously fell through and returned None; be explicit about failure.
        return False
    if resp.status_code != _OK_STATUS:
        logging.warning(f"上传({key})失败! status: {resp.status_code} error: {resp.error}")
        return False
    return True