Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_kms_cse_encrypt_decrypt_aes_gcm(event_loop, s3_moto_patch, region, bucket_name, s3_key_name):
s3_client, s3_resource = s3_moto_patch
s3_client = s3_client('s3', region_name=region)
await s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})
aes_key = b'O\x8b\xdc\x92\x87k\x9aJ{m\x82\xb3\x96\xf7\x93]\xa1\xb2Cl\x86<5\xbe\x13\xaf\xa8\x94\xa2O3\xef'
encrypted_aes_key = b'encrypted_aes_key'
material_descrition = {'kms_cmk_id': 'alias/cmk_id'}
kms_crypto_context = cse.MockKMSCryptoContext(aes_key, material_descrition,
encrypted_aes_key, authenticated_encryption=True)
s3_cse = cse.S3CSE(kms_crypto_context, s3_client_args={'region_name': region})
async with s3_cse:
# Upload file
await s3_cse.put_object(Body=DATA, Bucket=bucket_name, Key=s3_key_name)
encrypted_resp = await s3_client.get_object(Bucket=bucket_name, Key=s3_key_name)
encrypted_resp['Body'] = await encrypted_resp['Body'].read()
# Check it doesnt start with lorem ipsum
assert not encrypted_resp['Body'].startswith(DATA[:10])
# Check metadata for KMS encryption
assert encrypted_resp['Metadata']['x-amz-cek-alg'] == 'AES/GCM/NoPadding'
assert encrypted_resp['Metadata']['x-amz-tag-len'] == '128'
def __init__(self, aes_key: bytes, material_description: dict, encrypted_key: bytes,
authenticated_encryption: bool = True):
super(MockKMSCryptoContext, self).__init__()
self.aes_key = aes_key
self.material_description = material_description
self.encrypted_key = encrypted_key
self.authenticated_encryption = authenticated_encryption