# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if __name__ == '__main__':
    # Parse the options and set the logging level.
    configuration_file = parse_options()

    # Return codes for conformance_tester.py.
    SUCCESS = 0
    FAILURE = 1

    # Execute the tests.
    try:
        run_conformance_testing(configuration_file)

    except tuf.exceptions.Error as exception:
        # Report why the run failed instead of exiting silently; this matches
        # the error reporting done by the other command-line entry point in
        # this project (which writes 'Error: ...' to stderr before exiting).
        sys.stderr.write('Error: ' + str(exception) + '\n')
        sys.exit(FAILURE)

    # Successfully updated the target file.
    sys.exit(SUCCESS)
# NOTE(review): this fragment is the interior of a unittest method (its 'def'
# is not visible here); 'repository', 'self.repository_directory',
# 'self.role_keys' and 'self.repository_updater' are set up earlier in the
# enclosing test case — confirm against the full file.
repository.writeall()

# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
    os.path.join(self.repository_directory, 'metadata'))

# Root's version number = 3...
self.repository_updater.refresh()

# Rotate the key used on the Root role: drop the current verification key and
# its signing key before adding a replacement.
repository.root.remove_verification_key(self.role_keys['targets']['public'])
repository.root.unload_signing_key(self.role_keys['targets']['private'])

# The following should fail because root rotation requires the new Root
# to be signed with the previous self.role_keys['targets'] key.
self.assertRaises(tuf.exceptions.UnsignedMetadataError,
    repository.writeall)

# Re-load the previous signing key so the rotated Root can be co-signed and
# written successfully.
repository.root.load_signing_key(self.role_keys['targets']['private'])
repository.writeall()

# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
    os.path.join(self.repository_directory, 'metadata'))

# Root's version number = 4...
self.repository_updater.refresh()
self.assertEqual(self.repository_updater.metadata['current']['root']['version'], 4)
# Test:
# Attempt a file download of a valid target, however, a download exception
# occurs because the target is not within the mirror's confined target
# directories. Adjust mirrors dictionary, so that 'confined_target_dirs'
# field contains at least one confined target and excludes needed target
# file.
mirrors = self.repository_updater.mirrors
for mirror_name, mirror_info in six.iteritems(mirrors):
mirrors[mirror_name]['confined_target_dirs'] = [self.random_path()]
try:
self.repository_updater.download_target(targetinfo,
destination_directory)
except tuf.exceptions.NoWorkingMirrorError as exception:
# Ensure that no mirrors were found due to mismatch in confined target
# directories. get_list_of_mirrors() returns an empty list in this case,
# which does not generate specific exception errors.
self.assertEqual(len(exception.mirror_errors), 0)
else:
self.fail(
'Expected a NoWorkingMirrorError with zero mirror errors in it.')
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
    os.path.join(self.repository_directory, 'metadata'))

# Wait just long enough for the Snapshot metadata (which is now on the
# repository) to expire.
# NOTE(review): 'snapshot_expiry_time' is computed earlier in the enclosing
# test method (not visible here) — presumably a Unix timestamp; confirm.
time.sleep(max(0, snapshot_expiry_time - time.time() + 1))

try:
    # We expect the following refresh() to raise a NoWorkingMirrorError.
    self.repository_updater.refresh()

except tuf.exceptions.NoWorkingMirrorError as e:
    # NoWorkingMirrorError indicates that we did not find valid, unexpired
    # metadata at any mirror. That exception class preserves the errors from
    # each mirror. We now assert that for each mirror, the particular error
    # detected was that metadata was expired (the Snapshot we manually
    # expired).
    for mirror_url, mirror_error in six.iteritems(e.mirror_errors):
        self.assertTrue(isinstance(mirror_error, tuf.exceptions.ExpiredMetadataError))
        self.assertTrue(mirror_url.endswith('snapshot.json'))

else:
    # refresh() succeeding means the expired Snapshot was accepted.
    self.fail('TUF failed to detect expired, stale Snapshot metadata.'
        ' Freeze attack successful.')

# The client should have rejected the malicious Snapshot metadata, and
# distrusted the local snapshot file that is no longer valid.
self.assertTrue('snapshot' not in self.repository_updater.metadata['current'])
# (continuation of a comment cut off by this fragment's start:)
# by sending just several characters every few seconds.
server_process = self._start_slow_server('mode_2')
client_filepath = os.path.join(self.client_directory, 'file1.txt')

# Lower the minimum acceptable download speed so the slow server trips the
# slow-retrieval detection quickly; the original value is restored in the
# 'finally' clause below.
original_average_download_speed = tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED
tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED = 3

try:
    file1_target = self.repository_updater.get_one_valid_targetinfo('file1.txt')
    self.repository_updater.download_target(file1_target, self.client_directory)

