Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __test_resume(self, content_size, uploaded_parts, expected_unfinished=0):
    """Prepare a partially uploaded multipart upload for a resume test.

    One of the plain, RSA-crypto or KMS-crypto buckets is picked at random,
    a multipart upload is started for a fresh random key, and only the
    parts listed in ``uploaded_parts`` are uploaded.

    :param content_size: total size in bytes of the random content.
    :param uploaded_parts: iterable of 1-based part numbers to upload.
    :param expected_unfinished: not referenced in the portion of the method
        visible here -- presumably checked by later assertions; TODO confirm.
    """
    bucket = random.choice([self.bucket, self.rsa_crypto_bucket, self.kms_crypto_bucket])

    part_size = 100 * 1024
    # Ceiling division: the last part may be shorter than part_size.
    num_parts = (content_size + part_size - 1) // part_size

    key = 'resume-' + random_string(32)
    content = random_bytes(content_size)
    encryption_flag = isinstance(bucket, oss2.CryptoBucket)
    context = None

    # NOTE(review): pathname is not used in the visible portion; presumably
    # it is handed to the resumable upload that follows -- confirm.
    pathname = self._prepare_temp_file(content)

    # Crypto buckets are given an upload context describing the total size
    # and the part size; plain buckets are called without one.
    if encryption_flag:
        context = models.MultipartUploadCryptoContext(content_size, part_size)
        upload_id = bucket.init_multipart_upload(key, upload_context=context).upload_id
    else:
        upload_id = bucket.init_multipart_upload(key).upload_id

    for part_number in uploaded_parts:
        start = (part_number - 1) * part_size
        # The final part runs to the end of the content instead of a full
        # part_size window.
        end = content_size if part_number == num_parts else start + part_size

        if encryption_flag:
            bucket.upload_part(key, upload_id, part_number, content[start:end], upload_context=context)
        else:
            bucket.upload_part(key, upload_id, part_number, content[start:end])
