Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
provider = LocalRsaProvider(dir='./', key='rsa-test')
content = b'a' * random.randint(1, 100) * 1024
content_crypto_material = provider.create_content_material()
plain_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
plain_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
cipher = content_crypto_material.cipher
stream_encrypted = provider.make_encrypt_adapter(content, cipher)
encrypted_content = stream_encrypted.read()
# reset cipher
cipher.initialize(plain_key, plain_iv)
stream_decrypted = provider.make_decrypt_adapter(encrypted_content, cipher)
self.assertEqual(content, stream_decrypted.read())
silently_remove('./rsa-test.public_key.pem')
silently_remove('./rsa-test.private_key.pem')
provider = RsaProvider(key_pair)
content = b'b' * random.randint(1, 100) * 1024
content_crypto_material = provider.create_content_material()
plain_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
plain_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
cipher = content_crypto_material.cipher
stream_encrypted = provider.make_encrypt_adapter(content, cipher)
encrypted_content = stream_encrypted.read()
# reset cipher
cipher.initialize(plain_key, plain_iv)
stream_decrypted = provider.make_decrypt_adapter(encrypted_content, cipher)
self.assertEqual(content, stream_decrypted.read())
def test_rsa_provider_init_keys_not_exist(self):
    """Check that constructing LocalRsaProvider leaves both key files on disk.

    Both PEM files are removed up front so the constructor runs against a
    directory without keys. They are removed again in ``finally`` so that a
    failing assertion does not leave generated keys behind for other tests.
    """
    public_key_path = './rsa-test.public_key.pem'
    private_key_path = './rsa-test.private_key.pem'

    silently_remove(public_key_path)
    silently_remove(private_key_path)

    try:
        # Only the constructor's effect on the file system is under test,
        # so the provider instance itself is not kept.
        LocalRsaProvider(dir='./', key='rsa-test')
        self.assertTrue(os.path.exists(public_key_path))
        self.assertTrue(os.path.exists(private_key_path))
    finally:
        silently_remove(public_key_path)
        silently_remove(private_key_path)
def test_rsa_provider_init_invalid_keys(self):
    """Check that both providers raise ClientError for an invalid private key.

    A valid public key is paired with random text standing in for the
    private key. The pair is first written to the PEM files read by
    LocalRsaProvider, then passed directly to RsaProvider as ``key_pair``.
    The PEM files are removed in ``finally`` so a failing assertion does not
    leave the corrupt key on disk for other tests.
    """
    public_key = RSA.generate(2048).publickey()
    # Deliberately use random text instead of a real private key.
    private_key = random_string(2048)

    private_key_path = './rsa-test.private_key.pem'
    public_key_path = './rsa-test.public_key.pem'

    try:
        with open(private_key_path, 'wb') as f:
            f.write(oss2.to_bytes(private_key))

        with open(public_key_path, 'wb') as f:
            f.write(public_key.exportKey())

        self.assertRaises(ClientError, LocalRsaProvider, dir='./', key='rsa-test')
    finally:
        silently_remove(public_key_path)
        silently_remove(private_key_path)

    self.assertRaises(ClientError, RsaProvider, key_pair={'private_key': private_key, 'public_key': public_key})
