Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def decrypt_deterministically(self, ciphertext: bytes,
                              associated_data: bytes) -> bytes:
    """Decrypts `ciphertext` by trying the primitives of the wrapped set.

    If the ciphertext is longer than `crypto_format.NON_RAW_PREFIX_SIZE`, its
    leading bytes are used as an identifier and every primitive registered
    under that identifier is tried on the remaining bytes. When none of them
    succeeds (or the ciphertext is too short to carry a prefix), every RAW
    primitive is tried on the complete, unmodified ciphertext.

    Args:
      ciphertext: The bytes to decrypt.
      associated_data: Passed unchanged to each primitive that is tried.

    Returns:
      The output of the first primitive that does not raise
      `tink_error.TinkError`.

    Raises:
      tink_error.TinkError: If no candidate primitive could decrypt.
    """
    prefix_size = crypto_format.NON_RAW_PREFIX_SIZE
    if len(ciphertext) > prefix_size:
        key_prefix = ciphertext[:prefix_size]
        payload = ciphertext[prefix_size:]
        for candidate in self._primitive_set.primitive_from_identifier(
                key_prefix):
            try:
                return candidate.primitive.decrypt_deterministically(
                    payload, associated_data)
            except tink_error.TinkError as err:
                # A matching prefix is no guarantee; keep trying other keys.
                logging.info(
                    'ciphertext prefix matches a key, but cannot decrypt: %s',
                    err)

    # Fall back to RAW keys, which see the ciphertext with nothing stripped.
    for candidate in self._primitive_set.raw_primitives():
        try:
            return candidate.primitive.decrypt_deterministically(
                ciphertext, associated_data)
        except tink_error.TinkError:
            # Failure of one RAW key is expected; move on to the next one.
            continue

    # Every candidate was exhausted without success.
    raise tink_error.TinkError('Decryption failed.')
def _validate_key(key: tink_pb2.Keyset.Key):
    """Checks a keyset key and raises on the first problem found.

    The checks run in this order: the key must have its `key_data` field set,
    its output prefix type must not be `tink_pb2.UNKNOWN_PREFIX`, and its
    status must not be `tink_pb2.UNKNOWN_STATUS`.

    Args:
      key: The key to check.

    Raises:
      tink_error.TinkError: If any check fails; the message contains the
        key's `key_id`.
    """
    if not key.HasField('key_data'):
        template = 'key {} has no key data'
    elif key.output_prefix_type == tink_pb2.UNKNOWN_PREFIX:
        template = 'key {} has unknown prefix'
    elif key.status == tink_pb2.UNKNOWN_STATUS:
        template = 'key {} has unknown status'
    else:
        # All checks passed; nothing to report.
        return
    raise tink_error.TinkError(template.format(key.key_id))
new_key_allowed: If new_key_allowed is true, users can generate new keys
with this manager using Registry.new_key()
"""
key_managers = cls._key_managers
type_url = key_manager.key_type()
primitive_class = key_manager.primitive_class()
if not key_manager.does_support(type_url):
raise tink_error.TinkError(
'The manager does not support its own type {}.'.format(type_url))
if type_url in key_managers:
existing, existing_new_key = key_managers[type_url]
if (type(existing) != type(key_manager) or # pylint: disable=unidiomatic-typecheck
existing.primitive_class() != primitive_class):
raise tink_error.TinkError(
'A manager for type {} has been already registered.'.format(
type_url))
else:
if not existing_new_key and new_key_allowed:
raise tink_error.TinkError(
('A manager for type {} has been already registered '
'with forbidden new key operation.').format(type_url))
key_managers[type_url] = (existing, new_key_allowed)
else:
key_managers[type_url] = (key_manager, new_key_allowed)
def decrypt(self, ciphertext: bytes, context_info: bytes) -> bytes:
    """Decrypts `ciphertext` by trying the primitives of the wrapped set.

    If the ciphertext is longer than `crypto_format.NON_RAW_PREFIX_SIZE`, its
    leading bytes are used as an identifier and every primitive registered
    under that identifier is tried on the remaining bytes. When none of them
    succeeds (or the ciphertext is too short to carry a prefix), every RAW
    primitive is tried on the complete, unmodified ciphertext.

    Args:
      ciphertext: The bytes to decrypt.
      context_info: Passed unchanged to each primitive that is tried.

    Returns:
      The output of the first primitive that does not raise
      `tink_error.TinkError`.

    Raises:
      tink_error.TinkError: If no candidate primitive could decrypt.
    """
    prefix_size = crypto_format.NON_RAW_PREFIX_SIZE
    if len(ciphertext) > prefix_size:
        key_prefix = ciphertext[:prefix_size]
        payload = ciphertext[prefix_size:]
        for candidate in self._primitive_set.primitive_from_identifier(
                key_prefix):
            try:
                return candidate.primitive.decrypt(payload, context_info)
            except tink_error.TinkError as err:
                # A matching prefix is no guarantee; keep trying other keys.
                logging.info(
                    'ciphertext prefix matches a key, but cannot decrypt: %s',
                    err)

    # Fall back to RAW keys, which see the ciphertext with nothing stripped.
    for candidate in self._primitive_set.raw_primitives():
        try:
            return candidate.primitive.decrypt(ciphertext, context_info)
        except tink_error.TinkError:
            # Failure of one RAW key is expected; move on to the next one.
            continue

    # Every candidate was exhausted without success.
    raise tink_error.TinkError('Decryption failed.')
ciphertext_no_prefix = ciphertext[crypto_format.NON_RAW_PREFIX_SIZE:]
for entry in self._primitive_set.primitive_from_identifier(prefix):
try:
return entry.primitive.decrypt(ciphertext_no_prefix,
context_info)
except tink_error.TinkError as e:
logging.info(
'ciphertext prefix matches a key, but cannot decrypt: %s', e)
# Let's try all RAW keys.
for entry in self._primitive_set.raw_primitives():
try:
return entry.primitive.decrypt(ciphertext, context_info)
except tink_error.TinkError as e:
pass
# nothing works.
raise tink_error.TinkError('Decryption failed.')
def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Writes an encrypted keyset to the underlying stream as JSON.

    The message is converted with `json_format.MessageToJson`, written to
    `self._io_stream`, and the stream is flushed afterwards.

    Args:
      encrypted_keyset: The `tink_pb2.EncryptedKeyset` message to write.

    Raises:
      tink_error.TinkError: If `encrypted_keyset` is not an instance of
        `tink_pb2.EncryptedKeyset`.
    """
    is_encrypted_keyset = isinstance(encrypted_keyset,
                                     tink_pb2.EncryptedKeyset)
    if not is_encrypted_keyset:
        raise tink_error.TinkError('invalid encrypted keyset.')

    serialized = json_format.MessageToJson(encrypted_keyset)
    # TODO(b/141106504) Needed for python 2.7 compatibility. StringIO expects
    # unicode, but MessageToJson outputs UTF-8.
    text = (serialized.decode('utf-8')
            if isinstance(serialized, bytes) else serialized)

    self._io_stream.write(text)
    self._io_stream.flush()