Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except tuf.KeyAlreadyExistsError:
pass
# Add the delegated role's initial roleinfo, to be fully populated
# when its metadata file is next loaded in the os.walk() iteration.
for role in metadata_object['delegations']['roles']:
rolename = role['name']
roleinfo = {'name': role['name'], 'keyids': role['keyids'],
'threshold': role['threshold'],
'compressions': [''], 'signing_keyids': [],
'signatures': [],
'paths': {},
'partial_loaded': False,
'delegations': {'keys': {},
'roles': []}}
tuf.roledb.add_role(rolename, roleinfo)
return repository
def test_verify_single_key(self):
    """Verify that a single valid signature satisfies a threshold-1 role.

    The canonical encoding of the 'signed' payload is signed with KEYS[0].
    That key is added to the key database, a 'Root' role listing its keyid
    with a threshold of 1 is added to the role database, and
    tuf.sig.verify() is asserted to accept the signable.

    The key and the role are removed in 'finally' clauses so that a failing
    assertion does not leave them registered for whatever test runs next.
    """
    signable = {'signed' : 'test', 'signatures' : []}
    signed = securesystemslib.formats.encode_canonical(signable['signed']).encode('utf-8')

    signable['signatures'].append(securesystemslib.keys.create_signature(
        KEYS[0], signed))

    tuf.keydb.add_key(KEYS[0])
    try:
        threshold = 1
        roleinfo = tuf.formats.build_dict_conforming_to_schema(
            tuf.formats.ROLE_SCHEMA, keyids=[KEYS[0]['keyid']], threshold=threshold)

        tuf.roledb.add_role('Root', roleinfo)
        try:
            # This will call verify() and return True if 'signable' is valid,
            # False otherwise.
            self.assertTrue(tuf.sig.verify(signable, 'Root'))
        finally:
            # Remove the role, whether or not verification succeeded.
            tuf.roledb.remove_role('Root')
    finally:
        # Done.  Remove the added key from the key database, even if adding
        # the role or verifying the signable raised.
        tuf.keydb.remove_key(KEYS[0]['keyid'])
roleinfo2 = {'keyids': ['456', '789'], 'threshold': 2, 'paths': paths}
self.assertRaises(tuf.exceptions.UnknownRoleError, tuf.roledb.get_role_paths, rolename)
tuf.roledb.add_role(rolename, roleinfo)
tuf.roledb.add_role(rolename2, roleinfo2)
self.assertEqual({}, tuf.roledb.get_role_paths(rolename))
self.assertEqual(paths, tuf.roledb.get_role_paths(rolename2))
# Verify that role paths can be queried for roles in non-default
# repositories.
repository_name = 'example_repository'
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_role_paths,
rolename, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename2, roleinfo2, repository_name)
self.assertEqual(roleinfo2['paths'], tuf.roledb.get_role_paths(rolename2,
repository_name))
# Reset the roledb so that subsequent tests have access to the original,
# default roledb.
tuf.roledb.remove_roledb(repository_name)
# Test conditions where the arguments are improperly formatted,
# contain invalid names, or haven't been added to the role database.
self._test_rolename(tuf.roledb.get_role_paths)
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.get_role_paths, rolename, 123)
super(Timestamp, self).__init__()
self._rolename = 'timestamp'
# By default, 'timestamp' metadata is set to expire TIMESTAMP_EXPIRATION
# seconds from the current time. The expiration may be modified.
expiration = \
tuf.formats.unix_timestamp_to_datetime(int(time.time() + TIMESTAMP_EXPIRATION))
expiration = expiration.isoformat() + 'Z'
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
'signatures': [], 'version': 0, 'compressions': [''],
'expires': expiration, 'partial_loaded': False}
try:
tuf.roledb.add_role(self.rolename, roleinfo)
except tuf.RoleAlreadyExistsError:
pass
def test_unmark_dirty(self):
    """Check that unmark_dirty() removes roles from the dirty-role list."""
    # Register two roles in the default role database.
    targets_name = 'targets'
    targets_info = {'keyids': ['123'], 'threshold': 1}
    tuf.roledb.add_role(targets_name, targets_info)

    other_name = 'dirty_role'
    other_info = {'keyids': ['123'], 'threshold': 2}
    tuf.roledb.add_role(other_name, other_info)

    # Updating a role with the dirty flag set should list it as dirty.
    # Note: get_dirty_roles() searches the 'default' repository when no
    # repository name is given.
    dirty_flag = True
    tuf.roledb.update_roleinfo(targets_name, targets_info, dirty_flag)
    dirty_now = tuf.roledb.get_dirty_roles()
    self.assertEqual([targets_name], dirty_now)

    # Mark the second role dirty as well, then unmark only that one: the
    # first role must be the only dirty role left.
    tuf.roledb.update_roleinfo(other_name, other_info, dirty_flag)
    tuf.roledb.unmark_dirty(['dirty_role'])
    dirty_now = tuf.roledb.get_dirty_roles()
    self.assertEqual([targets_name], dirty_now)

    # Unmarking the remaining role leaves no dirty roles at all.
    tuf.roledb.unmark_dirty(['targets'])
    dirty_now = tuf.roledb.get_dirty_roles()
    self.assertEqual([], dirty_now)
