Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_mark_dirty(self):
    """Check how roles end up in the list returned by get_dirty_roles().

    A role is marked dirty in two ways: through the 'mark_role_as_dirty'
    argument of update_roleinfo(), and through an explicit mark_dirty() call.
    The test also checks that mark_dirty() raises
    securesystemslib.exceptions.InvalidNameError when given the repository
    name 'non-existent'.
    """
    # Add a role to roledb and mark it dirty while updating its roleinfo.
    rolename = 'targets'
    roleinfo1 = {'keyids': ['123'], 'threshold': 1}
    tuf.roledb.add_role(rolename, roleinfo1)

    mark_role_as_dirty = True
    tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty)

    # Note: The 'default' repository is searched if the repository name is
    # not given to get_dirty_roles().
    self.assertEqual([rolename], tuf.roledb.get_dirty_roles())

    # 'dirty_role' is marked dirty directly; this test never adds it to
    # roledb with add_role().
    rolename2 = 'dirty_role'
    tuf.roledb.mark_dirty([rolename2])
    self.assertEqual([rolename2, rolename], tuf.roledb.get_dirty_roles())

    # Verify that a role cannot be marked as dirty for a non-existent
    # repository.
    self.assertRaises(securesystemslib.exceptions.InvalidNameError,
                      tuf.roledb.mark_dirty, [rolename2], 'non-existent')
# NOTE(review): fragment of a larger test body.  'repository', 'root_privkey',
# 'targets_privkey' and 'role1_privkey' are defined outside this view.
#
# Call status() while 'root' and 'targets/role1' both have their threshold
# set to 10 in roledb.  Presumably neither role holds that many keys, so this
# exercises the "threshold of keys not met" path -- confirm against the
# fixture.  The call is made bare, so any exception it raises fails the test.
root_roleinfo = tuf.roledb.get_roleinfo('root')
# Remember the current thresholds so they can be put back afterwards.
old_threshold = root_roleinfo['threshold']
root_roleinfo['threshold'] = 10
role1_roleinfo = tuf.roledb.get_roleinfo('targets/role1')
old_role1_threshold = role1_roleinfo['threshold']
role1_roleinfo['threshold'] = 10
tuf.roledb.update_roleinfo('root', root_roleinfo)
tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo)
repository.status()
# Restore the original threshold values.
root_roleinfo['threshold'] = old_threshold
tuf.roledb.update_roleinfo('root', root_roleinfo)
role1_roleinfo['threshold'] = old_role1_threshold
tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo)
# Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the
# top-level roles and 'targets/role1' are improperly signed.
# Swap in the Targets private key as the signing key for both Root and
# 'targets/role1', replacing each role's own key.
repository.root.unload_signing_key(root_privkey)
repository.root.load_signing_key(targets_privkey)
repository.targets('role1').unload_signing_key(role1_privkey)
repository.targets('role1').load_signing_key(targets_privkey)
repository.status()
# Reset Root and 'targets/role1', and verify Targets.
repository.root.unload_signing_key(targets_privkey)
repository.root.load_signing_key(root_privkey)
repository.targets('role1').unload_signing_key(targets_privkey)
repository.targets('role1').load_signing_key(role1_privkey)
# Targets itself is left without 'targets_privkey' loaded; the check that
# presumably follows is outside this view.
repository.targets.unload_signing_key(targets_privkey)
def test_update_roleinfo(self):
    """Exercise tuf.roledb.update_roleinfo() on valid and invalid input.

    Covers the default repository, a freshly created non-default
    repository, an unknown role name, a non-existent repository name, and
    an improperly formatted role name.
    """
    role = 'targets'
    info = {'keyids': ['123'], 'threshold': 1}
    tuf.roledb.add_role(role, info)

    # Normal case: the role exists in the default repository.
    tuf.roledb.update_roleinfo(role, info)

    # A roleinfo can also be updated for a role in a non-default
    # repository.  Before that repository is created, clearing its roledb
    # must be rejected.
    other_repository = 'example_repository'
    dirty = True
    with self.assertRaises(securesystemslib.exceptions.InvalidNameError):
        tuf.roledb.clear_roledb(other_repository)

    tuf.roledb.create_roledb(other_repository)
    tuf.roledb.add_role(role, info, other_repository)
    tuf.roledb.update_roleinfo(role, info, dirty, other_repository)
    stored_keyids = tuf.roledb.get_role_keyids(role, other_repository)
    self.assertEqual(info['keyids'], stored_keyids)

    # Drop the extra roledb so that subsequent tests can access the default
    # roledb.
    tuf.roledb.remove_roledb(other_repository)

    # An unknown role is rejected.
    with self.assertRaises(tuf.exceptions.UnknownRoleError):
        tuf.roledb.update_roleinfo('unknown_rolename', info)

    # A roleinfo cannot be updated in a repository that does not exist.
    with self.assertRaises(securesystemslib.exceptions.InvalidNameError):
        tuf.roledb.update_roleinfo('new_rolename', info, False, 'non-existent')

    # Improperly formatted arguments are rejected.
    with self.assertRaises(securesystemslib.exceptions.FormatError):
        tuf.roledb.update_roleinfo(1, info)