key2_content_3 = random_bytes(100 * 1024)
key2_content = [key2_content_1, key2_content_2, key2_content_3]
key2_parts = []
key2_data_size = 1024 * 500
key2_part_size = 1024 * 200
context2 = models.MultipartUploadCryptoContext(key2_data_size, key2_part_size)
key2_init_result = crypto_bucket.init_multipart_upload(key2, upload_context=context2)
self.assertTrue(key2_init_result.status == 200)
key2_upload_id = key2_init_result.upload_id
for i in range(3):
key1_upload_result = crypto_bucket.upload_part(key1, key1_upload_id, i + 1, key1_content[i],
upload_context=context1)
key1_parts.append(oss2.models.PartInfo(i + 1, key1_upload_result.etag,
part_crc=key1_upload_result.crc))
self.assertTrue(key1_upload_result.status == 200)
self.assertTrue(key1_upload_result.crc is not None)
key2_upload_result = crypto_bucket.upload_part(key2, key2_upload_id, i + 1, key2_content[i],
upload_context=context2)
key2_parts.append(oss2.models.PartInfo(i + 1, key2_upload_result.etag, size=len(key2_content[i]),
part_crc=key2_upload_result.crc))
self.assertTrue(key2_upload_result.status == 200)
self.assertTrue(key2_upload_result.crc is not None)
key1_complete_result = crypto_bucket.complete_multipart_upload(key1, key1_upload_id, key1_parts)
self.assertTrue(key1_complete_result.status == 200)
key1_get_result = crypto_bucket.get_object(key1)
self.assertTrue(key1_get_result.status == 200)
context2 = models.MultipartUploadCryptoContext(key2_data_size, key2_part_size)
key2_init_result = crypto_bucket.init_multipart_upload(key2, upload_context=context2)
self.assertTrue(key2_init_result.status == 200)
key2_upload_id = key2_init_result.upload_id
for i in range(3):
key1_upload_result = crypto_bucket.upload_part(key1, key1_upload_id, i + 1, key1_content[i],
upload_context=context1)
key1_parts.append(oss2.models.PartInfo(i + 1, key1_upload_result.etag,
part_crc=key1_upload_result.crc))
self.assertTrue(key1_upload_result.status == 200)
self.assertTrue(key1_upload_result.crc is not None)
key2_upload_result = crypto_bucket.upload_part(key2, key2_upload_id, i + 1, key2_content[i],
upload_context=context2)
key2_parts.append(oss2.models.PartInfo(i + 1, key2_upload_result.etag, size=len(key2_content[i]),
part_crc=key2_upload_result.crc))
self.assertTrue(key2_upload_result.status == 200)
self.assertTrue(key2_upload_result.crc is not None)
key1_complete_result = crypto_bucket.complete_multipart_upload(key1, key1_upload_id, key1_parts)
self.assertTrue(key1_complete_result.status == 200)
key1_get_result = crypto_bucket.get_object(key1)
self.assertTrue(key1_get_result.status == 200)
key1_content_got = key1_get_result.read()
self.assertEqual(key1_content_1, key1_content_got[0:102400])
self.assertEqual(key1_content_2, key1_content_got[102400:204800])
self.assertEqual(key1_content_3, key1_content_got[204800:307200])
key2_complete_result = crypto_bucket.complete_multipart_upload(key2, key2_upload_id, key2_parts)
self.assertTrue(key2_complete_result.status == 200)
def test_crypto_multipart_concurrency(self):
    """Start two encrypted multipart uploads on the same crypto bucket.

    Each upload gets its own ``MultipartUploadCryptoContext``. Only the
    setup of both uploads is visible in this portion of the test; the part
    uploads and completion are expected to follow.
    """
    crypto_bucket = random.choice([self.rsa_crypto_bucket, self.kms_crypto_bucket])

    # First object: three 100 KiB parts, 300 KiB in total.
    key1 = self.random_key()
    key1_content_1 = random_bytes(100 * 1024)
    key1_content_2 = random_bytes(100 * 1024)
    key1_content_3 = random_bytes(100 * 1024)
    key1_content = [key1_content_1, key1_content_2, key1_content_3]
    key1_parts = []
    key1_data_size = 1024 * 300
    key1_part_size = 1024 * 100

    context1 = models.MultipartUploadCryptoContext(key1_data_size, key1_part_size)
    key1_init_result = crypto_bucket.init_multipart_upload(key1, upload_context=context1)
    self.assertEqual(key1_init_result.status, 200)
    key1_upload_id = key1_init_result.upload_id

    # Second object: two 200 KiB parts plus a 100 KiB tail, 500 KiB in total.
    key2 = self.random_key()
    key2_content_1 = random_bytes(200 * 1024)
    key2_content_2 = random_bytes(200 * 1024)
    key2_content_3 = random_bytes(100 * 1024)
    key2_content = [key2_content_1, key2_content_2, key2_content_3]
    key2_parts = []
    key2_data_size = 1024 * 500
    key2_part_size = 1024 * 200

    context2 = models.MultipartUploadCryptoContext(key2_data_size, key2_part_size)
    key2_init_result = crypto_bucket.init_multipart_upload(key2, upload_context=context2)
