Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_crypto_init_multipart_with_out_data_size(self):
crypto_bucket = random.choice([self.rsa_crypto_bucket, self.kms_crypto_bucket])
key = self.random_key()
# upload_context is none
self.assertRaises(oss2.exceptions.ClientError, crypto_bucket.init_multipart_upload, key)
# data_size is none
part_size = 1024 * 100
context = models.MultipartUploadCryptoContext(part_size=part_size)
self.assertRaises(oss2.exceptions.ClientError, crypto_bucket.init_multipart_upload, key, upload_context=context)
def test_invalid_bucket_name(self):
bucket_name = random_string(64)
self.assertRaises(oss2.exceptions.ClientError, oss2.Bucket, oss2.AnonymousAuth(), OSS_ENDPOINT, bucket_name)
crypto_bucket = choice([self.rsa_crypto_bucket, self.kms_crypto_bucket])
key = self.random_key()
content = random_bytes(1024)
crypto_bucket.put_object(key, content)
get_result = crypto_bucket.get_object(key, byte_range=(None, None))
self.assertEqual(get_result.read(), content)
range_start = random.randint(0, 1024)
get_result = crypto_bucket.get_object(key, byte_range=(range_start, None))
self.assertEqual(get_result.read(), content[range_start:])
# CryptoBucket由于需要range get的start值与block_size对齐,这种情况下不支持range_start为None的这种情况
range_end = random.randint(0, 1024)
self.assertRaises(ClientError, crypto_bucket.get_object, key, byte_range=(None, range_end))
range_start = random.randint(0, 512)
range_end = range_start + random.randint(0, 512)
get_result = crypto_bucket.get_object(key, byte_range=(range_start, range_end))
self.assertEqual(get_result.read(), content[range_start:range_end + 1])
def test_rsa_provider_init_cipher_is_none(self):
self.assertRaises(ClientError, LocalRsaProvider, dir='./', key='rsa-test', cipher=None)
self.assertRaises(ClientError, RsaProvider, key_pair=key_pair, cipher=None)
def test_error_bucket_encryption(self):
# both encryption set.
self.assertRaises(oss2.exceptions.ClientError, InventoryBucketDestination,
account_id=OSS_INVENTORY_BUCKET_DESTINATION_ACCOUNT,
role_arn=OSS_INVENTORY_BUCKET_DESTINATION_ARN,
bucket="test-bucket-name",
inventory_format=INVENTORY_FORMAT_CSV,
prefix="test-",
sse_kms_encryption = InventoryServerSideEncryptionKMS("test-kms-id"),
sse_oss_encryption=InventoryServerSideEncryptionOSS())
def test_batch_delete_objects_empty(self):
try:
self.bucket.batch_delete_objects([])
except ClientError as e:
self.assertEqual(e.status, oss2.exceptions.OSS_CLIENT_ERROR_STATUS)
self.assertEqual(e.request_id, '')
self.assertEqual(e.code, '')
self.assertEqual(e.message, '')
self.assertTrue(e.body)
self.assertTrue(str(e))
def __do(self, req):
try:
body = self.clt.do_action_with_exception(req)
return json.loads(to_unicode(body))
except ServerException as e:
raise OpenApiServerError(e.http_status, e.request_id, e.message, e.error_code)
except ClientException as e:
raise ClientError(e.message)
except (ValueError, TypeError) as e:
raise OpenApiFormatError('Json Error: ' + str(e))
def __init__(self, access_key_id, access_key_secret, region, cmkey, sts_token = None, passphrase=None, cipher=utils.AESCipher):
if not issubclass(cipher, utils.AESCipher):
raise ClientError('AliKMSProvider only support AES256 cipher')
super(AliKMSProvider, self).__init__(cipher=cipher)
self.cmkey = cmkey
self.sts_token = sts_token
self.context = '{"x-passphrase":"' + passphrase + '"}' if passphrase else ''
self.clt = client.AcsClient(access_key_id, access_key_secret, region)
self.encrypted_key = None