# Capture the new resumable-record before the rename happens.
def mock_rename(src, dst):
    r = self.__record(key, filename)
    new_context['tmp_suffix'] = r['tmp_suffix']
    new_context['etag'] = r['etag']
    orig_rename(src, dst)

with patch.object(oss2.resumable._ResumableDownloader, '_ResumableDownloader__download_part',
                  side_effect=partial(mock_download_part, part_number=5),
                  autospec=True):
    self.assertRaises(oss2.exceptions.PreconditionFailed, oss2.resumable_download, bucket, key, filename)

with patch.object(os, 'rename', side_effect=mock_rename):
    oss2.resumable_download(bucket, key, filename)

self.assertTrue(new_context['tmp_suffix'] != old_context['tmp_suffix'])
self.assertTrue(new_context['etag'] != old_context['etag'])
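
# A minimal sketch of pointing the resumable-record store somewhere explicit,
# assuming the public oss2.ResumableDownloadStore / resumable_download API;
# the bucket object, key and paths below are illustrative only.
import oss2

# Keep the download record files under /tmp rather than the home directory, so
# an interrupted transfer can resume from the last completed part.
store = oss2.ResumableDownloadStore(root='/tmp')
oss2.resumable_download(bucket, 'example-key', 'example-local.txt',
                        multiget_threshold=200 * 1024,
                        part_size=100 * 1024,
                        num_threads=3,
                        store=store)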

oss2.defaults.multiget_num_threads = 1
oss2.defaults.min_part_size = 100

stats = {'previous': -1, 'called': 0}

def progress_callback(bytes_consumed, total_bytes):
    self.assertTrue(bytes_consumed <= total_bytes)
    self.assertTrue(bytes_consumed > stats['previous'])

    stats['previous'] = bytes_consumed
    stats['called'] += 1

file_size = 100 * 5 + 1
key, filename, content = self.__prepare(bucket, file_size)

oss2.resumable_download(bucket, key, filename, progress_callback=progress_callback)
self.assertEqual(stats['previous'], file_size)
self.assertEqual(stats['called'], oss2.utils.how_many(file_size, oss2.defaults.multiget_part_size) + 1)
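
# A small aside on the arithmetic behind the last assertion, assuming
# oss2.utils.how_many is ceiling division (the number of n-sized parts needed
# to cover m bytes); the concrete numbers are illustrative only.
import oss2

assert oss2.utils.how_many(501, 100) == 6   # 5 full parts plus one 1-byte part
assert oss2.utils.how_many(500, 100) == 5   # exact multiple, no extra part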

def test_resumable_incomplete_download(self):
    """One of the parts is incomplete, but the underlying read raises no exception."""
    oss2.defaults.multiget_threshold = 1
    oss2.defaults.multiget_part_size = 100
    oss2.defaults.multiget_num_threads = 5
    oss2.defaults.min_part_size = 100

    file_size = 123 * 3 + 1
    key, filename, content = self.__prepare(self.bucket, file_size)

    with patch.object(oss2.Bucket, 'get_object',
                      side_effect=partial(mock_get_object, content_length=file_size),
                      autospec=True):
        try:
            oss2.resumable_download(self.bucket, key, filename)
        except oss2.exceptions.InconsistentError as e:
            self.assertTrue(e.request_id)
            self.assertEqual(e.body, 'InconsistentError: IncompleteRead from source')
        except:
            self.assertTrue(False)
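
# A minimal sketch of catching the same error in application code rather than
# in a test, assuming only oss2.exceptions.InconsistentError as exercised
# above; the helper name and retry count are illustrative.
import oss2

def download_checked(bucket, key, filename, attempts=3):
    """Retry a resumable download when the received length is inconsistent."""
    for attempt in range(attempts):
        try:
            oss2.resumable_download(bucket, key, filename)
            return True
        except oss2.exceptions.InconsistentError as e:
            # The SDK detected a short/incomplete read; log and try again.
            print('attempt %d failed: %s (request id: %s)' % (attempt + 1, e.body, e.request_id))
    return False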

def __test_normal(self, file_size):
    bucket = random.choice([self.bucket, self.rsa_crypto_bucket, self.kms_crypto_bucket])
    key, filename, content = self.__prepare(bucket, file_size)
    oss2.resumable_download(bucket, key, filename)

    self.assertFileContent(filename, content)

def test_resumable_down(self):
    small_object = 'requestpayment-test-resumable-down-small-object'
    content1 = b'a' * (150 * 1024)
    big_object = 'requestpayment-test-resumable-down-big-object'
    content2 = b'a' * (500 * 1024)
    file_name = small_object + '.txt'

    self.bucket.put_object(small_object, content1)
    self.bucket.put_object(big_object, content2)

    # Resumable download of the small object without the payer setting should fail.
    self.assertRaises(oss2.exceptions.ServerError, oss2.resumable_download, self.payer_bucket, small_object, file_name,
                      multiget_threshold=(200*1024), num_threads=2, part_size=(1024*1024))

    # Resumable download of the small object with the payer setting should succeed.
    headers = dict()
    headers[OSS_REQUEST_PAYER] = "requester"
    oss2.resumable_download(self.payer_bucket, small_object, file_name,
                            multiget_threshold=(200*1024), num_threads=2, part_size=(1024*1024), headers=headers)

    # Check the downloaded file size.
    file_size = os.stat(file_name).st_size
    self.assertEqual(file_size, (150*1024))

    os.remove(file_name)
    self.bucket.delete_object(small_object)

    file_name = big_object + '.txt'

    # Resumable download of the big object without the payer setting should fail.
    self.assertRaises(oss2.exceptions.ServerError, oss2.resumable_download, self.payer_bucket, big_object, file_name,
                      multiget_threshold=(200*1024), num_threads=2, part_size=(1024*1024))

    # Resumable download of the big object with the payer setting should succeed.

oss2.resumable_download(self.bucket, key, file_name, multiget_threshold=(OBJECT_SIZE_1MB*2), num_threads=1, headers=headers)
self.assertFileContent(file_name, content)
end_time_sec = int(time.time())
os.remove(file_name)

# Calculate the elapsed time.
expense_time_sec = end_time_sec - start_time_sec
# The theoretical transfer time is 1MB / 100KB/s = 10s; require at least 10 * 0.7 s.
theoretical_expense_min = 10 * 0.7
# Compare against the theoretical minimum.
self.assertTrue(expense_time_sec > theoretical_expense_min)

# Resumable download of an object bigger than multiget_threshold, with the traffic limit setting.
start_time_sec = int(time.time())
oss2.resumable_download(self.bucket, key, file_name, multiget_threshold=(OBJECT_SIZE_1MB-1024), num_threads=1, headers=headers)
self.assertFileContent(file_name, content)
end_time_sec = int(time.time())
os.remove(file_name)
self.bucket.delete_object(key)

# Calculate the elapsed time.
expense_time_sec = end_time_sec - start_time_sec
# The theoretical transfer time is 1MB / 100KB/s = 10s; require at least 10 * 0.7 s.
theoretical_expense_min = 10 * 0.7
# Compare against the theoretical minimum.
self.assertTrue(expense_time_sec > theoretical_expense_min)
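
# A minimal sketch of what the `headers` used above typically contain for the
# traffic-limit case, assuming oss2.headers exposes OSS_TRAFFIC_LIMIT
# ("x-oss-traffic-limit"); 100 KB/s matches the 1MB / 100KB = 10s timing
# comment above, and the download call is shown commented out.
from oss2.headers import OSS_TRAFFIC_LIMIT

LIMIT_100KB = 100 * 1024 * 8                     # the limit is expressed in bits per second
headers = {OSS_TRAFFIC_LIMIT: str(LIMIT_100KB)}
# oss2.resumable_download(bucket, key, file_name, num_threads=1, headers=headers)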

try:
    if not os.path.exists('%s/%s' % (dockerfile_path, dm_name)):
        os.mkdir('%s/%s' % (dockerfile_path, dm_name))
    for obj in oss2.ObjectIterator(bucket):
        if obj.key.endswith('.war') or obj.key.endswith('.tar.gz') or obj.key.endswith('.jar'):
            obj_name = obj.key.split('/')[-1].replace('_', '-')
            if obj_name.startswith(package_name) and version in obj_name:
                oss_project_path = obj.key
                break
except Exception as e:
    logging.error(e)

if oss_project_path:
    # Try the download up to 3 times.
    for i in range(3):
        try:
            oss2.resumable_download(bucket, oss_project_path, project_path)
            break
        except:
            continue
else:
    Redis.lpush(redis_key, '%s package not found!' % package)
    _flow_log('%s package not found!' % package)
    return False

if os.path.exists(project_path):
    try:
        Redis.lpush(redis_key, 'Detected file %s' % project_path)
        _flow_log('Detected file %s' % project_path)
        if project_file.endswith('.tar.gz'):
            project_file = project_file.split('.')[0]
        os.chdir('%s/%s/' % (dockerfile_path, dm_name))
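
# A minimal sketch of narrowing the bucket listing above with a key prefix
# instead of scanning every object, assuming the standard oss2.ObjectIterator
# parameters; 'packages/' is an illustrative prefix.
import oss2

for obj in oss2.ObjectIterator(bucket, prefix='packages/'):
    if obj.key.endswith(('.war', '.tar.gz', '.jar')):
        print(obj.key)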
"""
范围下载
"""
# 带进度条的范围下载
result = bucket.get_object(key, byte_range=(1024, 2047), progress_callback=percentage)
content_got = b''
for chunk in result:
content_got += chunk
assert 'a'*1024 == content_got
"""
断点续传下载
"""
# 带进度条的断点续传下载
filename = 'download.txt'
oss2.resumable_download(bucket, key, filename,
multiget_threshold=200*1024,
part_size=100*1024,
num_threads=3,
progress_callback=percentage)
os.remove(filename)
# 删除上传的文件
bucket.delete_object(key)
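
# A minimal sketch of the `percentage` progress callback assumed above; the
# (bytes_consumed, total_bytes) signature is what oss2 passes to
# progress_callback, everything else here is illustrative.
from __future__ import print_function
import sys

def percentage(bytes_consumed, total_bytes):
    """Print download progress as a percentage on one updating line."""
    if total_bytes:
        rate = int(100 * float(bytes_consumed) / float(total_bytes))
        print('\r{0}% '.format(rate), end='')
        sys.stdout.flush()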