Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init(self):
"""Internal init helper."""
kms_providers = self.__opts.kms_providers
if 'aws' in kms_providers:
access_key_id = str_to_bytes(kms_providers['aws']['accessKeyId'])
secret_access_key = str_to_bytes(
kms_providers['aws']['secretAccessKey'])
if not lib.mongocrypt_setopt_kms_provider_aws(
self.__crypt,
access_key_id, len(access_key_id),
secret_access_key, len(secret_access_key)):
self.__raise_from_status()
if 'local' in kms_providers:
key = kms_providers['local']['key']
with MongoCryptBinaryIn(key) as binary_key:
if not lib.mongocrypt_setopt_kms_provider_local(
self.__crypt, binary_key.bin):
self.__raise_from_status()
schema_map = self.__opts.schema_map
if schema_map is not None:
with MongoCryptBinaryIn(schema_map) as binary_schema_map:
def __init__(self, ctx, database, command):
"""Abstracts libmongocrypt's mongocrypt_ctx_t type.
:Parameters:
- `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
of the underlying mongocrypt_ctx_t.
- `database`: Optional, the name of the database.
- `command`: The BSON command to encrypt.
"""
super(EncryptionContext, self).__init__(ctx)
self.database = database
try:
with MongoCryptBinaryIn(command) as binary:
database = str_to_bytes(database)
if not lib.mongocrypt_ctx_encrypt_init(
ctx, database, len(database), binary.bin):
self._raise_from_status()
except Exception:
# Destroy the context on error.
self._close()
raise
def __init(self):
"""Internal init helper."""
kms_providers = self.__opts.kms_providers
if 'aws' in kms_providers:
access_key_id = str_to_bytes(kms_providers['aws']['accessKeyId'])
secret_access_key = str_to_bytes(
kms_providers['aws']['secretAccessKey'])
if not lib.mongocrypt_setopt_kms_provider_aws(
self.__crypt,
access_key_id, len(access_key_id),
secret_access_key, len(secret_access_key)):
self.__raise_from_status()
if 'local' in kms_providers:
key = kms_providers['local']['key']
with MongoCryptBinaryIn(key) as binary_key:
if not lib.mongocrypt_setopt_kms_provider_local(
self.__crypt, binary_key.bin):
self.__raise_from_status()
schema_map = self.__opts.schema_map
if schema_map is not None:
def __init__(self, ctx, value, opts):
"""Abstracts libmongocrypt's mongocrypt_ctx_t type.
:Parameters:
- `ctx`: A mongocrypt_ctx_t. This MongoCryptContext takes ownership
of the underlying mongocrypt_ctx_t.
- `value`: The encoded document to encrypt, which must be in the
form { "v" : BSON value to encrypt }}.
- `opts`: A :class:`ExplicitEncryptOpts`.
"""
super(ExplicitEncryptionContext, self).__init__(ctx)
try:
algorithm = str_to_bytes(opts.algorithm)
if not lib.mongocrypt_ctx_setopt_algorithm(ctx, algorithm, -1):
self._raise_from_status()
if opts.key_id is not None:
with MongoCryptBinaryIn(opts.key_id) as binary:
if not lib.mongocrypt_ctx_setopt_key_id(ctx, binary.bin):
self._raise_from_status()
if opts.key_alt_name is not None:
with MongoCryptBinaryIn(opts.key_alt_name) as binary:
if not lib.mongocrypt_ctx_setopt_key_alt_name(ctx,
binary.bin):
self._raise_from_status()
with MongoCryptBinaryIn(value) as binary:
if not lib.mongocrypt_ctx_explicit_encrypt_init(ctx,
of the underlying mongocrypt_ctx_t.
- `kms_provider`: The KMS provider.
- `opts`: An optional class:`DataKeyOpts`.
"""
super(DataKeyContext, self).__init__(ctx)
try:
if kms_provider == 'aws':
if opts is None or opts.master_key is None:
raise ValueError(
'master_key is required for kms_provider: "aws"')
if ('region' not in opts.master_key or
'key' not in opts.master_key):
raise ValueError(
'master_key must include "region" and "key" for '
'kms_provider: "aws"')
region = str_to_bytes(opts.master_key['region'])
key = str_to_bytes(opts.master_key['key'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws(
ctx, region, len(region), key, len(key)):
self._raise_from_status()
if 'endpoint' in opts.master_key:
endpoint = str_to_bytes(opts.master_key['endpoint'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws_endpoint(
ctx, endpoint, len(endpoint)):
self._raise_from_status()
elif kms_provider == 'local':
if not lib.mongocrypt_ctx_setopt_masterkey_local(ctx):
self._raise_from_status()
else:
raise ValueError('unknown kms_provider: %s' % (kms_provider,))
if opts.key_alt_names:
- `kms_provider`: The KMS provider.
- `opts`: An optional class:`DataKeyOpts`.
"""
super(DataKeyContext, self).__init__(ctx)
try:
if kms_provider == 'aws':
if opts is None or opts.master_key is None:
raise ValueError(
'master_key is required for kms_provider: "aws"')
if ('region' not in opts.master_key or
'key' not in opts.master_key):
raise ValueError(
'master_key must include "region" and "key" for '
'kms_provider: "aws"')
region = str_to_bytes(opts.master_key['region'])
key = str_to_bytes(opts.master_key['key'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws(
ctx, region, len(region), key, len(key)):
self._raise_from_status()
if 'endpoint' in opts.master_key:
endpoint = str_to_bytes(opts.master_key['endpoint'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws_endpoint(
ctx, endpoint, len(endpoint)):
self._raise_from_status()
elif kms_provider == 'local':
if not lib.mongocrypt_ctx_setopt_masterkey_local(ctx):
self._raise_from_status()
else:
raise ValueError('unknown kms_provider: %s' % (kms_provider,))
if opts.key_alt_names:
for key_alt_name in opts.key_alt_names:
if kms_provider == 'aws':
if opts is None or opts.master_key is None:
raise ValueError(
'master_key is required for kms_provider: "aws"')
if ('region' not in opts.master_key or
'key' not in opts.master_key):
raise ValueError(
'master_key must include "region" and "key" for '
'kms_provider: "aws"')
region = str_to_bytes(opts.master_key['region'])
key = str_to_bytes(opts.master_key['key'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws(
ctx, region, len(region), key, len(key)):
self._raise_from_status()
if 'endpoint' in opts.master_key:
endpoint = str_to_bytes(opts.master_key['endpoint'])
if not lib.mongocrypt_ctx_setopt_masterkey_aws_endpoint(
ctx, endpoint, len(endpoint)):
self._raise_from_status()
elif kms_provider == 'local':
if not lib.mongocrypt_ctx_setopt_masterkey_local(ctx):
self._raise_from_status()
else:
raise ValueError('unknown kms_provider: %s' % (kms_provider,))
if opts.key_alt_names:
for key_alt_name in opts.key_alt_names:
with MongoCryptBinaryIn(key_alt_name) as binary:
if not lib.mongocrypt_ctx_setopt_key_alt_name(
ctx, binary.bin):
self._raise_from_status()