Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
These functions are also tested in the course of testing other modules.
"""
# Load sample data, either JSON or ASN.1/DER depending on METADATA_FORMAT.
if tuf.conf.METADATA_FORMAT == 'json':
sample_time_attestation = json.load(open(os.path.join(
SAMPLES_DIR, 'sample_timeserver_attestation.json')))
sample_vehicle_manifest = json.load(open(os.path.join(SAMPLES_DIR,
'sample_vehicle_version_manifest_democar.json')))
sample_ecu_manifest = json.load(open(os.path.join(SAMPLES_DIR,
'sample_ecu_manifest_TCUdemocar.json')))
fresh_time_attestation = tuf.formats.make_signable(
sample_time_attestation['signed'])
fresh_vehicle_manifest = tuf.formats.make_signable(
sample_vehicle_manifest['signed'])
fresh_ecu_manifest = tuf.formats.make_signable(
sample_ecu_manifest['signed'])
elif tuf.conf.METADATA_FORMAT == 'der':
sample_time_attestation = \
asn1_codec.convert_signed_der_to_dersigned_json(open(os.path.join(
SAMPLES_DIR, 'sample_timeserver_attestation.der'), 'rb').read(),
DATATYPE_TIME_ATTESTATION)
sample_vehicle_manifest = \
def test_3__targets_of_role(self):
# Setup.
# Extract the list of targets from 'targets.json', to be compared to what
# is returned by _targets_of_role('targets').
targets_in_metadata = \
self.repository_updater.metadata['current']['targets']['targets']
# Test: normal case.
targetinfos_list = self.repository_updater._targets_of_role('targets')
# Verify that the list of targets was returned, and that it contains valid
# target files.
self.assertTrue(tuf.formats.TARGETINFOS_SCHEMA.matches(targetinfos_list))
for targetinfo in targetinfos_list:
self.assertTrue((targetinfo['filepath'], targetinfo['fileinfo']) in six.iteritems(targets_in_metadata))
def test_without_tuf(self):
# Verify that a target file replaced with a malicious version is downloaded
# by a non-TUF client (i.e., a non-TUF client that does not verify hashes,
# detect mix-and-mix attacks, etc.) A tuf client, on the other hand, should
# detect that the downloaded target file is invalid.
# Test: Download a valid target file from the repository.
# Ensure the target file to be downloaded has not already been downloaded,
# and generate its file size and digest. The file size and digest is needed
# to check that the malicious file was indeed downloaded.
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
client_target_path = os.path.join(self.client_directory, 'file1.txt')
self.assertFalse(os.path.exists(client_target_path))
length, hashes = securesystemslib.util.get_file_details(target_path)
fileinfo = tuf.formats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# On Windows, the URL portion should not contain back slashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path)
self.assertTrue(os.path.exists(client_target_path))
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
# Test: Download a target file that has been modified by an attacker.
with open(target_path, 'wt') as file_object:
file_object.write('add malicious content.')
length, hashes = securesystemslib.util.get_file_details(target_path)
tuf.FormatError, if an error occurred trying to generate the targets
metadata object.
tuf.Error, if any of the target files could not be read.
The target files are read and file information generated about them.
A targets 'signable' object, conformant to 'tuf.formats.SIGNABLE_SCHEMA'.
"""
# Do the arguments have the correct format.
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATHS_SCHEMA.check_match(target_files)
tuf.formats.PATH_SCHEMA.check_match(repository_directory)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
tuf.formats.TIME_SCHEMA.check_match(expiration_date)
filedict = {}
repository_directory = check_directory(repository_directory)
# Generate the file info for all the target files listed in 'target_files'.
for target in target_files:
# Strip 'targets/' from from 'target' and keep the rest (e.g.,
# 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt'
relative_targetpath = os.path.sep.join(target.split(os.path.sep)[1:])
target_path = os.path.join(repository_directory, target)
if not os.path.exists(target_path):
message = repr(target_path)+' could not be read. Unable to generate '+\
The file is opened and information about the file is generated,
such as file size and its hash.
A dictionary conformant to 'tuf.formats.FILEINFO_SCHEMA'. This
dictionary contains the length, hashes, and custom data about the
'filename' metadata file. SHA256 hashes are generated by default.
"""
# Does 'filename' and 'custom' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.PATH_SCHEMA.check_match(filename)
if custom is not None:
tuf.formats.CUSTOM_SCHEMA.check_match(custom)
if not os.path.isfile(filename):
message = repr(filename) + ' is not a file.'
raise securesystemslib.exceptions.Error(message)
# Note: 'filehashes' is a dictionary of the form
# {'sha256': 1233dfba312, ...}. 'custom' is an optional
# dictionary that a client might define to include additional
# file information, such as the file's author, version/revision
# numbers, etc.
filesize, filehashes = securesystemslib.util.get_file_details(filename,
securesystemslib.settings.HASH_ALGORITHMS)
return tuf.formats.make_fileinfo(filesize, filehashes, custom=custom)
belonging to the delegated role. Return a string containing
the delegated role's full rolename and its keyids.
"""
# Retrieve the delegated rolename from the user (e.g., 'role1').
delegated_role = _prompt('\nEnter the delegated role\'s name: ', str)
delegated_role = unicode(delegated_role, encoding="utf-8")
# Retrieve the delegated role\'s keyids from the user.
message = 'The keyid of the delegated role must be loaded.'
logger.info(message)
delegated_keyids = _get_keyids(keystore_directory)
# Ensure at least one delegated key was loaded.
if not tuf.formats.THRESHOLD_SCHEMA.matches(len(delegated_keyids)):
message = 'The minimum required threshold of keyids was not loaded.\n'
raise tuf.RepositoryError(message)
return delegated_role, delegated_keyids
tuf.exceptions.UnknownRoleError, if 'role' is not recognized.
None.
A dictionary representing the status of the signatures in 'signable'.
Conformant to tuf.formats.SIGNATURESTATUS_SCHEMA.
"""
# Do the arguments have the correct format? This check will ensure that
# arguments have the appropriate number of objects and object types, and that
# all dict keys are properly named. Raise
# 'securesystemslib.exceptions.FormatError' if the check fails.
tuf.formats.SIGNABLE_SCHEMA.check_match(signable)
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
if role is not None:
tuf.formats.ROLENAME_SCHEMA.check_match(role)
if threshold is not None:
tuf.formats.THRESHOLD_SCHEMA.check_match(threshold)
if keyids is not None:
securesystemslib.formats.KEYIDS_SCHEMA.check_match(keyids)
# The signature status dictionary returned.
signature_status = {}
# The fields of the signature_status dict, where each field stores keyids. A
# description of each field:
tuf.FormatError, if the 'signature' argument is improperly formatted.
Updates the 'signatures' field of the role in 'tuf.roledb.py'.
None.
"""
# Does 'signature' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if any are improperly formatted.
tuf.formats.SIGNATURE_SCHEMA.check_match(signature)
roleinfo = tuf.roledb.get_roleinfo(self.rolename)
if signature in roleinfo['signatures']:
roleinfo['signatures'].remove(signature)
tuf.roledb.update_roleinfo(self.rolename, roleinfo)
A signable object conformant to 'tuf.formats.SIGNABLE_SCHEMA'.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if the check fails.
tuf.formats.ANYROLE_SCHEMA.check_match(metadata_object)
tuf.formats.KEYIDS_SCHEMA.check_match(keyids)
tuf.formats.PATH_SCHEMA.check_match(filename)
# Make sure the metadata is in 'signable' format. That is,
# it contains a 'signatures' field containing the result
# of signing the 'signed' field of 'metadata' with each
# keyid of 'keyids'.
signable = tuf.formats.make_signable(metadata_object)
# Sign the metadata with each keyid in 'keyids'.
for keyid in keyids:
# Load the signing key.
key = tuf.keydb.get_key(keyid)
logger.info('Signing '+repr(filename)+' with '+key['keyid'])
# Create a new signature list. If 'keyid' is encountered,
# do not add it to new list.
signatures = []
for signature in signable['signatures']:
if not keyid == signature['keyid']:
signatures.append(signature)
signable['signatures'] = signatures
tuf.FormatError, if the generated timestamp metadata object could
not be formatted correctly.
None.
A timestamp 'signable' object, conformant to 'tuf.formats.SIGNABLE_SCHEMA'.
"""
# Do the arguments have the correct format?
# Raise 'tuf.FormatError' if there is mismatch.
tuf.formats.PATH_SCHEMA.check_match(snapshot_filename)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
tuf.formats.TIME_SCHEMA.check_match(expiration_date)
# Retrieve the file info for the snapshot metadata file.
# This file information contains hashes, file length, custom data, etc.
fileinfo = {}
fileinfo['snapshot.txt'] = get_metadata_file_info(snapshot_filename)
# Save the file info of the compressed versions of 'timestamp.txt'.
for file_extension in compressions:
compressed_filename = snapshot_filename + '.' + file_extension
try:
compressed_fileinfo = get_metadata_file_info(compressed_filename)
except:
logger.warn('Could not get fileinfo about '+str(compressed_filename))
else:
logger.info('Including fileinfo about '+str(compressed_filename))
fileinfo['snapshot.txt.' + file_extension] = compressed_fileinfo