Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# On Windows, the URL portion should not contain backslashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path)
self.assertTrue(os.path.exists(client_target_path))
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
# Test: Download a target file that has been modified by an attacker.
with open(target_path, 'wt') as file_object:
file_object.write('add malicious content.')
length, hashes = securesystemslib.util.get_file_details(target_path)
malicious_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# On Windows, the URL portion should not contain backslashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path)
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is unequal to the original trusted version.
self.assertNotEqual(download_fileinfo, fileinfo)
# Verify 'download_fileinfo' is equal to the malicious version.
self.assertEqual(download_fileinfo, malicious_fileinfo)
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the new version.
self.assertEqual(download_fileinfo, new_fileinfo)
# Restore the previous version of 'timestamp.json' on the remote repository
# and verify that the non-TUF client downloads it (expected, but not ideal).
shutil.move(backup_timestamp, timestamp_path)
# On Windows, the URL portion should not contain backslashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the previous version.
self.assertEqual(download_fileinfo, previous_fileinfo)
self.assertNotEqual(download_fileinfo, new_fileinfo)
def test_13__get_file(self):
    """Fetch 'targets.json' through the updater's _get_file() in both modes.

    The "safe" download approach downloads an exact required length, while
    the "unsafe" approach downloads up to a required length (and no more).
    Both calls share one verification callback that checks the downloaded
    file's length and hashes against the repository copy of 'targets.json'.
    The test passes when neither call raises.
    """
    # Reference length and hashes come from the repository's own copy.
    repository_targets = os.path.join(
        self.repository_directory, 'metadata', 'targets.json')
    expected_length, expected_hashes = securesystemslib.util.get_file_details(
        repository_targets)

    def verify_target_file(targets_path):
        # Every downloaded file must have its length and hashes inspected.
        self.repository_updater._hard_check_file_length(
            targets_path, expected_length)
        self.repository_updater._check_hashes(targets_path, expected_hashes)

    # Safe download first, then the unsafe one.
    for safe_mode in (True, False):
        self.repository_updater._get_file(
            'targets.json', verify_target_file, 'meta', expected_length,
            download_safely=safe_mode)
