Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if __name__ == '__main__':
# Parse the options and set the logging level.
configuration_file = parse_options()
# Return codes for conformance_tester.py.
SUCCESS = 0
FAILURE = 1
# Execute the tests..
try:
run_conformance_testing(configuration_file)
except (tuf.exceptions.Error) as exception:
sys.exit(FAILURE)
# Successfully updated the target file.
sys.exit(SUCCESS)
These functions are also tested in the course of testing other modules.
"""
# Load sample data, either JSON or ASN.1/DER depending on METADATA_FORMAT.
if tuf.conf.METADATA_FORMAT == 'json':
sample_time_attestation = json.load(open(os.path.join(
SAMPLES_DIR, 'sample_timeserver_attestation.json')))
sample_vehicle_manifest = json.load(open(os.path.join(SAMPLES_DIR,
'sample_vehicle_version_manifest_democar.json')))
sample_ecu_manifest = json.load(open(os.path.join(SAMPLES_DIR,
'sample_ecu_manifest_TCUdemocar.json')))
fresh_time_attestation = tuf.formats.make_signable(
sample_time_attestation['signed'])
fresh_vehicle_manifest = tuf.formats.make_signable(
sample_vehicle_manifest['signed'])
fresh_ecu_manifest = tuf.formats.make_signable(
sample_ecu_manifest['signed'])
elif tuf.conf.METADATA_FORMAT == 'der':
sample_time_attestation = \
asn1_codec.convert_signed_der_to_dersigned_json(open(os.path.join(
SAMPLES_DIR, 'sample_timeserver_attestation.der'), 'rb').read(),
DATATYPE_TIME_ATTESTATION)
sample_vehicle_manifest = \
def test_3__targets_of_role(self):
# Setup.
# Extract the list of targets from 'targets.json', to be compared to what
# is returned by _targets_of_role('targets').
targets_in_metadata = \
self.repository_updater.metadata['current']['targets']['targets']
# Test: normal case.
targetinfos_list = self.repository_updater._targets_of_role('targets')
# Verify that the list of targets was returned, and that it contains valid
# target files.
self.assertTrue(tuf.formats.TARGETINFOS_SCHEMA.matches(targetinfos_list))
for targetinfo in targetinfos_list:
self.assertTrue((targetinfo['filepath'], targetinfo['fileinfo']) in six.iteritems(targets_in_metadata))
def test_without_tuf(self):
# Verify that a target file replaced with a malicious version is downloaded
# by a non-TUF client (i.e., a non-TUF client that does not verify hashes,
# detect mix-and-mix attacks, etc.) A tuf client, on the other hand, should
# detect that the downloaded target file is invalid.
# Test: Download a valid target file from the repository.
# Ensure the target file to be downloaded has not already been downloaded,
# and generate its file size and digest. The file size and digest is needed
# to check that the malicious file was indeed downloaded.
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
client_target_path = os.path.join(self.client_directory, 'file1.txt')
self.assertFalse(os.path.exists(client_target_path))
length, hashes = securesystemslib.util.get_file_details(target_path)
fileinfo = tuf.formats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# On Windows, the URL portion should not contain back slashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path)
self.assertTrue(os.path.exists(client_target_path))
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
# Test: Download a target file that has been modified by an attacker.
with open(target_path, 'wt') as file_object:
file_object.write('add malicious content.')
length, hashes = securesystemslib.util.get_file_details(target_path)
targets_roleinfo['threshold'] = old_threshold
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
repository_name=repository_name)
# Verify that status() does not raise
# 'tuf.exceptions.InsufficientKeysError' if a delegated role
# does not contain a threshold of keys.
role1_roleinfo = tuf.roledb.get_roleinfo('role1', repository_name)
old_role1_threshold = role1_roleinfo['threshold']
role1_roleinfo['threshold'] = 10
tuf.roledb.update_roleinfo('role1', role1_roleinfo,
repository_name=repository_name)
repository.status()
# Restore role1's threshold.
role1_roleinfo = tuf.roledb.get_roleinfo('role1', repository_name)
role1_roleinfo['threshold'] = old_role1_threshold
tuf.roledb.update_roleinfo('role1', role1_roleinfo,
repository_name=repository_name)
# Verify status() does not raise 'tuf.exceptions.UnsignedMetadataError' if any of the
# the top-level roles. Test that 'root' is improperly signed.
repository.root.unload_signing_key(root_privkey)
repository.root.load_signing_key(targets_privkey)
repository.status()
repository.targets('role1').unload_signing_key(role1_privkey)
repository.targets('role1').load_signing_key(targets_privkey)
repository.status()
# Reset Root and 'role1', and verify Targets.
repository.root.unload_signing_key(targets_privkey)
os.path.join(self.repository_directory, 'metadata'))
# The client performs a refresh of top-level metadata to get the latest
# changes.
self.repository_updater.refresh()
# Verify that the client is able to recognize that a new set of keys have
# been added to the Snapshot role.
# First, has 'snapshot_keyid' been removed?
snapshot_roleinfo = tuf.roledb.get_roleinfo('snapshot', self.repository_name)
self.assertTrue(snapshot_keyid not in snapshot_roleinfo['keyids'])
# Second, is Snapshot's new key correct? The new key should be
# Timestamp's.
self.assertEqual(len(snapshot_roleinfo['keyids']), 1)
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', self.repository_name)
self.assertEqual(snapshot_roleinfo['keyids'], timestamp_roleinfo['keyids'])
project = developer_tool.load_project(local_tmp)
# Check against backup.
self.assertEqual(list(project.target_files.keys()), list(targets_backup.keys()))
new_delegations = tuf.roledb.get_delegated_rolenames(project.project_name)
self.assertEqual(new_delegations, delegations_backup)
self.assertEqual(project.layout_type, layout_type_backup)
self.assertEqual(project.keys, keys_backup)
self.assertEqual(project('delegation').keys, delegation_keys_backup)
self.assertEqual(project.prefix, prefix_backup)
self.assertEqual(project.project_name, name_backup)
roleinfo = tuf.roledb.get_roleinfo(project.project_name)
self.assertEqual(roleinfo['partial_loaded'], True)
# Load_signing_keys.
project('delegation').load_signing_key(delegation_private_key)
project.status()
project.load_signing_key(project_private_key)
# Backup everything.
# + backup targets.
targets_backup = project.target_files
# We'll need json module for testing '_encrypt()' and '_decrypt()'
# internal function.
json = tuf.util.import_json()
tuf.repo.keystore._PBKDF2_ITERATIONS = 1000
# Creating a directory string in current directory.
_CURRENT_DIR = os.getcwd()
_DIR = os.path.join(_CURRENT_DIR, 'test_keystore')
# Check if directory '_DIR' exists.
if os.path.exists(_DIR):
msg = ('\''+_DIR+'\' directory already exists,'+
' please change '+'\'_DIR\''+' to something else.')
raise tuf.Error(msg)
KEYSTORE = tuf.repo.keystore
RSAKEYS = []
PASSWDS = []
temp_keys_info = []
temp_keys_vals = []
for i in range(3):
# Populating the original 'RSAKEYS' and 'PASSWDS' lists.
RSAKEYS.append(tuf.keys.generate_rsa_key())
PASSWDS.append('passwd_'+str(i))
# Saving original copies of 'RSAKEYS' and 'PASSWDS' to temp variables
# in order to repopulate them at the start of every test.
temp_keys_info.append(RSAKEYS[i].values())
temp_keys_vals.append(RSAKEYS[i]['keyval'].values())
except tuf.KeyAlreadyExistsError:
pass
# Add the delegated role's initial roleinfo, to be fully populated
# when its metadata file is next loaded in the os.walk() iteration.
for role in metadata_object['delegations']['roles']:
rolename = role['name']
roleinfo = {'name': role['name'], 'keyids': role['keyids'],
'threshold': role['threshold'],
'compressions': [''], 'signing_keyids': [],
'signatures': [],
'paths': {},
'partial_loaded': False,
'delegations': {'keys': {},
'roles': []}}
tuf.roledb.add_role(rolename, roleinfo)
return repository
def test_verify_single_key(self):
signable = {'signed' : 'test', 'signatures' : []}
signed = securesystemslib.formats.encode_canonical(signable['signed']).encode('utf-8')
signable['signatures'].append(securesystemslib.keys.create_signature(
KEYS[0], signed))
tuf.keydb.add_key(KEYS[0])
threshold = 1
roleinfo = tuf.formats.build_dict_conforming_to_schema(
tuf.formats.ROLE_SCHEMA, keyids=[KEYS[0]['keyid']], threshold=threshold)
tuf.roledb.add_role('Root', roleinfo)
# This will call verify() and return True if 'signable' is valid,
# False otherwise.
self.assertTrue(tuf.sig.verify(signable, 'Root'))
# Done. Let's remove the added key(s) from the key database.
tuf.keydb.remove_key(KEYS[0]['keyid'])
# Remove the roles.
tuf.roledb.remove_role('Root')