# What happens for a role that isn't dirty? unmark_dirty() should just
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
self.metadata.add_verification_key(key_object)
keyid = key_object['keyid']
self.assertEqual([keyid], self.metadata.keys)
expiration = \
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
expiration = expiration.isoformat() + 'Z'
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
'signatures': [], 'version': 0,
'consistent_snapshot': False, 'expires': expiration,
'partial_loaded': False}
tuf.roledb.add_role('Root', roleinfo, 'test_repository')
tuf.roledb.add_role('Targets', roleinfo, 'test_repository')
tuf.roledb.add_role('Snapshot', roleinfo, 'test_repository')
tuf.roledb.add_role('Timestamp', roleinfo, 'test_repository')
# Test for different top-level role names.
self.metadata._rolename = 'Targets'
self.metadata.add_verification_key(key_object)
self.metadata._rolename = 'Snapshot'
self.metadata.add_verification_key(key_object)
self.metadata._rolename = 'Timestamp'
self.metadata.add_verification_key(key_object)
# Test for a given 'expires' argument.
expires = datetime.datetime(2030, 1, 1, 12, 0)
self.metadata.add_verification_key(key_object, expires)
def test_get_dirty_roles(self):
    """Check that get_dirty_roles() lists roles updated with the dirty flag.

    The check is made first against the default role database and then
    against a separately created, named one.
    """
    role = 'targets'
    initial_info = {'keyids': ['123'], 'threshold': 1}
    updated_info = {'keyids': ['123'], 'threshold': 2}
    dirty_flag = True

    # Default repository: get_dirty_roles() searches it when no repository
    # name is supplied.
    tuf.roledb.add_role(role, initial_info)
    tuf.roledb.update_roleinfo(role, updated_info, dirty_flag)
    default_dirty = tuf.roledb.get_dirty_roles()
    self.assertEqual([role], default_dirty)

    # Non-default repository: the same sequence, with the repository name
    # passed explicitly to every call.
    repo = 'example_repository'
    tuf.roledb.create_roledb(repo)
    tuf.roledb.add_role(role, initial_info, repo)
    tuf.roledb.update_roleinfo(role, updated_info, dirty_flag, repo)
    named_dirty = tuf.roledb.get_dirty_roles(repo)
    self.assertEqual([role], named_dirty)
# Verify that dirty roles are not returned for a non-existent repository.
self.assertEqual(set(['release']),
set(tuf.roledb.get_delegated_rolenames(rolename2)))
self.assertEqual(set([]),
set(tuf.roledb.get_delegated_rolenames(rolename3)))
self.assertEqual(set([]),
set(tuf.roledb.get_delegated_rolenames(rolename4)))
# Verify that the delegated rolenames of a role in a non-default
# repository can be accessed.
repository_name = 'example_repository'
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_delegated_rolenames,
rolename, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
self.assertEqual(set(['django', 'tuf']),
set(tuf.roledb.get_delegated_rolenames(rolename, repository_name)))
# Reset the roledb so that subsequent tests have access to the original,
# default roledb.
tuf.roledb.remove_roledb(repository_name)
# Test conditions where the arguments are improperly formatted,
# contain invalid names, or haven't been added to the role database.
self._test_rolename(tuf.roledb.get_delegated_rolenames)
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.get_delegated_rolenames, rolename, 123)
logger.exception('Invalid key for keyid: ' + repr(keyid) + '.')
logger.error('Aborting role delegation for parent role ' + parent_role + '.')
raise
else:
logger.warning('Invalid key type for ' + repr(keyid) + '.')
continue
# Add the roles to the role database.
for roleinfo in roles_info:
try:
# NOTE: tuf.roledb.add_role will take care of the case where rolename
# is None.
rolename = roleinfo.get('name')
logger.debug('Adding delegated role: ' + str(rolename) + '.')
tuf.roledb.add_role(rolename, roleinfo, self.repository_name)
except tuf.exceptions.RoleAlreadyExistsError:
logger.warning('Role already exists: ' + rolename)
except Exception:
logger.exception('Failed to add delegated role: ' + repr(rolename) + '.')
raise
def __init__(self):
    """Set up the 'timestamp' role and register its initial roleinfo.

    Builds a roleinfo dict with no keys or signatures, a threshold of 1,
    version 0, and a default expiration, then adds it to tuf.roledb under
    this object's rolename.  If the role is already present in the role
    database, the existing entry is left untouched.
    """
    super(Timestamp, self).__init__()

    self._rolename = 'timestamp'

    # By default, 'timestamp' metadata is set to expire TIMESTAMP_EXPIRATION
    # seconds from the current time.  The expiration may be modified.
    expiration = tuf.formats.format_time(time.time()+TIMESTAMP_EXPIRATION)

    roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
                'signatures': [], 'version': 0, 'compressions': [''],
                'expires': expiration, 'partial_loaded': False}

    try:
        tuf.roledb.add_role(self.rolename, roleinfo)

    # 'except X, e' is Python-2-only syntax and the bound exception was never
    # used; the plain form below parses on both Python 2 and Python 3.
    except tuf.RoleAlreadyExistsError:
        # Deliberately ignored: an existing entry for this role is kept.
        pass