Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_keyset(keyset_filename):
    """Read a binary-serialized keyset from a file and wrap it in a handle.

    Args:
      keyset_filename: Path of the file holding the binary keyset.

    Returns:
      The CleartextKeysetHandle built from the keyset parsed out of the file.

    Raises:
      TinkError: if the file is not valid
      IOError: if the file does not exist
    """
    with open(keyset_filename, 'rb') as keyset_file:
        # The file is opened in binary mode, so this is the raw serialized
        # keyset rather than decoded text.
        serialized_keyset = keyset_file.read()
        reader = tink.BinaryKeysetReader(serialized_keyset)
        return cleartext_keyset_handle.CleartextKeysetHandle(reader.read())
def verify(self, signature: bytes, data: bytes):
    """Verifies that signature is a digital signature for data.

    Keys whose identifier equals the signature's leading prefix bytes are
    tried first, against the signature with that prefix stripped. If none of
    them accepts, every RAW key is tried against the complete signature.
    The method returns normally as soon as one key accepts and raises when
    no key does, so a caller can never mistake a failed check for success.

    Args:
      signature: The signature bytes to be checked.
      data: The data bytes to be checked.

    Raises:
      tink_error.TinkError if the verification fails.
    """
    prefix_size = crypto_format.NON_RAW_PREFIX_SIZE
    if len(signature) <= prefix_size:
        # This also rejects raw signatures with size of 4 bytes or fewer.
        # We're not aware of any schemes that output signatures that small.
        raise tink_error.TinkError('signature too short')

    key_id = signature[:prefix_size]
    raw_sig = signature[prefix_size:]
    for entry in self._primitive_set.primitive_from_identifier(key_id):
        try:
            if entry.output_prefix_type == tink_pb2.LEGACY:
                # LEGACY keys verify over the data with an extra byte
                # appended.
                entry.primitive.verify(
                    raw_sig, data + crypto_format.LEGACY_START_BYTE)
            else:
                entry.primitive.verify(raw_sig, data)
            # Signature is valid, we can return
            return
        except tink_error.TinkError as err:
            logging.info(
                'signature prefix matches a key, but cannot verify: %s',
                err)

    # No prefix-matched key accepted; RAW keys carry no prefix, so they are
    # checked against the untouched signature bytes.
    for entry in self._primitive_set.raw_primitives():
        try:
            entry.primitive.verify(signature, data)
            return
        except tink_error.TinkError:
            # A failure here only means this key does not match; keep trying.
            continue

    # Nothing verified: fail closed instead of silently returning.
    raise tink_error.TinkError('invalid signature')
def _decrypt(encrypted_keyset: tink_pb2.EncryptedKeyset,
             master_key_aead: aead.Aead) -> tink_pb2.Keyset:
    """Recovers the Keyset wrapped inside an EncryptedKeyset.

    The encrypted bytes are decrypted with master_key_aead using empty
    associated data, and the plaintext is parsed as a serialized Keyset.

    Args:
      encrypted_keyset: Message whose encrypted_keyset field holds the
        ciphertext.
      master_key_aead: Aead used to decrypt that ciphertext.

    Returns:
      The parsed Keyset.

    Raises:
      tink_error.TinkError: if the plaintext cannot be decoded as a Keyset,
        or if the decoded keyset contains no keys.
    """
    try:
        serialized_keyset = master_key_aead.decrypt(
            encrypted_keyset.encrypted_keyset, b'')
        decrypted = tink_pb2.Keyset.FromString(serialized_keyset)
        # The wrapped keys may have unwrapped to nothing, so emptiness is
        # checked here as well.
        _assert_enough_key_material(decrypted)
        return decrypted
    except message.DecodeError:
        raise tink_error.TinkError('invalid keyset, corrupted key material')
"""Raises tink_error.TinkError if keyset is not valid."""
for key in keyset.key:
if key.status != tink_pb2.DESTROYED:
_validate_key(key)
num_non_destroyed_keys = sum(
1 for key in keyset.key if key.status != tink_pb2.DESTROYED)
num_non_public_key_material = sum(
1 for key in keyset.key
if key.key_data.key_material_type != tink_pb2.KeyData.ASYMMETRIC_PUBLIC)
num_primary_keys = sum(
1 for key in keyset.key
if key.status == tink_pb2.ENABLED and key.key_id == keyset.primary_key_id)
if num_non_destroyed_keys == 0:
raise tink_error.TinkError('empty keyset')
if num_primary_keys > 1:
raise tink_error.TinkError('keyset contains multiple primary keys')
if num_primary_keys == 0 and num_non_public_key_material > 0:
raise tink_error.TinkError('keyset does not contain a valid primary key')
def _validate_key(key: tink_pb2.Keyset.Key):
    """Raises tink_error.TinkError if key is not valid.

    A key is rejected when it has no key_data field, when its output prefix
    type is UNKNOWN_PREFIX, or when its status is UNKNOWN_STATUS. The checks
    run in that order and only the first failure is reported.
    """
    if not key.HasField('key_data'):
        problem = 'key {} has no key data'
    elif key.output_prefix_type == tink_pb2.UNKNOWN_PREFIX:
        problem = 'key {} has unknown prefix'
    elif key.status == tink_pb2.UNKNOWN_STATUS:
        problem = 'key {} has unknown status'
    else:
        # Every check passed.
        return
    raise tink_error.TinkError(problem.format(key.key_id))
