Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def import_publickey_from_file(keypath):
    """Load the public key stored at 'keypath' and return it as a key object.

    The file is first read as JSON key metadata.  If that raises
    securesystemslib.exceptions.Error, the file is re-read as an RSA public
    key through import_rsa_publickey_from_file().

    Arguments:
      keypath: Path of the public key file to import.

    Raises:
      tuf.exceptions.Error, if the imported key's 'keytype' is not listed in
      SUPPORTED_KEY_TYPES.  Exceptions raised by the fallback import are not
      caught here and propagate to the caller.

    Returns:
      The key object produced by
      securesystemslib.keys.format_metadata_to_key().
    """
    try:
        key_metadata = securesystemslib.util.load_json_file(keypath)

    # An RSA public key is saved to disk in PEM format (not JSON), so the
    # load_json_file() call above can fail for this reason.  Try to potentially
    # load the PEM string in keypath if an exception is raised.
    except securesystemslib.exceptions.Error:
        key_metadata = securesystemslib.interface.import_rsa_publickey_from_file(
            keypath)

    # Only the first element of the returned pair is needed here.
    key_object, _ = securesystemslib.keys.format_metadata_to_key(key_metadata)

    keytype = key_object['keytype']
    if keytype not in SUPPORTED_KEY_TYPES:
        # repr() must wrap only the key type.  Previously its closing
        # parenthesis was misplaced, so the rest of the sentence was swallowed
        # into the repr'd string and the message came out malformed.
        raise tuf.exceptions.Error(
            'Trying to import an unsupported key type: ' + repr(keytype) + '.'
            ' Supported key types: ' + repr(SUPPORTED_KEY_TYPES))

    return key_object
def _update_from_repository(self, repository_name, target_filename):
updater = self.get_updater(repository_name)
if not updater:
raise tuf.exceptions.Error(
'Cannot load updater for ' + repr(repository_name))
else:
# Get one valid target info from the Updater object.
# 'tuf.exceptions.UnknownTargetError' raised by get_one_valid_targetinfo
# if a valid target cannot be found.
return updater.get_one_valid_targetinfo(target_filename), updater
"""Indicate that a key already exists and cannot be added."""
class RoleAlreadyExistsError(Error):
    """Indicate that a role already exists and cannot be added.

    Defines no attributes or methods of its own; everything is inherited from
    'Error'.
    """
class UnknownRoleError(Error):
    """Indicate an error trying to locate or identify a specified TUF role.

    Defines no attributes or methods of its own; everything is inherited from
    'Error'.
    """
class UnknownTargetError(Error):
    """Indicate an error trying to locate or identify a specified target.

    Defines no attributes or methods of its own; everything is inherited from
    'Error'.
    """
class InvalidNameError(Error):
    """Indicate an error while trying to validate any type of named object.

    Defines no attributes or methods of its own; everything is inherited from
    'Error'.
    """
class UnsignedMetadataError(Error):
"""Indicate metadata object with insufficient threshold of signatures."""
def __init__(self, message, signable):
    """Store 'message' and 'signable' on the new exception instance.

    Arguments:
      message: Text saved as 'self.exception_message'.
      signable: Object saved as 'self.signable'.
    """
    # The base initializer is called with no arguments, so 'message' is not
    # forwarded to it; it is only kept in 'exception_message'.
    super(UnsignedMetadataError, self).__init__()
    self.exception_message = message
    self.signable = signable
def __str__(self):
    """Return the message that was stored in 'self.exception_message'."""
    return self.exception_message