# NOTE(review): fragment of a method body; 'filepath' and 'self' come from the
# enclosing method, whose 'def' line is outside this view.
#
# Work with the absolute form of 'filepath' for the prefix check below.
filepath = os.path.abspath(filepath)
targets_directory_length = len(self._targets_directory)
# Ensure 'filepath' is under the repository targets directory.
if not filepath.startswith(self._targets_directory+os.sep):
raise tuf.Error(repr(filepath) + ' is not under the Repository\'s'
' targets directory: ' + repr(self._targets_directory))
# The relative filepath is listed in 'paths'.  Only the targets directory
# itself is sliced off, so the result still begins with the path separator.
relative_filepath = filepath[targets_directory_length:]
# Remove 'relative_filepath', if found, and update this Targets roleinfo.
# Despite its name, 'fileinfo' holds the roleinfo returned for this role.
fileinfo = tuf.roledb.get_roleinfo(self.rolename)
if relative_filepath in fileinfo['paths']:
del fileinfo['paths'][relative_filepath]
tuf.roledb.update_roleinfo(self.rolename, fileinfo)
else:
raise tuf.Error('Target file path not found.')
# NOTE(review): fragment; 'parsed_arguments', 'repository', 'target_path',
# 'custom' and 'logger' are defined outside this view.
#
# Fetch the roleinfo of the role named by 'parsed_arguments.role' from the
# roledb that belongs to 'repository'.
roleinfo = tuf.roledb.get_roleinfo(
parsed_arguments.role, repository_name=repository._repository_name)
# It is assumed we have a delegated role, and that the caller has made
# sure to reject top-level roles specified with --role.
# Both branches store the same 'target_path' -> 'custom' entry; they differ
# only in the debug message that is logged.
if target_path not in roleinfo['paths']:
logger.debug('Adding new target: ' + repr(target_path))
roleinfo['paths'].update({target_path: custom})
else:
logger.debug('Replacing target: ' + repr(target_path))
roleinfo['paths'].update({target_path: custom})
# Write the modified roleinfo back and mark the role as dirty.
tuf.roledb.update_roleinfo(parsed_arguments.role, roleinfo,
mark_role_as_dirty=True, repository_name=repository._repository_name)
# NOTE(review): fragment; 'targets_metadata', 'roleinfo', 'signable' and
# 'repository_name' are defined outside this view.
#
# Record every target listed in the metadata under roleinfo['paths'], keeping
# only its 'custom' entry (an empty dict when the target has none).
for filepath, fileinfo in six.iteritems(targets_metadata['targets']):
roleinfo['paths'].update({filepath: fileinfo.get('custom', {})})
# Copy the version, expiration and delegations from the metadata as-is.
roleinfo['version'] = targets_metadata['version']
roleinfo['expires'] = targets_metadata['expires']
roleinfo['delegations'] = targets_metadata['delegations']
# Flag the roleinfo when the helper reports the metadata as partially loaded.
if _metadata_is_partially_loaded('targets', signable, repository_name):
roleinfo['partial_loaded'] = True
else:
logger.debug('Targets file was not partially loaded.')
_log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'],
TARGETS_EXPIRES_WARN_SECONDS)
# Store the assembled roleinfo without marking 'targets' as dirty.
tuf.roledb.update_roleinfo('targets', roleinfo, mark_role_as_dirty=False,
repository_name=repository_name)
# Add the keys specified in the delegations field of the Targets role.
for key_metadata in six.itervalues(targets_metadata['delegations']['keys']):
# The repo may have used hashing algorithms for the generated keyids
# that don't match the client's set of hash algorithms.  Make sure
# to only use the repo's selected hashing algorithms.
# NOTE(review): the module-level setting is swapped and then restored with no
# try/finally visible here; if format_metadata_to_key() raises, the original
# HASH_ALGORITHMS value would not be put back -- confirm whether that matters.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms']
key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
# Add 'key_object' to the list of recognized keys. Keys may be shared,
# so do not raise an exception if 'key_object' has already been loaded.
# In contrast to the methods that may add duplicate keys, do not log
None.
"""
# NOTE(review): fragment of a method body; 'threshold' and 'self' come from
# the enclosing method, whose 'def' line is outside this view.
#
# Does 'threshold' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.  Raise
# 'securesystemslib.exceptions.FormatError' if any are improperly formatted.
securesystemslib.formats.THRESHOLD_SCHEMA.check_match(threshold)
roleinfo = tuf.roledb.get_roleinfo(self._rolename, self._repository_name)
# Keep the outgoing value under 'previous_threshold' before overwriting it.
roleinfo['previous_threshold'] = roleinfo['threshold']
roleinfo['threshold'] = threshold
# Write the updated roleinfo back to this repository's roledb.
tuf.roledb.update_roleinfo(self._rolename, roleinfo,
repository_name=self._repository_name)
# NOTE(review): these statements belong to a metadata-writing routine whose
# 'def' line is outside this view, and they do not appear in execution order:
# the first 'else:' below has no visible 'if', and 'current_version' is read
# before the line that assigns it.  The comments describe each statement on
# its own; the original control flow must be confirmed against the full file.
_remove_invalid_and_duplicate_signatures(signable, repository_name)
# Root should always be written as if consistent_snapshot is True (i.e.,
# write .root.json and root.json to disk).
if rolename == 'root':
consistent_snapshot = True
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], consistent_snapshot)
# 'signable' contains an invalid threshold of signatures.
else:
# Since new metadata cannot be successfully written, restore the current
# version number.
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
roleinfo['version'] = current_version
tuf.roledb.update_roleinfo(rolename, roleinfo,
repository_name=repository_name)
# Note that 'signable' is an argument to tuf.UnsignedMetadataError().
raise tuf.exceptions.UnsignedMetadataError('Not enough'
' signatures for ' + repr(metadata_filename), signable)
# 'rolename' is a delegated role or a top-level role that is partially
# signed, and thus its signatures should not be verified.
else:
signable = sign_metadata(metadata, signing_keyids, metadata_filename,
repository_name)
_remove_invalid_and_duplicate_signatures(signable, repository_name)
# Root should always be written as if consistent_snapshot is True (i.e.,
# .root.json and root.json).
if rolename == 'root':
# Before writing 'rolename' to disk, automatically increment its version
# number (if 'increment_version_number' is True) so that the caller does not
# have to manually perform this action.  The version number should be
# incremented in both the metadata file and roledb (required so that Snapshot
# references the latest version).
# Store the 'current_version' in case the version number must be restored
# (e.g., if 'rolename' cannot be written to disk because its metadata is not
# properly signed).
current_version = metadata['version']
if increment_version_number:
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
# Bump the version in both the in-memory metadata and the roledb entry.
metadata['version'] = metadata['version'] + 1
roleinfo['version'] = roleinfo['version'] + 1
tuf.roledb.update_roleinfo(rolename, roleinfo,
repository_name=repository_name)
else:
logger.debug('Not incrementing ' + repr(rolename) + '\'s version number.')
if rolename in ['root', 'targets', 'snapshot', 'timestamp'] and not allow_partially_signed:
# Verify that the top-level 'rolename' is fully signed.  Only a delegated
# role should not be written to disk without full verification of its
# signature(s), since it can only be considered fully signed depending on
# the delegating role.
signable = sign_metadata(metadata, signing_keyids, metadata_filename,
repository_name)
def should_write():
# Root must be signed by its previous keys and threshold.
# NOTE(review): fragment of a method body; 'key' and 'self' come from the
# enclosing method, whose 'def' line is outside this view.
#
# Ensure 'key', which should contain the public portion, is added to
# 'tuf.keydb.py'.
try:
    tuf.keydb.add_key(key)
except tuf.KeyAlreadyExistsError:
    # The key is already registered, which is fine here.  The Python-2-only
    # "except X, e" spelling was dropped: it is a SyntaxError on Python 3
    # and the bound exception was never used.
    pass

keyid = key['keyid']
# NOTE(review): the lookup uses 'self.rolename' while the update below uses
# 'self._rolename'; presumably they name the same role -- confirm against the
# enclosing class.
roleinfo = tuf.roledb.get_roleinfo(self.rolename)

# Add 'key' to the role's entry in 'tuf.roledb.py' and avoid duplicates.
if keyid not in roleinfo['keyids']:
    roleinfo['keyids'].append(keyid)
    tuf.roledb.update_roleinfo(self._rolename, roleinfo)