Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_wrong_pubkeys(self):
    """Check that a bogus keyid is rejected and generated keyids pass."""
    # FIXME: generating keys for every test is an expensive process; maybe
    # we should have an asset/fixture folder/loader?
    first_key = securesystemslib.keys.generate_rsa_key()
    second_key = securesystemslib.keys.generate_rsa_key()

    # A string that is not a keyid must make both validators raise.
    self.step.pubkeys = ['bad-keyid']
    for validator in (self.step._validate_pubkeys, self.step.validate):
        with self.assertRaises(securesystemslib.exceptions.FormatError):
            validator()

    # The keyids of freshly generated keys must validate without raising.
    self.step.pubkeys = [key['keyid'] for key in (first_key, second_key)]
    self.step._validate_pubkeys()
    self.step.validate()
def test_generate_rsa_signature(self):
    """Sign one payload with KEYS[0] and KEYS[1] and check the results."""
    signable = {'signed' : 'test', 'signatures' : []}
    canonical = securesystemslib.formats.encode_canonical(signable['signed'])
    payload = canonical.encode('utf-8')
    collected = signable['signatures']

    # First signature: created with KEYS[0] and recorded under its keyid.
    collected.append(securesystemslib.keys.create_signature(KEYS[0], payload))
    self.assertEqual(1, len(collected))
    self.assertEqual(KEYS[0]['keyid'], collected[0]['keyid'])

    # The tuf.sig helper must return an object matching SIGNATURE_SCHEMA.
    returned = tuf.sig.generate_rsa_signature(signable['signed'], KEYS[0])
    self.assertTrue(
        securesystemslib.formats.SIGNATURE_SCHEMA.matches(returned))

    # Second signature: created with KEYS[1] and appended after the first.
    collected.append(securesystemslib.keys.create_signature(KEYS[1], payload))
    self.assertEqual(2, len(collected))
    self.assertEqual(KEYS[1]['keyid'], collected[1]['keyid'])
def test_wrong_key_dictionary(self):
    """Test that the keys dictionary is properly populated.

    Two malformed dictionaries are tried: one whose dict key is not a keyid,
    and one whose value is a plain string instead of a key object.
    """
    key_as_value = securesystemslib.keys.generate_rsa_key()
    key_for_keyid = securesystemslib.keys.generate_rsa_key()

    # FIXME: attr.ib reuses the default dictionary, so dictionaries of
    # later-constructed objects are not empty...
    # Case 1: a key object stored under "kek", which is not a keyid.
    self.layout.keys = {"kek": key_as_value}
    for validator in (self.layout._validate_keys, self.layout.validate):
        with self.assertRaises(securesystemslib.exceptions.FormatError):
            validator()

    # Case 2: a real keyid as dict key, but the value is the string "kek".
    self.layout.keys = {}
    self.layout.keys[key_for_keyid['keyid']] = "kek"
    with self.assertRaises(securesystemslib.exceptions.FormatError):
        self.layout._validate_keys()
