Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Possible remove AEAD tag if our range covers the end
aead_tag_len = int(metadata['x-amz-tag-len']) // 8
max_offset = entire_file_length - aead_tag_len - 1
desired_end = max_offset if desired_end > max_offset else desired_end
# Chop file
result = result[desired_start:desired_end]
else:
aesgcm = AESGCM(aes_key)
try:
result = await self._loop.run_in_executor(None, lambda: aesgcm.decrypt(iv, file_data, None))
except InvalidTag:
raise DecryptError('Failed to decrypt, AEAD tag is incorrect. Possible key or IV are incorrect')
else:
if range_start:
raise DecryptError('Cannot decrypt AES-CBC file with range')
# AES/CBC/PKCS5Padding
aescbc = Cipher(AES(aes_key), CBC(iv), backend=self._backend).decryptor()
padded_result = await self._loop.run_in_executor(
None, lambda: (aescbc.update(file_data) + aescbc.finalize()))
unpadder = PKCS7(AES.block_size).unpadder()
result = await self._loop.run_in_executor(
None, lambda: (unpadder.update(padded_result) + unpadder.finalize()))
return result
async def _decrypt_v1(self, file_data: bytes, metadata: Dict[str, str], range_start: Optional[int] = None) -> bytes:
if range_start:
raise DecryptError('Cant do range get when not using KMS encryption')
decryption_key = base64.b64decode(metadata['x-amz-key'])
material_description = json.loads(metadata['x-amz-matdesc'])
aes_key = await self._crypto_context.get_decryption_aes_key(decryption_key, material_description)
# x-amz-key - Contains base64 encrypted key
# x-amz-iv - AES IVs
# x-amz-matdesc - JSON Description of client-side master key (used as encryption context as is)
# x-amz-unencrypted-content-length - Unencrypted content length
iv = base64.b64decode(metadata['x-amz-iv'])
# TODO look at doing AES as stream
# AES/CBC/PKCS5Padding
desired_end = max_offset if desired_end > max_offset else desired_end
# Chop file
result = result[desired_start:desired_end]
else:
aesgcm = AESGCM(aes_key)
try:
result = await self._loop.run_in_executor(None, lambda: aesgcm.decrypt(iv, file_data, None))
except InvalidTag:
raise DecryptError('Failed to decrypt, AEAD tag is incorrect. Possible key or IV are incorrect')
else:
if range_start:
raise DecryptError('Cannot decrypt AES-CBC file with range')
# AES/CBC/PKCS5Padding
aescbc = Cipher(AES(aes_key), CBC(iv), backend=self._backend).decryptor()
padded_result = await self._loop.run_in_executor(
None, lambda: (aescbc.update(file_data) + aescbc.finalize()))
unpadder = PKCS7(AES.block_size).unpadder()
result = await self._loop.run_in_executor(
None, lambda: (unpadder.update(padded_result) + unpadder.finalize()))
return result