Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
form { "v" : BSON value to encrypt }}.
- `opts`: A :class:`ExplicitEncryptOpts`.
"""
super(ExplicitEncryptionContext, self).__init__(ctx)
try:
algorithm = str_to_bytes(opts.algorithm)
if not lib.mongocrypt_ctx_setopt_algorithm(ctx, algorithm, -1):
self._raise_from_status()
if opts.key_id is not None:
with MongoCryptBinaryIn(opts.key_id) as binary:
if not lib.mongocrypt_ctx_setopt_key_id(ctx, binary.bin):
self._raise_from_status()
if opts.key_alt_name is not None:
with MongoCryptBinaryIn(opts.key_alt_name) as binary:
if not lib.mongocrypt_ctx_setopt_key_alt_name(ctx,
binary.bin):
self._raise_from_status()
with MongoCryptBinaryIn(value) as binary:
if not lib.mongocrypt_ctx_explicit_encrypt_init(ctx,
binary.bin):
self._raise_from_status()
except Exception:
# Destroy the context on error.
self._close()
raise
def __init__(self, ctx, command):
    """Set up a decryption context on top of a mongocrypt_ctx_t.

    :Parameters:
      - `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
        of the underlying mongocrypt_ctx_t.
      - `command`: The encoded BSON command to decrypt.
    """
    super(DecryptionContext, self).__init__(ctx)
    try:
        with MongoCryptBinaryIn(command) as encoded_command:
            started = lib.mongocrypt_ctx_decrypt_init(
                ctx, encoded_command.bin)
            if not started:
                self._raise_from_status()
    except Exception:
        # Initialization failed: destroy the context before letting the
        # original error propagate to the caller.
        self._close()
        raise
:Parameters:
- `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
of the underlying mongocrypt_ctx_t.
- `value`: The encoded document to encrypt, which must be in the
form { "v" : BSON value to encrypt }}.
- `opts`: A :class:`ExplicitEncryptOpts`.
"""
super(ExplicitEncryptionContext, self).__init__(ctx)
try:
algorithm = str_to_bytes(opts.algorithm)
if not lib.mongocrypt_ctx_setopt_algorithm(ctx, algorithm, -1):
self._raise_from_status()
if opts.key_id is not None:
with MongoCryptBinaryIn(opts.key_id) as binary:
if not lib.mongocrypt_ctx_setopt_key_id(ctx, binary.bin):
self._raise_from_status()
if opts.key_alt_name is not None:
with MongoCryptBinaryIn(opts.key_alt_name) as binary:
if not lib.mongocrypt_ctx_setopt_key_alt_name(ctx,
binary.bin):
self._raise_from_status()
with MongoCryptBinaryIn(value) as binary:
if not lib.mongocrypt_ctx_explicit_encrypt_init(ctx,
binary.bin):
self._raise_from_status()
except Exception:
# Destroy the context on error.
self._close()
ctx, region, len(region), key, len(key)):
self._raise_from_status()
if 'endpoint' in opts.master_key:
endpoint = str_to_bytes(opts.master_key['endpoint'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws_endpoint(
ctx, endpoint, len(endpoint)):
self._raise_from_status()
elif kms_provider == 'local':
if not lib.mongocrypt_ctx_setopt_masterkey_local(ctx):
self._raise_from_status()
else:
raise ValueError('unknown kms_provider: %s' % (kms_provider,))
if opts.key_alt_names:
for key_alt_name in opts.key_alt_names:
with MongoCryptBinaryIn(key_alt_name) as binary:
if not lib.mongocrypt_ctx_setopt_key_alt_name(
ctx, binary.bin):
self._raise_from_status()
if not lib.mongocrypt_ctx_datakey_init(ctx):
self._raise_from_status()
except Exception:
# Destroy the context on error.
self._close()
raise
def __init__(self, ctx, value):
    """Set up an explicit decryption context on top of a mongocrypt_ctx_t.

    :Parameters:
      - `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
        of the underlying mongocrypt_ctx_t.
      - `value`: The encoded BSON value to decrypt.
    """
    super(ExplicitDecryptionContext, self).__init__(ctx)
    try:
        with MongoCryptBinaryIn(value) as encoded_value:
            started = lib.mongocrypt_ctx_explicit_decrypt_init(
                ctx, encoded_value.bin)
            if not started:
                self._raise_from_status()
    except Exception:
        # Initialization failed: destroy the context before letting the
        # original error propagate to the caller.
        self._close()
        raise
def __init(self):
    """Internal init helper.

    Passes the configured options to libmongocrypt in a fixed order: the
    'aws' KMS provider (if present), the 'local' KMS provider (if
    present), the schema map (if not None) and finally the crypto hooks.
    Any call that reports failure is surfaced via ``__raise_from_status``.
    """
    options = self.__opts
    providers = options.kms_providers

    if 'aws' in providers:
        aws_credentials = providers['aws']
        key_id = str_to_bytes(aws_credentials['accessKeyId'])
        secret = str_to_bytes(aws_credentials['secretAccessKey'])
        aws_ok = lib.mongocrypt_setopt_kms_provider_aws(
            self.__crypt, key_id, len(key_id), secret, len(secret))
        if not aws_ok:
            self.__raise_from_status()

    if 'local' in providers:
        with MongoCryptBinaryIn(providers['local']['key']) as local_key:
            local_ok = lib.mongocrypt_setopt_kms_provider_local(
                self.__crypt, local_key.bin)
            if not local_ok:
                self.__raise_from_status()

    schemas = options.schema_map
    if schemas is not None:
        with MongoCryptBinaryIn(schemas) as encoded_schemas:
            schemas_ok = lib.mongocrypt_setopt_schema_map(
                self.__crypt, encoded_schemas.bin)
            if not schemas_ok:
                self.__raise_from_status()

    # The trailing ffi.NULL is forwarded as the final argument of the
    # hook registration call, exactly as the callbacks before it.
    hooks_ok = lib.mongocrypt_setopt_crypto_hooks(
        self.__crypt,
        aes_256_cbc_encrypt,
        aes_256_cbc_decrypt,
        secure_random,
        hmac_sha_512,
        hmac_sha_256,
        sha_256,
        ffi.NULL)
    if not hooks_ok:
        self.__raise_from_status()
kms_providers['aws']['secretAccessKey'])
if not lib.mongocrypt_setopt_kms_provider_aws(
self.__crypt,
access_key_id, len(access_key_id),
secret_access_key, len(secret_access_key)):
self.__raise_from_status()
if 'local' in kms_providers:
key = kms_providers['local']['key']
with MongoCryptBinaryIn(key) as binary_key:
if not lib.mongocrypt_setopt_kms_provider_local(
self.__crypt, binary_key.bin):
self.__raise_from_status()
schema_map = self.__opts.schema_map
if schema_map is not None:
with MongoCryptBinaryIn(schema_map) as binary_schema_map:
if not lib.mongocrypt_setopt_schema_map(
self.__crypt, binary_schema_map.bin):
self.__raise_from_status()
if not lib.mongocrypt_setopt_crypto_hooks(
self.__crypt, aes_256_cbc_encrypt, aes_256_cbc_decrypt,
secure_random, hmac_sha_512, hmac_sha_256, sha_256, ffi.NULL):
self.__raise_from_status()
if not lib.mongocrypt_init(self.__crypt):
self.__raise_from_status()
def add_mongo_operation_result(self, document):
    """Feed the mongo operation's command response into the context.

    :Parameters:
      - `document`: A raw BSON command response document.
    """
    with MongoCryptBinaryIn(document) as response:
        accepted = lib.mongocrypt_ctx_mongo_feed(self.__ctx, response.bin)
        if not accepted:
            self._raise_from_status()
def feed(self, data):
    """Pass bytes of the HTTP response on to the KMS context.

    :Parameters:
      - `data`: The bytes of the HTTP response. Must not exceed
        :attr:`bytes_needed`.
    """
    with MongoCryptBinaryIn(data) as chunk:
        accepted = lib.mongocrypt_kms_ctx_feed(self.__ctx, chunk.bin)
        if not accepted:
            self.__raise_from_status()