Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not lib.mongocrypt_ctx_setopt_algorithm(ctx, algorithm, -1):
self._raise_from_status()
if opts.key_id is not None:
with MongoCryptBinaryIn(opts.key_id) as binary:
if not lib.mongocrypt_ctx_setopt_key_id(ctx, binary.bin):
self._raise_from_status()
if opts.key_alt_name is not None:
with MongoCryptBinaryIn(opts.key_alt_name) as binary:
if not lib.mongocrypt_ctx_setopt_key_alt_name(ctx,
binary.bin):
self._raise_from_status()
with MongoCryptBinaryIn(value) as binary:
if not lib.mongocrypt_ctx_explicit_encrypt_init(ctx,
binary.bin):
self._raise_from_status()
except Exception:
# Destroy the context on error.
self._close()
raise
def __init__(self, ctx, command):
    """Wrap a mongocrypt_ctx_t and initialize it for decryption.

    :Parameters:
      - `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
        of the underlying mongocrypt_ctx_t.
      - `command`: The encoded BSON command to decrypt.
    """
    super(DecryptionContext, self).__init__(ctx)
    try:
        with MongoCryptBinaryIn(command) as command_bin:
            initialized = lib.mongocrypt_ctx_decrypt_init(
                ctx, command_bin.bin)
            if not initialized:
                # Report the failure recorded on the context's status.
                self._raise_from_status()
    except Exception:
        # Initialization failed: close the owned context, then let the
        # original error propagate to the caller.
        self._close()
        raise
def __init__(self, ctx, value):
    """Wrap a mongocrypt_ctx_t and initialize it for explicit decryption.

    :Parameters:
      - `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
        of the underlying mongocrypt_ctx_t.
      - `value`: The encoded BSON value to decrypt.
    """
    super(ExplicitDecryptionContext, self).__init__(ctx)
    try:
        with MongoCryptBinaryIn(value) as value_bin:
            initialized = lib.mongocrypt_ctx_explicit_decrypt_init(
                ctx, value_bin.bin)
            if not initialized:
                # Report the failure recorded on the context's status.
                self._raise_from_status()
    except Exception:
        # Initialization failed: close the owned context, then let the
        # original error propagate to the caller.
        self._close()
        raise
def endpoint(self):
    """The kms hostname to connect over TLS.

    Asks libmongocrypt to write the endpoint into a one-element
    ``char *[]`` out-parameter and returns it converted by ``_to_string``.
    """
    endpoint_out = ffi.new("char *[]", 1)
    try:
        fetched = lib.mongocrypt_kms_ctx_endpoint(self.__ctx, endpoint_out)
        if not fetched:
            self.__raise_from_status()
        # Convert before the out-parameter array is released below.
        hostname = _to_string(endpoint_out[0])
    finally:
        # Release the out-parameter array on success and on error alike.
        ffi.release(endpoint_out)
    return hostname
- `opts`: An optional class:`DataKeyOpts`.
"""
super(DataKeyContext, self).__init__(ctx)
try:
if kms_provider == 'aws':
if opts is None or opts.master_key is None:
raise ValueError(
'master_key is required for kms_provider: "aws"')
if ('region' not in opts.master_key or
'key' not in opts.master_key):
raise ValueError(
'master_key must include "region" and "key" for '
'kms_provider: "aws"')
region = str_to_bytes(opts.master_key['region'])
key = str_to_bytes(opts.master_key['key'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws(
ctx, region, len(region), key, len(key)):
self._raise_from_status()
if 'endpoint' in opts.master_key:
endpoint = str_to_bytes(opts.master_key['endpoint'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws_endpoint(
ctx, endpoint, len(endpoint)):
self._raise_from_status()
elif kms_provider == 'local':
if not lib.mongocrypt_ctx_setopt_masterkey_local(ctx):
self._raise_from_status()
else:
raise ValueError('unknown kms_provider: %s' % (kms_provider,))
if opts.key_alt_names:
for key_alt_name in opts.key_alt_names:
with MongoCryptBinaryIn(key_alt_name) as binary:
def kms_contexts(self):
    """Yields the MongoCryptKmsContexts.

    Repeatedly asks libmongocrypt for the next KMS context and wraps each
    one in a MongoCryptKmsContext, stopping at the first NULL result.
    """
    while True:
        next_kms = lib.mongocrypt_ctx_next_kms_ctx(self.__ctx)
        if next_kms == ffi.NULL:
            # No further KMS contexts are available; end the generator.
            return
        yield MongoCryptKmsContext(next_kms)
def state(self):
    """Return the current state of the wrapped mongocrypt_ctx_t.

    The value is whatever ``lib.mongocrypt_ctx_state`` reports for this
    context; it is passed through unchanged.
    """
    current_state = lib.mongocrypt_ctx_state(self.__ctx)
    return current_state