def decrypt_deterministically(self, ciphertext: bytes,
                              associated_data: bytes) -> bytes:
    """Decrypts ciphertext with the first key in the set that accepts it.

    If the ciphertext is longer than the key prefix, keys whose identifier
    equals its leading prefix bytes are tried on the remainder. After that,
    every RAW key is tried on the complete ciphertext.

    Args:
      ciphertext: The bytes to decrypt, possibly starting with a key prefix.
      associated_data: Passed unchanged to each candidate primitive.

    Returns:
      The result of the first candidate primitive that does not raise.

    Raises:
      tink_error.TinkError: if no candidate key can decrypt the ciphertext.
    """
    prefix_size = crypto_format.NON_RAW_PREFIX_SIZE
    if len(ciphertext) > prefix_size:
        key_prefix = ciphertext[:prefix_size]
        payload = ciphertext[prefix_size:]
        candidates = self._primitive_set.primitive_from_identifier(key_prefix)
        for candidate in candidates:
            try:
                return candidate.primitive.decrypt_deterministically(
                    payload, associated_data)
            except tink_error.TinkError as err:
                logging.info(
                    'ciphertext prefix matches a key, but cannot decrypt: %s',
                    err)

    # Fall back to RAW keys, which see the ciphertext with nothing stripped.
    for candidate in self._primitive_set.raw_primitives():
        try:
            return candidate.primitive.decrypt_deterministically(
                ciphertext, associated_data)
        except tink_error.TinkError:
            # This key does not fit; move on to the next one.
            continue

    # No key in the set could decrypt.
    raise tink_error.TinkError('Decryption failed.')
def _validate_key(key: tink_pb2.Keyset.Key):
    """Raises tink_error.TinkError if key is not valid.

    Checked in order: the key_data field must be present, the output prefix
    type must not be UNKNOWN_PREFIX, and the status must not be
    UNKNOWN_STATUS. The error message names the key id and the first check
    that failed.
    """
    template = None
    if not key.HasField('key_data'):
        template = 'key {} has no key data'
    elif key.output_prefix_type == tink_pb2.UNKNOWN_PREFIX:
        template = 'key {} has unknown prefix'
    elif key.status == tink_pb2.UNKNOWN_STATUS:
        template = 'key {} has unknown status'

    if template is not None:
        raise tink_error.TinkError(template.format(key.key_id))
def _assert_enough_key_material(keyset: tink_pb2.Keyset):
    """Raises tink_error.TinkError if keyset is falsy or holds no keys."""
    if keyset and keyset.key:
        # At least one key entry is present.
        return
    raise tink_error.TinkError('empty keyset')
def write(self, keyset: tink_pb2.Keyset) -> None:
    """Serializes keyset and writes it to the underlying stream.

    The stream is flushed right after the write.

    Args:
      keyset: The tink_pb2.Keyset message to write.

    Raises:
      tink_error.TinkError: if keyset is not a tink_pb2.Keyset instance.
    """
    if isinstance(keyset, tink_pb2.Keyset):
        serialized = keyset.SerializeToString()
        self._io_stream.write(serialized)
        self._io_stream.flush()
        return
    raise tink_error.TinkError('invalid keyset.')
Args:
b: The buffer to write.
"""
self._check_not_closed()
if not isinstance(b, (bytes, memoryview, bytearray)):
raise TypeError('a bytes-like object is required, not {}'.format(
type(b).__name__))
# One call to OutputStreamAdapter.write() may call next() multiple times
# on the C++ EncryptingStream, but will perform a partial write if there is
# a temporary write error. Permanent write errors will bubble up as
# exceptions.
written = self._output_stream_adapter.write(b)
if written < 0:
raise tink_error.TinkError('Number of written bytes was negative')
self._bytes_written += written
if written < len(b):
raise io.BlockingIOError(errno.EAGAIN,
'Write could not complete without blocking.',
written)
elif written > len(b):
raise tink_error.TinkError(
'Number of written bytes was greater than length of bytes given')
return written