# Fragment of a test body (its `def` line and indentation are not visible here).
# Encrypt/decrypt round trip through the provider's stream adapters, first with
# a LocalRsaProvider and then with an RsaProvider built from `key_pair`.
provider = LocalRsaProvider(dir='./', key='rsa-test')
# Payload is between 1 and 100 KiB of the byte b'a'.
content = b'a' * random.randint(1, 100) * 1024
content_crypto_material = provider.create_content_material()
# Recover the plain key and IV so the cipher can be re-initialized below.
plain_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
plain_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
cipher = content_crypto_material.cipher
stream_encrypted = provider.make_encrypt_adapter(content, cipher)
encrypted_content = stream_encrypted.read()
# Reset the cipher with the same key and IV before using it for decryption.
cipher.initialize(plain_key, plain_iv)
stream_decrypted = provider.make_decrypt_adapter(encrypted_content, cipher)
# Decrypting the encrypted stream must give back the original payload.
self.assertEqual(content, stream_decrypted.read())
# Remove the key files used by the LocalRsaProvider part of the test.
silently_remove('./rsa-test.public_key.pem')
silently_remove('./rsa-test.private_key.pem')
# Same round trip with RsaProvider; `key_pair` is defined outside this fragment.
provider = RsaProvider(key_pair)
# Payload is between 1 and 100 KiB of the byte b'b'.
content = b'b' * random.randint(1, 100) * 1024
content_crypto_material = provider.create_content_material()
plain_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
plain_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
cipher = content_crypto_material.cipher
stream_encrypted = provider.make_encrypt_adapter(content, cipher)
encrypted_content = stream_encrypted.read()
# Reset the cipher with the same key and IV before using it for decryption.
cipher.initialize(plain_key, plain_iv)
stream_decrypted = provider.make_decrypt_adapter(encrypted_content, cipher)
self.assertEqual(content, stream_decrypted.read())
# Fragment of a test body (its `def` line, the first definitions of `provider`
# and `provider_diff`, and all indentation are not visible here).
# Material created by `provider` is handed to `provider_diff`, and decrypting
# the wrapped key and IV with it is expected to raise ClientError.
plain_key = provider.get_key()
plain_iv = provider.get_iv()
# Pin random_key/random_iv to the values obtained above while the material is created.
# NOTE(review): `autospect` looks like a typo for `autospec`; as spelled it is
# only an extra keyword forwarded to the mock, so no autospeccing happens — confirm intent.
with patch.object(oss2.utils, 'random_key', return_value=plain_key, autospect=True):
with patch.object(oss2.utils, 'random_iv', return_value=plain_iv, autospect=True):
content_crypto_material = provider.create_content_material()
self.assertFalse(content_crypto_material.is_unencrypted())
self.assertRaises(ClientError, provider_diff.decrypt_encrypted_key,
content_crypto_material.encrypted_key)
self.assertRaises(ClientError, provider_diff.decrypt_encrypted_iv,
content_crypto_material.encrypted_iv)
# Remove the key files of both key names used above.
silently_remove('./rsa-test.public_key.pem')
silently_remove('./rsa-test.private_key.pem')
silently_remove('./rsa-test-diff.public_key.pem')
silently_remove('./rsa-test-diff.private_key.pem')
# Same check with RsaProvider instances built from two key pairs defined
# outside this fragment (`key_pair` and `key_pair_compact`).
provider = RsaProvider(key_pair=key_pair)
provider_diff = RsaProvider(key_pair=key_pair_compact)
plain_key = provider.get_key()
plain_iv = provider.get_iv()
# NOTE(review): same `autospect` spelling as above — confirm intent.
with patch.object(oss2.utils, 'random_key', return_value=plain_key, autospect=True):
with patch.object(oss2.utils, 'random_iv', return_value=plain_iv, autospect=True):
content_crypto_material = provider.create_content_material()
self.assertFalse(content_crypto_material.is_unencrypted())
self.assertRaises(ClientError, provider_diff.decrypt_encrypted_key,
content_crypto_material.encrypted_key)
self.assertRaises(ClientError, provider_diff.decrypt_encrypted_iv,
content_crypto_material.encrypted_iv)
def tearDown(self):
    """Clean up after a test.

    Removes every path listed in ``self.temp_files``, then calls the bucket
    cleanup helpers for ``self.bucket`` and for the ``OSS_BUCKET + "-test-"``
    name prefix.
    """
    for leftover_path in self.temp_files:
        oss2.utils.silently_remove(leftover_path)

    clean_and_delete_bucket(self.bucket)

    test_bucket_prefix = OSS_BUCKET + "-test-"
    clean_and_delete_bucket_by_prefix(test_bucket_prefix)