def __repr__(self):
def delegate(parsed_arguments):
if not parsed_arguments.delegatee:
raise tuf.exceptions.Error(
'--delegatee must be set to perform the delegation.')
if parsed_arguments.delegatee in ('root', 'snapshot', 'timestamp', 'targets'):
raise tuf.exceptions.Error(
'Cannot delegate to the top-level role: ' + repr(parsed_arguments.delegatee))
if not parsed_arguments.pubkeys:
raise tuf.exceptions.Error(
'--pubkeys must be set to perform the delegation.')
public_keys = []
for public_key in parsed_arguments.pubkeys:
imported_pubkey = import_publickey_from_file(public_key)
public_keys.append(imported_pubkey)
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
if parsed_arguments.role == 'targets':
repository.targets.delegate(parsed_arguments.delegatee, public_keys,
def remove_verification_key(parsed_arguments):
if not parsed_arguments.pubkeys:
raise tuf.exceptions.Error('--pubkeys must be given with --distrust.')
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
for keypath in parsed_arguments.pubkeys:
imported_pubkey = import_publickey_from_file(keypath)
try:
if parsed_arguments.role not in ('root', 'targets', 'snapshot', 'timestamp'):
raise tuf.exceptions.Error('The given --role is not a top-level role.')
elif parsed_arguments.role == 'root':
repository.root.remove_verification_key(imported_pubkey)
elif parsed_arguments.role == 'targets':
repository.targets.remove_verification_key(imported_pubkey)
elif parsed_arguments.role == 'snapshot':
repository.snapshot.remove_verification_key(imported_pubkey)
# The Timestamp key..
else:
repository.timestamp.remove_verification_key(imported_pubkey)
# It is assumed remove_verification_key() only raises
# securesystemslib.exceptions.Error and
def add_verification_key(parsed_arguments):
if not parsed_arguments.pubkeys:
raise tuf.exceptions.Error('--pubkeys must be given with --trust.')
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
for keypath in parsed_arguments.pubkeys:
imported_pubkey = import_publickey_from_file(keypath)
if parsed_arguments.role not in ('root', 'targets', 'snapshot', 'timestamp'):
raise tuf.exceptions.Error('The given --role is not a top-level role.')
elif parsed_arguments.role == 'root':
repository.root.add_verification_key(imported_pubkey)
elif parsed_arguments.role == 'targets':
repository.targets.add_verification_key(imported_pubkey)
password)
except securesystemslib.exceptions.CryptoError:
try:
logger.debug(
'Decryption failed. Attempting to import a private PEM instead.')
key_object = securesystemslib.keys.import_rsakey_from_private_pem(
encrypted_key, 'rsassa-pss-sha256', password)
except securesystemslib.exceptions.CryptoError:
raise tuf.exceptions.Error(repr(keypath) + ' cannot be imported, possibly'
' because an invalid key file is given or the decryption password is'
' incorrect.')
if key_object['keytype'] not in SUPPORTED_KEY_TYPES:
raise tuf.exceptions.Error('Trying to import an unsupported key'
' type: ' + repr(key_object['keytype'] + '.'
' Supported key types: ' + repr(SUPPORTED_KEY_TYPES)))
else:
# Add "keyid_hash_algorithms" so that equal keys with different keyids can
# be associated using supported keyid_hash_algorithms.
key_object['keyid_hash_algorithms'] = securesystemslib.settings.HASH_ALGORITHMS
return key_object
def add_verification_key(parsed_arguments):
if not parsed_arguments.pubkeys:
raise tuf.exceptions.Error('--pubkeys must be given with --trust.')
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
for keypath in parsed_arguments.pubkeys:
imported_pubkey = import_publickey_from_file(keypath)
if parsed_arguments.role not in ('root', 'targets', 'snapshot', 'timestamp'):
raise tuf.exceptions.Error('The given --role is not a top-level role.')
elif parsed_arguments.role == 'root':
repository.root.add_verification_key(imported_pubkey)
elif parsed_arguments.role == 'targets':
repository.targets.add_verification_key(imported_pubkey)
elif parsed_arguments.role == 'snapshot':
repository.snapshot.add_verification_key(imported_pubkey)
# The timestamp role..
else:
repository.timestamp.add_verification_key(imported_pubkey)
consistent_snapshot = tuf.roledb.get_roleinfo('root',
repository._repository_name)['consistent_snapshot']
parsed_arguments:
An argparse Namespace object, containing the parsed arguments.
tuf.exceptions.Error, if 'parsed_arguments' is not a Namespace object.
Connects to a repository mirror and updates the local metadata files and
any target files. Obsolete, local targets are also removed.
None.
"""
if not isinstance(parsed_arguments, argparse.Namespace):
raise tuf.exceptions.Error('Invalid namespace object.')
else:
logger.debug('We have a valid argparse Namespace object.')
# Set the local repositories directory containing all of the metadata files.
tuf.settings.repositories_directory = '.'
# Set the repository mirrors. This dictionary is needed by the Updater
# class of updater.py.
repository_mirrors = {'mirror': {'url_prefix': parsed_arguments.repo,
'metadata_path': 'metadata', 'targets_path': 'targets',
'confined_target_dirs': ['']}}
# Create the repository object using the repository name 'repository'
# and the repository mirrors defined above.
updater = tuf.client.updater.Updater('tufrepo', repository_mirrors)
return (
'Downloaded ' + repr(self.metadata_role) + ' is older (' +
repr(self.previous_version) + ') than the version currently '
'installed (' + repr(self.current_version) + ').')
def __repr__(self):
    """Return '<class name> : <str(self)>' for this instance."""
    # Built from the runtime class name and str(self), so the text follows
    # whatever __str__ produces for the instance.
    return self.__class__.__name__ + ' : ' + str(self)

    # # Directly instance-reproducing:
    # return (
    #     self.__class__.__name__ + '(' + repr(self.metadata_role) + ', ' +
    #     repr(self.previous_version) + ', ' + repr(self.current_version) + ')')
class CryptoError(Error):
    """Indicate any cryptography-related errors.

    Defines no attributes or methods of its own; everything is inherited from
    'Error'.
    """
class BadSignatureError(CryptoError):
    """Indicate that some metadata file has a bad signature.

    Attributes:
      metadata_role_name: The value given to the constructor; its repr() is
        used in the message returned by __str__.
    """

    def __init__(self, metadata_role_name):
        """Record 'metadata_role_name' on the new exception instance."""
        # The base initializer receives no arguments; the role name is kept
        # only as an attribute.
        super(BadSignatureError, self).__init__()
        self.metadata_role_name = metadata_role_name

    def __str__(self):
        """Return the message naming the role whose signature is bad."""
        return '{!r} metadata has a bad signature.'.format(
            self.metadata_role_name)

    def __repr__(self):
        """Return '<class name> : <str(self)>' for this instance."""
        class_name = self.__class__.__name__
        return '{} : {}'.format(class_name, str(self))