# Verify that _load_metadata_from_file() doesn't raise an exception for
# improperly formatted metadata, and doesn't load the bad file.
with open(role1_filepath, 'ab') as file_object:
  file_object.write(b'bad JSON data')

self.repository_updater._load_metadata_from_file('current', 'role1')
self.assertEqual(len(self.repository_updater.metadata['current']), 5)

# Test that we fail gracefully if we can't deserialize a metadata file.
self.repository_updater._load_metadata_from_file('current', 'empty_file')
self.assertFalse('empty_file' in self.repository_updater.metadata['current'])

# Test an invalid metadata set argument (must be either 'current' or
# 'previous').
self.assertRaises(securesystemslib.exceptions.Error,
    self.repository_updater._load_metadata_from_file,
    'bad_metadata_set', 'role1')
# Computing the hash and length of the tempfile.
digest_object = securesystemslib.hash.digest_filename(filepath, algorithm='sha256')
file_hash = {'sha256': digest_object.hexdigest()}
file_length = os.path.getsize(filepath)

# Test: Expected input.
self.assertEqual(securesystemslib.util.get_file_details(filepath),
    (file_length, file_hash))

# Test: Incorrect input.
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
    {'a': 'b'}, None]

for bogus_input in bogus_inputs:
  if isinstance(bogus_input, six.string_types):
    self.assertRaises(securesystemslib.exceptions.Error,
        securesystemslib.util.get_file_details, bogus_input)

  else:
    self.assertRaises(securesystemslib.exceptions.FormatError,
        securesystemslib.util.get_file_details, bogus_input)
class BadHashError(Error):
  """Indicate an error while checking the value of a hash object."""

  def __init__(self, expected_hash, observed_hash):
    self.expected_hash = expected_hash
    self.observed_hash = observed_hash

  def __str__(self):
    return ('Observed hash (' + repr(self.observed_hash) + ') != expected'
        ' hash (' + repr(self.expected_hash) + ')')


class BadPasswordError(Error):
  """Indicate an error after encountering an invalid password."""
  pass


class CryptoError(Error):
  """Indicate any cryptography-related errors."""
  pass
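# A minimal sketch (not part of the excerpt above) of how a caller might use
# BadHashError when verifying a downloaded file; verify_sha256() and its
# arguments are hypothetical helpers, not library API.
import securesystemslib.hash

def verify_sha256(filepath, trusted_hash):
  # Recompute the file's SHA-256 digest and compare it to the trusted value.
  digest_object = securesystemslib.hash.digest_filename(filepath, algorithm='sha256')
  observed_hash = digest_object.hexdigest()

  if observed_hash != trusted_hash:
    # Report both digests so the caller can log or display the mismatch.
    raise BadHashError(trusted_hash, observed_hash)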
class BadSignatureError(CryptoError):
  """Indicate that some metadata has a bad signature."""

  def __init__(self, metadata_role_name):
    self.metadata_role_name = metadata_role_name

  def __str__(self):
    return repr(self.metadata_role_name) + ' metadata has a bad signature.'
# Generate the signature using the appropriate signing method.
if key['keytype'] in SUPPORTED_KEY_TYPES:
  if 'private' in key['keyval']:
    signed = signable['signed']

    try:
      signature = securesystemslib.keys.create_signature(key, signed)
      signable['signatures'].append(signature)

    except Exception:
      logger.warning('Unable to create signature for keyid: ' + repr(keyid))

  else:
    logger.debug('Private key unset.  Skipping: ' + repr(keyid))

else:
  raise securesystemslib.exceptions.Error('The keydb contains a key with'
      ' an invalid key type: ' + repr(key['keytype']))

# Raise 'securesystemslib.exceptions.FormatError' if the resulting 'signable'
# is not formatted correctly.
tuf.formats.check_signable_object_format(signable)

return signable
<Exceptions>
  If there was an error importing a delegated role of 'metadata_role'
  or the 'metadata_set' is not one currently supported.

<Side Effects>
  If the metadata is loaded successfully, it is saved to the metadata
  store.  If 'metadata_role' is 'root', the role and key databases
  are reloaded.  If 'metadata_role' is a target metadata, all its
  delegated roles are refreshed.

<Returns>
  None.
"""
# Ensure we have a valid metadata set.
if metadata_set not in ['current', 'previous']:
  raise securesystemslib.exceptions.Error(
      'Invalid metadata set: ' + repr(metadata_set))

# Save and construct the full metadata path.
metadata_directory = self.metadata_directory[metadata_set]
metadata_filename = metadata_role + '.json'
metadata_filepath = os.path.join(metadata_directory, metadata_filename)

# Ensure the metadata path is valid/exists, else ignore the call.
if os.path.exists(metadata_filepath):
  # Load the file.  The loaded object should conform to
  # 'tuf.formats.SIGNABLE_SCHEMA'.
  try:
    metadata_signable = securesystemslib.util.load_json_file(
        metadata_filepath)

    # Although the metadata file may exist locally, it may not
metadata['version'] = metadata['version'] + 1
signable = repo_lib.sign_metadata(metadata, roleinfo['signing_keyids'],
    metadata_filename, repository_name)

# Write the metadata to file if it contains a threshold of signatures.
signable['signatures'].extend(roleinfo['signatures'])

if tuf.sig.verify_signable(signable, rolename, repository_name) or write_partial:
  repo_lib._remove_invalid_and_duplicate_signatures(signable, repository_name)
  filename = repo_lib.write_metadata_file(signable, metadata_filename,
      metadata['version'], False)

# 'signable' contains an invalid threshold of signatures.
else:
  message = 'Not enough signatures for ' + repr(metadata_filename)
  raise securesystemslib.exceptions.Error(message, signable)

return signable, filename
<Returns>
  Deserialized object.  For example, a dictionary.
"""
deserialized_object = None

try:
  deserialized_object = json.loads(data)

except TypeError:
  message = 'Invalid JSON string: ' + repr(data)
  raise securesystemslib.exceptions.Error(message)

except ValueError:
  message = 'Cannot deserialize to a Python object: ' + repr(data)
  raise securesystemslib.exceptions.Error(message)

else:
  return deserialized_object
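# A minimal usage sketch (not part of the excerpt above), assuming the body
# shown here belongs to securesystemslib.util.load_json_string():
import securesystemslib.util
import securesystemslib.exceptions

metadata_dict = securesystemslib.util.load_json_string('{"signed": {}, "signatures": []}')
print(metadata_dict['signatures'])   # prints: []

try:
  securesystemslib.util.load_json_string('not valid JSON')

except securesystemslib.exceptions.Error as error:
  print('Deserialization failed: ' + repr(error))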
# Is 'datetime_object' a datetime.datetime() object?
# Raise 'securesystemslib.exceptions.FormatError' if not.
if not isinstance(datetime_object, datetime.datetime):
  raise securesystemslib.exceptions.FormatError(
      repr(datetime_object) + ' is not a datetime.datetime() object.')

# Truncate the microseconds value to produce a correct schema string
# of the form 'yyyy-mm-ddThh:mm:ssZ'.
datetime_object = datetime_object.replace(microsecond=0)

# Ensure the expiration has not already passed.
current_datetime_object = \
    tuf.formats.unix_timestamp_to_datetime(int(time.time()))

if datetime_object < current_datetime_object:
  raise securesystemslib.exceptions.Error(repr(self.rolename) + ' has'
      ' already expired.')

# Update the role's 'expires' entry in 'tuf.roledb.py'.
roleinfo = tuf.roledb.get_roleinfo(self.rolename, self._repository_name)
expires = datetime_object.isoformat() + 'Z'
roleinfo['expires'] = expires

tuf.roledb.update_roleinfo(self.rolename, roleinfo,
    repository_name=self._repository_name)
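# A minimal usage sketch, assuming the setter above implements the
# 'expiration' property of a role object in tuf.repository_tool; the
# repository path below is illustrative.
import datetime
import tuf.repository_tool as repo_tool

repository = repo_tool.load_repository('/path/to/repository')
repository.root.expiration = datetime.datetime(2030, 1, 1, 0, 0)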
"""Indicate that a metadata file is not valid JSON."""
def __init__(self, exception):
# Store the original exception.
self.exception = exception
def __str__(self):
# Show the original exception.
return repr(self.exception)
class UnsupportedAlgorithmError(Error):
  """Indicate an error while trying to identify a user-specified algorithm."""
  pass
def import_publickey_from_file(keypath):

  try:
    key_metadata = securesystemslib.util.load_json_file(keypath)

  # An RSA public key is saved to disk in PEM format (not JSON), so the
  # load_json_file() call above can fail for that reason.  Try to load the
  # PEM string in 'keypath' if an exception is raised.
  except securesystemslib.exceptions.Error:
    key_metadata = securesystemslib.interface.import_rsa_publickey_from_file(
        keypath)

  key_object, junk = securesystemslib.keys.format_metadata_to_key(key_metadata)

  if key_object['keytype'] not in SUPPORTED_KEY_TYPES:
    raise tuf.exceptions.Error('Trying to import an unsupported key'
        ' type: ' + repr(key_object['keytype']) + '.'
        ' Supported key types: ' + repr(SUPPORTED_KEY_TYPES))

  else:
    return key_object
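# A minimal usage sketch: the key file path below is illustrative.  The helper
# accepts either a JSON-formatted public key or an RSA public key in PEM
# format, and returns a key dictionary with fields such as 'keytype' and
# 'keyid'.
public_key = import_publickey_from_file('/path/to/ed25519_key.pub')
print(public_key['keytype'], public_key['keyid'])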