# Fragment of a test body (its `def` line, the first definition of `provider`,
# and all indentation are not visible here).
# Checks the cipher algorithm name and that a wrapped key and IV decrypt back
# to the original values.
self.assertEqual(provider.cipher.alg, "AES/CTR/NoPadding")
plain_key = provider.get_key()
self.assertEqual(len(plain_key), provider.cipher.key_len)
plain_iv = provider.get_iv()
# Pin random_key/random_iv to the values obtained above while the material is created.
# NOTE(review): `autospect` looks like a typo for `autospec`; as spelled it is
# only an extra keyword forwarded to the mock, so no autospeccing happens — confirm intent.
with patch.object(oss2.utils, 'random_key', return_value=plain_key, autospect=True):
with patch.object(oss2.utils, 'random_iv', return_value=plain_iv, autospect=True):
content_crypto_material = provider.create_content_material()
self.assertFalse(content_crypto_material.is_unencrypted())
# Decrypting the wrapped key and IV must give back the pinned values.
decrypted_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
decrypted_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
self.assertEqual(plain_key, decrypted_key)
self.assertEqual(plain_iv, decrypted_iv)
silently_remove('./rsa-test.public_key.pem')
silently_remove('./rsa-test.private_key.pem')
# Same checks with an RsaProvider built from `key_pair` (defined outside this
# fragment) and a random 8-character passphrase.
provider = RsaProvider(key_pair=key_pair, passphrase=random_string(8))
self.assertEqual(provider.wrap_alg, "RSA/NONE/PKCS1Padding")
self.assertEqual(provider.cipher.alg, "AES/CTR/NoPadding")
plain_key = provider.get_key()
self.assertEqual(len(plain_key), provider.cipher.key_len)
plain_iv = provider.get_iv()
# NOTE(review): same `autospect` spelling as above — confirm intent.
with patch.object(oss2.utils, 'random_key', return_value=plain_key, autospect=True):
with patch.object(oss2.utils, 'random_iv', return_value=plain_iv, autospect=True):
content_crypto_material = provider.create_content_material()
self.assertFalse(content_crypto_material.is_unencrypted())
decrypted_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
decrypted_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
self.assertEqual(plain_key, decrypted_key)
self.assertEqual(plain_iv, decrypted_iv)
# Fragment of a test body (its `def` line, the definitions of `bucket`, `key`,
# `filename`, `r`, `obj`, `mock_rename`, `old_tmp_exists`, and all indentation
# are not visible here).
# Run a resumable download while os.rename is patched to call mock_rename.
with patch.object(os, 'rename', side_effect=mock_rename, autospec=True):
oss2.resumable_download(bucket, key, filename)
# NOTE(review): obj.var presumably holds the record captured by mock_rename — confirm.
new_r = obj.var
# The new record must use a different tmp_suffix but agree with `r` on
# size, mtime, etag and part_size.
self.assertTrue(new_r['tmp_suffix'] != r['tmp_suffix'])
self.assertEqual(new_r['size'], r['size'])
self.assertEqual(new_r['mtime'], r['mtime'])
self.assertEqual(new_r['etag'], r['etag'])
self.assertEqual(new_r['part_size'], r['part_size'])
# The old temporary file's existence must match `old_tmp_exists`; the new
# temporary file must no longer exist.
self.assertEqual(os.path.exists(filename + r['tmp_suffix']), old_tmp_exists)
self.assertTrue(not os.path.exists(filename + new_r['tmp_suffix']))
oss2.utils.silently_remove(filename + r['tmp_suffix'])
def test_rsa_provider_init_invalid_passphrase(self):
    """Check that both providers raise ClientError for a wrong passphrase.

    A key pair is exported with one passphrase, and each provider is then
    constructed with a different one. The pair is first written to the PEM
    files read by LocalRsaProvider, then passed directly to RsaProvider as
    ``key_pair``. The PEM files are removed in ``finally`` so a failing
    assertion does not leave them on disk for other tests.
    """
    private_key = RSA.generate(2048)
    public_key = private_key.publickey()
    # Different lengths (6 vs 8) guarantee the two passphrases never match.
    passphrase = random_string(6)
    invalid_passphrase = random_string(8)

    private_key_path = './rsa-test.private_key.pem'
    public_key_path = './rsa-test.public_key.pem'

    try:
        with open(private_key_path, 'wb') as f:
            f.write(private_key.exportKey(passphrase=passphrase))

        with open(public_key_path, 'wb') as f:
            f.write(public_key.exportKey(passphrase=passphrase))

        self.assertRaises(ClientError, LocalRsaProvider, dir='./', key='rsa-test', passphrase=invalid_passphrase)
    finally:
        silently_remove(public_key_path)
        silently_remove(private_key_path)

    private_key_str = RsaKey.exportKey(private_key, passphrase=passphrase)
    public_key_str = RsaKey.exportKey(public_key, passphrase=passphrase)

    self.assertRaises(ClientError, RsaProvider,
                      key_pair={'private_key': private_key_str, 'public_key': public_key_str},
                      passphrase=invalid_passphrase)
self.assertEqual(provider.wrap_alg, "RSA/NONE/OAEPWithSHA-1AndMGF1Padding")
self.assertEqual(provider.cipher.alg, "AES/CTR/NoPadding")
plain_key = provider.get_key()
self.assertEqual(len(plain_key), provider.cipher.key_len)
plain_iv = provider.get_iv()
with patch.object(oss2.utils, 'random_key', return_value=plain_key, autospect=True):
with patch.object(oss2.utils, 'random_iv', return_value=plain_iv, autospect=True):
content_crypto_material = provider.create_content_material()
self.assertFalse(content_crypto_material.is_unencrypted())
decrypted_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
decrypted_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
self.assertEqual(plain_key, decrypted_key)
self.assertEqual(plain_iv, decrypted_iv)
silently_remove('./rsa-test.public_key.pem')
silently_remove('./rsa-test.private_key.pem')
provider = RsaProvider(key_pair=key_pair, passphrase=random_string(8))
self.assertEqual(provider.wrap_alg, "RSA/NONE/PKCS1Padding")
self.assertEqual(provider.cipher.alg, "AES/CTR/NoPadding")
plain_key = provider.get_key()
self.assertEqual(len(plain_key), provider.cipher.key_len)
plain_iv = provider.get_iv()
with patch.object(oss2.utils, 'random_key', return_value=plain_key, autospect=True):
with patch.object(oss2.utils, 'random_iv', return_value=plain_iv, autospect=True):
content_crypto_material = provider.create_content_material()
self.assertFalse(content_crypto_material.is_unencrypted())
decrypted_key = provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
decrypted_iv = provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
self.assertEqual(plain_key, decrypted_key)