User-Agent: aliyun-sdk-python/2.0.2(Windows/7/;3.3.3)
Accept: */*
authorization: OSS ZCDmm7TPZKHtx77j:cmWZPrAca3p4IZaAc3iqJoQEzNw=
*HEADGET*1000'''
response_text = '''HTTP/1.1 200 OK
Server: AliyunOSS
Date: Sat, 12 Dec 2015 00:35:35 GMT
Content-Length: 0
Connection: keep-alive
x-oss-request-id: 566B6BD7D9816D686F72A86A'''
req_info = unittests.common.mock_response(do_request, response_text)
rule = oss2.models.CorsRule(allowed_origins=['*'],
allowed_methods=['HEAD', 'GET'],
allowed_headers=['*'],
max_age_seconds=1000)
unittests.common.bucket().put_bucket_cors(oss2.models.BucketCors([rule]))
self.assertRequest(req_info ,request_text)
root = ElementTree.fromstring(req_info.data)
rule_node = root.find('CORSRule')
self.assertSortedListEqual(rule.allowed_origins, all_tags(rule_node, 'AllowedOrigin'))
self.assertSortedListEqual(rule.allowed_methods, all_tags(rule_node, 'AllowedMethod'))
self.assertSortedListEqual(rule.allowed_headers, all_tags(rule_node, 'AllowedHeader'))
self.assertEqual(rule.max_age_seconds, int(rule_node.find('MaxAgeSeconds').text))
User-Agent: aliyun-sdk-python/2.0.2(Windows/7/;3.3.3)
Accept: */*
authorization: OSS ZCDmm7TPZKHtx77j:ZUVg/fNrUVyan0Y5xhz5zvcPZcs=
{0}{1}'''
response_text = '''HTTP/1.1 200 OK
Server: AliyunOSS
Date: Sat, 12 Dec 2015 00:35:47 GMT
Content-Length: 0
Connection: keep-alive
x-oss-request-id: 566B6BE31BA604C27DD429E8'''
for index, error in [('index+中文.html', 'error.中文') ,(u'中-+()文.index', u'@#$%中文.error')]:
req_info = unittests.common.mock_response(do_request, response_text)
unittests.common.bucket().put_bucket_website(oss2.models.BucketWebsite(index, error))
self.assertRequest(req_info, request_text.format(oss2.to_string(index), oss2.to_string(error)))
private
'''
headers = oss2.CaseInsensitiveDict({
'Server': 'AliyunOSS',
'Date': 'Fri, 11 Dec 2015 11:40:30 GMT',
'Content-Length': len(body),
'Connection': 'keep-alive',
'x-oss-request-id': '566AB62EB06147681C283D73',
'ETag': '7AE1A589ED6B161CAD94ACDB98206DA6'
})
resp = MockResponse(200, headers, body)
result = oss2.models.GetBucketInfoResult(resp)
parse_get_bucket_info(result, body)
self.assertEqual(result.location, 'oss-cn-hangzhou')
self.assertIsNone(result.data_redundancy_type)
self.assertIsNone(result.comment)
self.assertIsNone(result.versioning_status)
self.assertIsNone(result.bucket_encryption_rule)
assert result.headers['x-oss-hash-crc64ecma'] == '108247482078852440'
# Delete the uploaded object.
bucket.delete_object(key)
"""
分片上传回调
"""
# Multipart upload with an upload callback.
# Initialize the upload task.
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# Upload a single part (part number 1).
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag))
# Complete the upload and trigger the callback.
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)
# status is 200 when both the upload and the callback succeed; it is 203 when
# the upload succeeds but the callback fails.
assert result.status == 200
# result.resp holds the content returned by the callback server.
assert result.resp.read() == b'{"Status":"OK"}'
# Confirm the object was uploaded successfully.
result = bucket.head_object(key)
assert result.headers['x-oss-hash-crc64ecma'] == '108247482078852440'
# Delete the uploaded object.
bucket.delete_object(key)
# Make sure the parameters above have all been filled in (no '<' placeholder left).
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<' not in param, '请设置参数:' + param

# Create the Bucket object; every live-channel API is reached through it.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

# Create a live channel.
# The channel is named test_rtmp_live. The m3u8 playlist produced by the stream
# is called test.m3u8; this index file references 3 ts segments of about
# 5 seconds each (only a suggested value -- the real duration depends on the
# key frames).
channel_name = 'test_rtmp_live'
playlist_name = 'test.m3u8'
create_result = bucket.create_live_channel(
    channel_name,
    oss2.models.LiveChannelInfo(
        status='enabled',
        description='测试使用的直播频道',
        target=oss2.models.LiveChannelInfoTarget(
            playlist_name=playlist_name,
            frag_count=3,
            frag_duration=5)))

# After the channel is created we get publish_url, the RTMP URL used to push
# the stream (it must additionally be signed when the bucket is not
# public-read-write; see the example further below), and play_url, the URL of
# the m3u8 file generated by the stream, used to watch it.
publish_url = create_result.publish_url
play_url = create_result.play_url

# Once the channel exists, get_live_channel returns its information.
get_result = bucket.get_live_channel(channel_name)
print(get_result.description)
print(get_result.status)
print(get_result.target.type)
# Initialize the multipart upload to obtain an Upload ID; every following call needs this Upload ID.
key = file.replace('../', '')
upload_id = self.bucket.init_multipart_upload(key).upload_id
# Upload the parts one by one.
# oss2.SizedFileAdapter() wraps fileobj in a new file object whose readable length equals size_to_upload.
with open(file, 'rb') as fileobj:
parts = []
part_number = 1
offset = 0
while offset < total_size:
size_to_upload = min(part_size, total_size - offset)
result = self.bucket.upload_part(key, upload_id, part_number,
oss2.SizedFileAdapter(fileobj, size_to_upload))
parts.append(oss2.models.PartInfo(part_number, result.etag, size=size_to_upload, part_crc=result.crc))
offset += size_to_upload
part_number += 1
# Complete the multipart upload.
self.bucket.complete_multipart_upload(key, upload_id, parts)
# Verify that the uploaded object matches the local file.
with open(file, 'rb') as fileobj:
assert self.bucket.get_object(key).read() == fileobj.read()