# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): method-body fragment — the enclosing 'def' is not visible in
# this chunk.  It relies on fixtures ('self.repository_updater',
# 'self.client_metadata_current', 'self.repository_directory',
# 'self.client_directory') presumably created in setUp() — confirm against
# the full test module.
#
# Verify that the 'self.versioninfo' dictionary is empty (it starts off
# empty and is only populated if _update_versioninfo() is called).
versioninfo_dict = self.repository_updater.versioninfo
self.assertEqual(len(versioninfo_dict), 0)
# Load the versioninfo of the top-level Targets role.  This action
# populates the 'self.versioninfo' dictionary.
self.repository_updater._update_versioninfo('targets.json')
self.assertEqual(len(versioninfo_dict), 1)
self.assertTrue(tuf.formats.FILEINFODICT_SCHEMA.matches(versioninfo_dict))
# The Snapshot role stores the version numbers of all the roles available
# on the repository.  Load Snapshot to extract the Targets role's version
# number and compare it against the one loaded by 'self.repository_updater'.
snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json')
snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath)
targets_versioninfo = snapshot_signable['signed']['meta']['targets.json']
# Verify that the manually loaded version number of targets.json matches
# the one loaded by the updater object.  (The original comment said
# 'root.json', but the code below clearly inspects 'targets.json'.)
self.assertTrue('targets.json' in versioninfo_dict)
self.assertEqual(versioninfo_dict['targets.json'], targets_versioninfo)
# Verify that 'self.versioninfo' gains an entry if another role is updated.
self.repository_updater._update_versioninfo('role1.json')
self.assertEqual(len(versioninfo_dict), 2)
# Verify that 'self.versioninfo' also gains an entry if a non-existent role
# is requested, and that its versioninfo entry is set to 'None'.
self.repository_updater._update_versioninfo('bad_role.json')
self.assertEqual(len(versioninfo_dict), 3)
self.assertEqual(versioninfo_dict['bad_role.json'], None)
# 'timestamp.json' specifies the latest version of the repository files.  A
# client should only accept the same version of this file up to a certain
# point, or else it cannot detect that new files are available for
# download.  Modify the repository's 'timestamp.json' so that it expires
# soon, copy it over to the client, and attempt to re-fetch the same
# expired version.
#
# A non-TUF client (without a way to detect when metadata has expired) is
# expected to download the same version, and thus the same outdated files.
# Verify that the downloaded 'timestamp.json' contains the same file size
# and hash as the one available locally.
timestamp_path = os.path.join(self.repository_directory, 'metadata',
'timestamp.json')
timestamp_metadata = securesystemslib.util.load_json_file(timestamp_path)
# Back-date the expiry by 10 seconds so the metadata is already expired.
expiry_time = time.time() - 10
expires = tuf.formats.unix_timestamp_to_datetime(int(expiry_time))
# TUF expiry timestamps are ISO 8601 strings with a literal 'Z' suffix.
expires = expires.isoformat() + 'Z'
timestamp_metadata['signed']['expires'] = expires
# Raises securesystemslib.exceptions.FormatError if the edited metadata is
# no longer a valid signable object.
tuf.formats.check_signable_object_format(timestamp_metadata)
with open(timestamp_path, 'wb') as file_object:
# Explicitly specify the JSON separators for Python 2 + 3 consistency.
timestamp_content = \
json.dumps(timestamp_metadata, indent=1, separators=(',', ': '),
sort_keys=True).encode('utf-8')
file_object.write(timestamp_content)
# Install the expired timestamp on the client side as well.
client_timestamp_path = os.path.join(self.client_directory, 'timestamp.json')
shutil.copy(timestamp_path, client_timestamp_path)
def test_signatures(self):
  """Verify the 'signatures' getter before and after add_signature()."""

  # A freshly constructed metadata object carries no signatures.
  self.assertEqual(self.metadata.signatures, [])

  # Pull a known-good signature list out of the sample root.json metadata.
  root_path = os.path.join(
      'repository_data', 'repository', 'metadata', 'root.json')
  loaded_root = securesystemslib.util.load_json_file(root_path)
  sample_signatures = loaded_root['signatures']

  # A single signature is enough to exercise the getter.
  self.metadata.add_signature(sample_signatures[0])
  self.assertEqual(sample_signatures, self.metadata.signatures)
