Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""断点续传上传
Args:
path (TYPE): file abspath
Returns:
TYPE: Description
"""
part_size = os.path.getsize(path) if os.path.getsize(
path) < 1024 * 1024 else os.path.getsize(path) // 10
success = False
retry = 10
while not success and retry>0:
retry -= 1
try:
oss2.resumable_upload(self.bucket, path.rsplit(os.sep, 1)[1], path, progress_callback=self.percentage,
# store=oss2.ResumableStore(root='/tmp'),
store=oss2.ResumableStore(root='/tmp' if checkOS()=='linux' else config.BASE_DIR),
multipart_threshold=1024 * 1024,
part_size=part_size,
num_threads=4)
success = True
return True
except oss2.exceptions.RequestError as e:
log.warn('上传失败,即将进行重试')
time.sleep(2)
continue
return False
multipart_threshold=(200*1024), num_threads=2, part_size=(100*1024))
# Resumable upload of a small object WITH the payer header: expected to succeed.
headers = dict()
headers[OSS_REQUEST_PAYER] = "requester"
result = oss2.resumable_upload(self.payer_bucket, small_object, file_name,
multipart_threshold=(200*1024), num_threads=2, part_size=(100*1024), headers=headers)
self.assertEqual(result.status, 200)
self.bucket.delete_object(small_object)
# Start big file test
# Create a big file larger than multipart_threshold so the multipart path runs.
file_name = self._prepare_temp_file_with_size(11 *1024 * 1024)
# Resumable upload of a big object WITHOUT the payer header: expected to be denied.
self.assertRaises(oss2.exceptions.AccessDenied, oss2.resumable_upload, self.payer_bucket, big_object, file_name,
multipart_threshold=(200*1024), num_threads=2, part_size=(100*1024))
# Resumable upload of a big object with payer + tagging headers: expected to succeed.
key1 = 'key1'
value1 = 'value2'  # NOTE(review): probably meant 'value1' — confirm the intended tag value
key2 = 'key2'
value2 = 'value2'
# Join the tags into the "k=v&k=v" query-string form the tagging header expects.
tag_str = key1 + '=' + value1
tag_str += '&' + key2 + '=' + value2
headers = dict()
headers[OSS_REQUEST_PAYER] = "requester"
headers[OSS_OBJECT_TAGGING] = tag_str
result = oss2.resumable_upload(self.payer_bucket, big_object, file_name,
# Simulate a partially-completed multipart upload, then let
# resumable_upload pick it up from a rebuilt checkpoint record.
upload_id = bucket.init_multipart_upload(key).upload_id
for part_number in uploaded_parts:
# Part numbers are 1-based; each covers [start, end) of the content buffer.
start = (part_number - 1) * part_size
if part_number == num_parts:
# The last part absorbs the remainder of the content.
end = content_size
else:
end = start + part_size
if encryption_flag:
# Encrypted uploads pass the shared encryption context for every part.
bucket.upload_part(key, upload_id, part_number, content[start:end], upload_context=context)
else:
bucket.upload_part(key, upload_id, part_number, content[start:end])
# Write a checkpoint record pointing at the half-done upload ...
self._rebuild_record(pathname, oss2.resumable.make_upload_store(), bucket, key, upload_id, part_size, context)
# ... then resume; multipart_threshold=0 forces the multipart code path.
oss2.resumable_upload(bucket, key, pathname, multipart_threshold=0, part_size=100 * 1024)
result = bucket.get_object(key)
self.assertEqual(content, result.read())
# expected_unfinished counts leftover in-progress uploads for this key.
self.assertEqual(len(list(oss2.ObjectUploadIterator(self.bucket, key))), expected_unfinished)
bucket.delete_object(key)
pathname = self._prepare_temp_file(content)
if encryption_flag:
# Patch upload_part with a side effect (defined elsewhere — presumably it
# raises RuntimeError partway, per the assertRaises below), leaving a
# resumable record behind; autospec keeps the mocked signature honest.
with patch.object(oss2.CryptoBucket, 'upload_part', side_effect=upload_part,
autospec=True) as mock_upload_part:
self.assertRaises(RuntimeError, oss2.resumable_upload, bucket, key, pathname, multipart_threshold=0,
part_size=100 * 1024)
else:
# Same flow for the plain (non-crypto) bucket class.
with patch.object(oss2.Bucket, 'upload_part', side_effect=upload_part, autospec=True) as mock_upload_part:
self.assertRaises(RuntimeError, oss2.resumable_upload, bucket, key, pathname, multipart_threshold=0,
part_size=100 * 1024)
# Optionally corrupt/adjust the saved checkpoint before resuming.
if modify_record_func:
modify_record_func(oss2.resumable.make_upload_store(), bucket.bucket_name, key, pathname)
# Second (unmocked) call resumes from the record and should finish cleanly.
oss2.resumable_upload(bucket, key, pathname, multipart_threshold=0, part_size=100 * 1024)
self.assertEqual(len(list(oss2.ObjectUploadIterator(self.bucket, key))), expected_unfinished)
key = random_string(16)
# Five full 100 KiB parts plus a 100-byte tail.
content = random_bytes(5 * 100 * 1024 + 100)
pathname = self._prepare_temp_file(content)
part_size = 100 * 1024
# Multipart path: content exceeds the 200 KiB threshold.
oss2.resumable_upload(bucket, key, pathname,
multipart_threshold=200 * 1024,
part_size=part_size,
progress_callback=progress_callback,
num_threads=1)
# progress_callback (defined elsewhere) appears to record the last reported
# byte count and the call count in `stats` — TODO confirm against its definition.
self.assertEqual(stats['previous'], len(content))
self.assertEqual(stats['ncalled'], oss2.utils.how_many(len(content), part_size) + 1)
stats = {'previous': -1, 'ncalled': 0}
# Put-object path: a threshold above the content size avoids multipart entirely.
oss2.resumable_upload(bucket, key, pathname,
multipart_threshold=len(content) + 100,
progress_callback=progress_callback)
self.assertEqual(stats['previous'], len(content))
bucket.delete_object(key)
def test_resumable_upload(self):
# NOTE(review): fragment — the final assertRaises call is cut off in this view.
small_object = 'requestpayment-test-resumable-upload-small-object'
big_object = 'requestpayment-test-resumable-upload-big-object'
# Create a tmp file smaller than multipart_threshold (150 KiB < 200 KiB).
file_name = self._prepare_temp_file_with_size(150 * 1024)
# Resumable upload of a small object WITHOUT the payer header: expected to be denied.
self.assertRaises(oss2.exceptions.AccessDenied, oss2.resumable_upload, self.payer_bucket, small_object, file_name,
multipart_threshold=(200*1024), num_threads=2, part_size=(100*1024))
# Resumable upload of a small object WITH the payer header: expected to succeed.
headers = dict()
headers[OSS_REQUEST_PAYER] = "requester"
result = oss2.resumable_upload(self.payer_bucket, small_object, file_name,
multipart_threshold=(200*1024), num_threads=2, part_size=(100*1024), headers=headers)
self.assertEqual(result.status, 200)
self.bucket.delete_object(small_object)
# Start big file test
# Create a big file larger than multipart_threshold so the multipart path runs.
file_name = self._prepare_temp_file_with_size(11 *1024 * 1024)
# Resumable upload of a big object WITHOUT the payer header: expected to be denied.
self.assertRaises(oss2.exceptions.AccessDenied, oss2.resumable_upload, self.payer_bucket, big_object, file_name,
def test_upload_small(self):
    """Resumable-upload a tiny (100-byte) object and verify the round trip."""
    # Exercise one of the plain / RSA-crypto / KMS-crypto buckets at random.
    target = random.choice([self.bucket, self.rsa_crypto_bucket, self.kms_crypto_bucket])
    key = random_string(16)
    payload = random_bytes(100)
    local_path = self._prepare_temp_file(payload)

    # The upload result must carry the usual response metadata.
    upload_result = oss2.resumable_upload(target, key, local_path)
    self.assertTrue(upload_result is not None)
    self.assertTrue(upload_result.etag is not None)
    self.assertTrue(upload_result.request_id is not None)

    # Content round-trips intact; below the multipart threshold the
    # object is stored with type 'Normal' (a plain put, not multipart).
    fetched = target.get_object(key)
    self.assertEqual(payload, fetched.read())
    self.assertEqual(fetched.headers['x-oss-object-type'], 'Normal')

    target.delete_object(key)
def percentage(consumed_bytes, total_bytes):
    """Print upload progress as an integer percentage on a single console line.

    Args:
        consumed_bytes: bytes transferred so far.
        total_bytes: total bytes to transfer; when falsy (0/None) nothing
            is printed because no ratio can be computed.
    """
    if not total_bytes:
        return
    done = float(consumed_bytes) / float(total_bytes)
    pct = int(100 * done)
    # '\r' rewinds to the start of the line so successive calls overwrite
    # the previous percentage in place instead of scrolling.
    print('\rAre uploading:{0}%'.format(pct), end='')
    sys.stdout.flush()
#currentTime = time.strftime('%Y-%m-%d_%H:%M_timestamp:%s',time.localtime(time.time()))
#fileName = currentTime+'.sql' # append the .sql suffix
#fileName = 'redmine_bk'+fileName # prepend the backup prefix
#fileName = 'sql/'+fileName
# Object key: keep SQL dumps under the bucket's sql/ prefix.
ossSqlFilePath = 'sql/'+sqlFileName
print(ossSqlFilePath)
# Resumable upload with a low (100 KiB) threshold so any non-trivial dump
# goes multipart; checkpoint records are kept under /tmp.
oss2.resumable_upload(bucket,
ossSqlFilePath,
sqlFilePath,
store=oss2.ResumableStore(root='/tmp'),
multipart_threshold=100*1024,
part_size=100*1024,
num_threads=4,
headers={"Content-Type":"application/octet-stream; charset=utf-8"},
progress_callback=percentage)
raise OSError("cannot access '%s': No such file or directory" % to_str_or_bust(path))
else:
logger.info("file \'%s\' selected." % path)
if name:
# OSS中的目录/文件夹概念
# https://help.aliyun.com/knowledge_detail/39527.html
key = name
else:
key = os.path.basename(path)
path = to_str_or_bust(path)
key = to_str_or_bust(key)
logger.info("ready for uploading file to oss")
result = oss2.resumable_upload(bucket, key, path, multipart_threshold=10 * 1024 * 1024)
oss_obj = bucket.get_object(key)
logger.info(" ".join((oss_obj.request_id, str(oss_obj.status))))
logger.info("file \'%s\' uploaded." % path)
return result