# Verify that the specific 'tuf.exceptions.SlowRetrievalError' exception is
# raised by each mirror. 'file1.txt' should be large enough to trigger a
# slow retrieval attack, otherwise the expected exception may not be
# consistently raised.
except tuf.exceptions.NoWorkingMirrorError as exception:
    for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
        url_prefix = self.repository_mirrors['mirror1']['url_prefix']
        url_file = os.path.join(url_prefix, 'targets', 'file1.txt')

        # Verify that 'file1.txt' is the culprit.
        self.assertEqual(url_file.replace('\\', '/'), mirror_url)
        self.assertTrue(isinstance(mirror_error, tuf.exceptions.SlowRetrievalError))

else:
    # Another possibility is to check for a successfully downloaded
    # 'file1.txt' at this point.
    self.fail('TUF did not prevent a slow retrieval attack.')

finally:
    # Always stop the deliberately-slow server and restore the global
    # download-speed setting, even when the assertions above fail.
    self._stop_slow_server(server_process)
    tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED = original_average_download_speed
# Verify that the TUF client detects replayed metadata and refuses to
# continue the update process.
try:
    self.repository_updater.refresh()

# Verify that the specific 'tuf.exceptions.ReplayedMetadataError' is raised by each
# mirror.
except tuf.exceptions.NoWorkingMirrorError as exception:
    for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
        url_prefix = self.repository_mirrors['mirror1']['url_prefix']
        url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json')

        # Verify that 'timestamp.json' is the culprit.
        self.assertEqual(url_file.replace('\\', '/'), mirror_url)
        self.assertTrue(isinstance(mirror_error, tuf.exceptions.ReplayedMetadataError))

else:
    # refresh() succeeding means the replayed timestamp was accepted.
    self.fail('TUF did not prevent a replay attack.')
return parsed_arguments
if __name__ == '__main__':
    # Parse the command-line arguments and set the logging level.
    arguments = parse_arguments()

    # Update all files in the 'targets' directory of the current working
    # directory.  Any of these TUF errors indicates the update could not be
    # completed; report it and exit with a non-zero status.
    _EXPECTED_ERRORS = (tuf.exceptions.NoWorkingMirrorError,
        tuf.exceptions.RepositoryError, tuf.exceptions.FormatError,
        tuf.exceptions.Error)

    try:
        update_client(arguments)

    except _EXPECTED_ERRORS as error:
        sys.stderr.write('Error: ' + str(error) + '\n')
        sys.exit(1)

    # The client's target files were successfully updated.
    sys.exit(0)
def _matching_targetinfo(
self, target_filename, mapping, match_custom_field=True):
valid_targetinfo = {}
# Retrieve the targetinfo from each repository using the underlying
# Updater() instance.
for repository_name in mapping['repositories']:
logger.debug('Retrieving targetinfo for ' + repr(target_filename) +
' from repository...')
try:
targetinfo, updater = self._update_from_repository(
repository_name, target_filename)
except (tuf.exceptions.UnknownTargetError, tuf.exceptions.Error):
continue
valid_targetinfo[updater] = targetinfo
matching_targetinfo = {}
logger.debug('Verifying that a threshold of targetinfo are equal...')
# Iterate 'valid_targetinfo', looking for a threshold number of matches
# for 'targetinfo'. The first targetinfo to reach the required threshold
# is returned. For example, suppose the following list of targetinfo and
# a threshold of 2:
# [A, B, C, B, A, C]
# In this case, targetinfo B is returned.
for valid_updater, compared_targetinfo in six.iteritems(valid_targetinfo):
if not self._targetinfo_match(
# (continuation of a comment cut off by this fragment's start:)
# '1985-10-21T01:22:00Z'.) Convert it to a unix timestamp and compare it
# against the current time.time() (also in Unix/POSIX time format, although
# with microseconds attached.)
current_time = int(time.time())

# Generate a user-friendly error message if 'expires' is less than the
# current time (i.e., a local time.)
# NOTE(review): 'expires' and 'metadata_rolename' are defined earlier in the
# enclosing function (not visible in this fragment) — presumably an ISO 8601
# datetime string and a role name; confirm against the full definition.
expires_datetime = iso8601.parse_date(expires)
expires_timestamp = tuf.formats.datetime_to_unix_timestamp(expires_datetime)

if expires_timestamp < current_time:
    message = 'Metadata '+repr(metadata_rolename)+' expired on ' + \
        expires_datetime.ctime() + ' (UTC).'
    logger.error(message)

    # Callers are expected to catch this to distinguish expiry from other
    # metadata errors.
    raise tuf.exceptions.ExpiredMetadataError(message)