def test_keys(self):
with self.assertRaises(
securesystemslib.exceptions.UnsupportedLibraryError):
securesystemslib.keys.generate_rsa_key()
with self.assertRaises(
securesystemslib.exceptions.UnsupportedLibraryError):
securesystemslib.keys.generate_ecdsa_key()
with self.assertRaises(
securesystemslib.exceptions.UnsupportedLibraryError):
securesystemslib.keys.generate_ed25519_key()
data = 'foo'
keydict = {'keytype': 'ed25519',
'scheme': 'ed25519',
'keyid': 'f00',
'keyval': {'private': 'f001',
'public': 'b00f'}}
with self.assertRaises(
# key database.
for junk, key_metadata in six.iteritems(root_metadata['keys']):
if key_metadata['keytype'] in _SUPPORTED_KEY_TYPES:
# 'key_metadata' is stored in 'KEY_SCHEMA' format. Call
# format_metadata_to_key() to get the key in 'RSAKEY_SCHEMA' format,
# which is the format expected by 'add_key()'. Note: The 'keyids'
# returned by format_metadata_to_key() include keyids in addition to the
# default keyid listed in 'key_dict'. The additional keyids are
# generated according to securesystemslib.settings.HASH_ALGORITHMS.
# The repo may have used hashing algorithms for the generated keyids that
# don't match the client's set of hash algorithms. Make sure to only
# use the repo's selected hashing algorithms.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms']
key_dict, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
try:
for keyid in keyids:
# Make sure to update key_dict['keyid'] to use one of the other valid
# keyids, otherwise add_key() will have no reference to it.
key_dict['keyid'] = keyid
add_key(key_dict, keyid=None, repository_name=repository_name)
# Although keyid duplicates should *not* occur (unique dict keys), log a
# warning and continue. However, 'key_dict' may have already been
# added to the keydb elsewhere.
except securesystemslib.exceptions.KeyAlreadyExistsError as e: # pragma: no cover
logger.warning(e)
continue
logger.debug('Adding roles delegated from ' + repr(parent_role) + '.')
# Iterate the keys of the delegated roles of 'parent_role' and load them.
for keyid, keyinfo in six.iteritems(keys_info):
if keyinfo['keytype'] in ['rsa', 'ed25519', 'ecdsa-sha2-nistp256']:
# We specify the keyid to ensure that it's the correct keyid
# for the key.
try:
# The repo may have used hashing algorithms for the generated keyids
# that don't match the client's set of hash algorithms. Make sure
# to only use the repo's selected hashing algorithms.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = keyinfo['keyid_hash_algorithms']
key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
for key_id in keyids:
key['keyid'] = key_id
tuf.keydb.add_key(key, keyid=None, repository_name=self.repository_name)
except tuf.exceptions.KeyAlreadyExistsError:
pass
except (securesystemslib.exceptions.FormatError, securesystemslib.exceptions.Error):
logger.exception('Invalid key for keyid: ' + repr(keyid) + '.')
logger.error('Aborting role delegation for parent role ' + parent_role + '.')
raise
else:
logger.warning('Invalid key type for ' + repr(keyid) + '.')
securesystemslib.exceptions.FormatError, if the arguments are
improperly formatted
'filepath' is read and its contents extracted
An RSA key object conformant to 'tuf.formats.RSAKEY_SCHEMA'
"""
securesystemslib.formats.PATH_SCHEMA.check_match(filepath)
with open(filepath, "rb") as fo_pem:
rsa_pem = fo_pem.read().decode("utf-8")
if securesystemslib.keys.is_pem_private(rsa_pem):
rsa_key = securesystemslib.keys.import_rsakey_from_private_pem(
rsa_pem, password=password)
elif securesystemslib.keys.is_pem_public(rsa_pem):
rsa_key = securesystemslib.keys.import_rsakey_from_public_pem(rsa_pem)
else:
raise securesystemslib.exceptions.FormatError(
"The key has to be clear either a private or"
" public RSA key in PEM format")
return rsa_key
# Iterate the signatures and enumerate the signature_status fields.
# (i.e., good_sigs, bad_sigs, etc.).
for signature in signatures:
keyid = signature['keyid']
# Does the signature use an unrecognized key?
try:
key = tuf.keydb.get_key(keyid, repository_name)
except tuf.exceptions.UnknownKeyError:
unknown_sigs.append(keyid)
continue
# Does the signature use an unknown/unsupported signing scheme?
try:
valid_sig = securesystemslib.keys.verify_signature(key, signature, signed)
except securesystemslib.exceptions.UnsupportedAlgorithmError:
unknown_signing_schemes.append(keyid)
continue
# We are now dealing with either a trusted or untrusted key...
if valid_sig:
if role is not None:
# Is this an unauthorized key? (a keyid associated with 'role')
# Note that if the role is not known, tuf.exceptions.UnknownRoleError
# is raised here.
if keyids is None:
keyids = tuf.roledb.get_role_keyids(role, repository_name)
if keyid not in keyids:
def generate_rsa_key(bits=DEFAULT_RSA_KEY_BITS, scheme='rsassa-pss-sha256'):
    """Return the result of securesystemslib.keys.generate_rsa_key().

    Thin wrapper: 'bits' and 'scheme' are forwarded positionally, unchanged,
    and whatever the securesystemslib call returns is handed back as-is.
    """
    rsa_key = securesystemslib.keys.generate_rsa_key(bits, scheme)
    return rsa_key
Note: We actually pass the dictionary representation of the data to be
signed and `securesystemslib.keys.create_signature` converts it to
canonical JSON utf-8 encoded bytes before creating the signature.
key:
A signing key in the format securesystemslib.formats.KEY_SCHEMA
The dictionary representation of the newly created signature.
"""
securesystemslib.formats.KEY_SCHEMA.check_match(key)
signature = securesystemslib.keys.create_signature(key,
self.signed.signable_dict)
self.signatures.append(signature)
return signature