# Test: Expected input.
self.assertEqual(securesystemslib.util.get_file_details(filepath),
(file_length, file_hash))
# Test: Incorrect input.
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
{'a': 'b'}, None]
for bogus_input in bogus_inputs:
if isinstance(bogus_input, six.string_types):
self.assertRaises(securesystemslib.exceptions.Error,
securesystemslib.util.get_file_details, bogus_input)
else:
self.assertRaises(securesystemslib.exceptions.FormatError,
securesystemslib.util.get_file_details, bogus_input)
def test_with_tuf(self):
# An attacker tries to trick a client into installing an extraneous target
# file (a valid file on the repository, in this case) by listing it in the
# project's metadata file. For the purposes of test_with_tuf(),
# 'role1.json' is treated as the metadata file that indicates all
# the files needed to install/update the 'role1' project. The attacker
# simply adds the extraneous target file to 'role1.json', which the TUF
# client should reject as improperly signed.
role1_filepath = os.path.join(self.repository_directory, 'metadata',
'role1.json')
file1_filepath = os.path.join(self.repository_directory, 'targets',
'file1.txt')
# The forged '/file2.txt' entry reuses the length and hashes of the real
# 'file1.txt' on the repository.
length, hashes = securesystemslib.util.get_file_details(file1_filepath)
role1_metadata = securesystemslib.util.load_json_file(role1_filepath)
role1_metadata['signed']['targets']['/file2.txt'] = {}
role1_metadata['signed']['targets']['/file2.txt']['hashes'] = hashes
role1_metadata['signed']['targets']['/file2.txt']['length'] = length
# NOTE(review): the return value is unused; presumably this raises if the
# modified metadata is malformed -- confirm against tuf.formats.
tuf.formats.check_signable_object_format(role1_metadata)
# Overwrite 'role1.json' in place with the tampered content; no signing
# call is made here, so the existing signatures are written back unchanged.
with open(role1_filepath, 'wt') as file_object:
json.dump(role1_metadata, file_object, indent=1, sort_keys=True)
# Un-install the metadata of the top-level roles so that the client can
# download and detect the invalid 'role1.json'.
os.remove(os.path.join(self.client_directory, self.repository_name,
'metadata', 'current', 'snapshot.json'))
os.remove(os.path.join(self.client_directory, self.repository_name,
self.assertTrue(os.path.exists(client_target_path))
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
# Test: Download a target file that has been modified by an attacker.
with open(target_path, 'wt') as file_object:
file_object.write('add malicious content.')
length, hashes = securesystemslib.util.get_file_details(target_path)
malicious_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# On Windows, the URL portion should not contain backslashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path)
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is unequal to the original trusted version.
self.assertNotEqual(download_fileinfo, fileinfo)
# Verify 'download_fileinfo' is equal to the malicious version.
self.assertEqual(download_fileinfo, malicious_fileinfo)
def test_2__fileinfo_has_changed(self):
# Exercises the updater's _fileinfo_has_changed() using fileinfo built from
# the 'root.json' found under self.client_metadata_current.
# Verify that the method returns 'False' if file info was not changed.
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
length, hashes = securesystemslib.util.get_file_details(root_filepath)
root_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertFalse(self.repository_updater._fileinfo_has_changed('root.json',
root_fileinfo))
# Verify that the method returns 'True' if length or hashes were changed.
# Length was changed (the original hashes are kept).
# NOTE(review): 8 is presumably just a value that differs from the real
# size of 'root.json' -- confirm.
new_length = 8
new_root_fileinfo = tuf.formats.make_fileinfo(new_length, hashes)
self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json',
new_root_fileinfo))
# Hashes were changed.
# The original length is kept; only the 'sha256' entry is replaced with the
# output of self.random_string().
new_hashes = {'sha256': self.random_string()}
new_root_fileinfo = tuf.formats.make_fileinfo(length, new_hashes)
self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json',
new_root_fileinfo))
# Verify that _fileinfo_has_changed() returns True if no fileinfo (or set
# Try to add more consistent metadata files.
version_number += 1
root_signable['signed']['version'] = version_number
repo_lib.write_metadata_file(root_signable, output_filename,
version_number, consistent_snapshot=True)
# Test if the latest root.json points to the expected consistent file
# and consistent metadata do not all point to the same root.json
version_and_filename = str(version_number) + '.' + 'root.json'
second_version_output_file = os.path.join(temporary_directory, version_and_filename)
self.assertTrue(os.path.exists(second_version_output_file))
# Verify that the second version is equal to the second output file, and
# that the second output filename differs from the first.
self.assertEqual(securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(second_version_output_file))
self.assertNotEqual(securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(first_version_output_file))
# Test for an improper settings.CONSISTENT_METHOD string value.
tuf.settings.CONSISTENT_METHOD = 'somebadidea'
# Test for invalid consistent methods on systems other than Windows,
# which always uses the copy method.
if platform.system() == 'Windows':
pass
else:
self.assertRaises(securesystemslib.exceptions.InvalidConfigurationError,
repo_lib.write_metadata_file, root_signable, output_filename,
version_number, consistent_snapshot=True)
version_number += 1
root_signable['signed']['version'] = version_number
repo_lib.write_metadata_file(root_signable, output_filename,
version_number, consistent_snapshot=True)
# Test if the latest root.json points to the expected consistent file
# and consistent metadata do not all point to the same root.json
version_and_filename = str(version_number) + '.' + 'root.json'
second_version_output_file = os.path.join(temporary_directory, version_and_filename)
self.assertTrue(os.path.exists(second_version_output_file))
# Verify that the second version is equal to the second output file, and
# that the second output filename differs from the first.
self.assertEqual(securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(second_version_output_file))
self.assertNotEqual(securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(first_version_output_file))
# Test for an improper settings.CONSISTENT_METHOD string value.
tuf.settings.CONSISTENT_METHOD = 'somebadidea'
# Test for invalid consistent methods on systems other than Windows,
# which always uses the copy method.
if platform.system() == 'Windows':
pass
else:
self.assertRaises(securesystemslib.exceptions.InvalidConfigurationError,
repo_lib.write_metadata_file, root_signable, output_filename,
version_number, consistent_snapshot=True)
# Try to create a link to root.json when root.json doesn't exist locally.