# NOTE(review): fragment — 'repository', 'metadata_directory',
# 'targets_pubkey', 'targets_privkey', 'root_pubkey', and 'root_privkey'
# are defined outside this view.  Appears to test the versioned root
# metadata (1.root.json, 2.root.json) written by repository.writeall() —
# confirm against the full test.
#
# Verify that the expected metadata is written.
root_filepath = os.path.join(metadata_directory, 'root.json')
root_1_filepath = os.path.join(metadata_directory, '1.root.json')
root_2_filepath = os.path.join(metadata_directory, '2.root.json')
old_root_signable = securesystemslib.util.load_json_file(root_filepath)
root_1_signable = securesystemslib.util.load_json_file(root_1_filepath)
# Make a change to the root keys: add a verification key, load its signing
# key, and raise the signature threshold, then rewrite all metadata.
repository.root.add_verification_key(targets_pubkey)
repository.root.load_signing_key(targets_privkey)
repository.root.threshold = 2
repository.writeall()
new_root_signable = securesystemslib.util.load_json_file(root_filepath)
root_2_signable = securesystemslib.util.load_json_file(root_2_filepath)
for role_signable in [old_root_signable, new_root_signable, root_1_signable, root_2_signable]:
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is an
# invalid signable.
tuf.formats.check_signable_object_format(role_signable)
# Verify contents of versioned roots: 1.root.json must equal the pre-change
# root.json and 2.root.json the post-change one, with versions 1 and 2.
self.assertEqual(old_root_signable, root_1_signable)
self.assertEqual(new_root_signable, root_2_signable)
self.assertEqual(root_1_signable['signed']['version'], 1)
self.assertEqual(root_2_signable['signed']['version'], 2)
# Remove the original root key and its signing key, keeping threshold at 2.
repository.root.remove_verification_key(root_pubkey)
repository.root.unload_signing_key(root_privkey)
repository.root.threshold = 2
# NOTE(review): fragment — relies on names defined outside this view
# ('timestamp_filepath', 'targets_filepath', 'targets_versioninfo',
# 'DEFAULT_TIMESTAMP_FILELENGTH', 'DEFAULT_TARGETS_FILELENGTH', 'logger',
# 'repo_lib', 'repo_tool').  It also appears to fuse two unrelated test
# fragments (see the note inside the 'except' clause below) — confirm.
#
# Verify 'timestamp.json' is not yet loaded, then install it.
self.assertFalse('timestamp' in self.repository_updater.metadata)
logger.info('\nroleinfo: ' + repr(tuf.roledb.get_rolenames(self.repository_name)))
self.repository_updater._update_metadata('timestamp',
DEFAULT_TIMESTAMP_FILELENGTH)
self.assertTrue('timestamp' in self.repository_updater.metadata['current'])
# NOTE(review): the return value of os.path.exists() is discarded, so the
# next line asserts nothing — probably intended to be
# self.assertTrue(os.path.exists(timestamp_filepath)).
os.path.exists(timestamp_filepath)
# Verify 'targets.json' is properly installed.
self.assertFalse('targets' in self.repository_updater.metadata['current'])
self.repository_updater._update_metadata('targets',
DEFAULT_TARGETS_FILELENGTH,
targets_versioninfo['version'])
self.assertTrue('targets' in self.repository_updater.metadata['current'])
targets_signable = securesystemslib.util.load_json_file(targets_filepath)
loaded_targets_version = targets_signable['signed']['version']
self.assertEqual(targets_versioninfo['version'], loaded_targets_version)
# Test: Invalid / untrusted version numbers.
# Invalid version number for 'targets.json': requesting version 88 should
# fail against every mirror and surface as a NoWorkingMirrorError.
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
self.repository_updater._update_metadata,
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
# Verify that the specific exception raised is correct for the previous
# case.
try:
self.repository_updater._update_metadata('targets',
DEFAULT_TARGETS_FILELENGTH, 88)
except tuf.exceptions.NoWorkingMirrorError as e:
# NOTE(review): 'e' is never inspected below — the per-mirror exception
# check that presumably followed seems to be missing, and the lines below
# read like the start of an unrelated repository-setup fragment.
repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory,
repo_lib.TARGETS_DIRECTORY_NAME)
# Copy the sample repository metadata and targets into the temp repository.
shutil.copytree(os.path.join('repository_data', 'repository', 'metadata'),
metadata_directory)
shutil.copytree(os.path.join('repository_data', 'repository', 'targets'),
targets_directory)
# Add a duplicate signature to the Root file for testing purposes.
root_file = os.path.join(metadata_directory, 'root.json')
signable = securesystemslib.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
signable['signatures'].append(signable['signatures'][0])
repo_lib.write_metadata_file(signable, root_file, 8, False)
# Attempt to load the repository whose Root file now carries a duplicate
# signature.  (Original comment said 'compressed Root file', but nothing
# here compresses anything — confirm intent.)
repository = repo_tool.create_new_repository(repository_directory, repository_name)
filenames = repo_lib.get_metadata_filenames(metadata_directory)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
filenames = repo_lib.get_metadata_filenames(metadata_directory)
repository = repo_tool.create_new_repository(repository_directory, repository_name)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
# Partially write all top-level roles (we increase the threshold of each
# top-level role so that they are flagged as partially written).
repository.root.threshold = repository.root.threshold + 1
def test_remove_signature(self):
# NOTE(review): indentation in this chunk is mangled (the method body sits
# at column 0), and the method appears cut off — the 'signatures' loaded
# from targets.json at the end are never used in the visible lines
# (presumably a remove-of-unknown-signature assertion followed — confirm).
# Test normal case.
# Add a signature so remove_signature() has some signature to remove.
metadata_directory = os.path.join('repository_data',
'repository', 'metadata')
root_filepath = os.path.join(metadata_directory, 'root.json')
root_signable = securesystemslib.util.load_json_file(root_filepath)
signatures = root_signable['signatures']
self.metadata.add_signature(signatures[0])
self.metadata.remove_signature(signatures[0])
# After the add/remove round-trip the signature list is empty again.
self.assertEqual(self.metadata.signatures, [])
# Test improperly formatted signature argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.metadata.remove_signature, 3)
# Test invalid signature argument (i.e., a signature that has not been
# added).  Load an unused signature to be tested.
targets_filepath = os.path.join(metadata_directory, 'targets.json')
targets_signable = securesystemslib.util.load_json_file(targets_filepath)
signatures = targets_signable['signatures']
# NOTE(review): fragment of a project-loading routine — 'repository_name',
# 'prefix', 'project_directory', 'new_targets_location', and
# 'PROJECT_FILENAME' are defined outside this view; the enclosing 'def' is
# not visible.  Indentation of the 'if' bodies below is also mangled.
#
# Ensure the repository name is a well-formed name string; raises
# securesystemslib.exceptions.FormatError otherwise.
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
# Do the same for the prefix.
securesystemslib.formats.PATH_SCHEMA.check_match(prefix)
# Clear the role and key databases since we are loading in a new project.
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
# Locate metadata filepaths and targets filepath.
project_directory = os.path.abspath(project_directory)
# Load the cfg file and the project; validate it against the expected
# project-configuration schema before using any of its fields.
config_filename = os.path.join(project_directory, PROJECT_FILENAME)
project_configuration = securesystemslib.util.load_json_file(config_filename)
tuf.formats.PROJECT_CFG_SCHEMA.check_match(project_configuration)
targets_directory = os.path.join(project_directory,
project_configuration['targets_location'])
# A 'flat' layout strips the last path component of the project directory
# and uses the configured targets location verbatim.
if project_configuration['layout_type'] == 'flat':
project_directory, junk = os.path.split(project_directory)
targets_directory = project_configuration['targets_location']
# Allow the caller to override where the targets live.
if new_targets_location is not None:
targets_directory = new_targets_location
metadata_directory = os.path.join(project_directory,
project_configuration['metadata_location'])
new_prefix = None