Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
json.dumps(timestamp_metadata, indent=1, separators=(',', ': '),
sort_keys=True).encode('utf-8')
file_object.write(timestamp_content)
client_timestamp_path = os.path.join(self.client_directory, 'timestamp.json')
shutil.copy(timestamp_path, client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(timestamp_path)
fileinfo = tuf.formats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json')
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the current local file.
self.assertEqual(download_fileinfo, fileinfo)
os.path.join(self.repository_directory, 'metadata'))
# Save the fileinfo of the new version generated to verify that it is
# saved by the client.
length, hashes = securesystemslib.util.get_file_details(timestamp_path)
new_fileinfo = tuf.formats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json')
client_timestamp_path = os.path.join(self.client_directory,
self.repository_name, 'metadata', 'current', 'timestamp.json')
# On Windows, the URL portion should not contain back slashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the new version.
self.assertEqual(download_fileinfo, new_fileinfo)
# Restore the previous version of 'timestamp.json' on the remote repository
# and verify that the non-TUF client downloads it (expected, but not ideal).
shutil.move(backup_timestamp, timestamp_path)
# On Windows, the URL portion should not contain back slashes.
six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the previous version.
# Test that infinite loop is prevented if the target file is not found and
# the max number of delegations is reached.
valid_max_number_of_delegations = tuf.settings.MAX_NUMBER_OF_DELEGATIONS
tuf.settings.MAX_NUMBER_OF_DELEGATIONS = 0
self.assertEqual(None, self.repository_updater._preorder_depth_first_walk('unknown.txt'))
# Reset the setting for max number of delegations so that subsequent unit
# tests reference the expected setting.
tuf.settings.MAX_NUMBER_OF_DELEGATIONS = valid_max_number_of_delegations
# Attempt to create a circular delegation, where role1 performs a
# delegation to the top-level Targets role. The updater should ignore the
# delegation and not raise an exception.
targets_path = os.path.join(self.client_metadata_current, 'targets.json')
targets_metadata = securesystemslib.util.load_json_file(targets_path)
targets_metadata['signed']['delegations']['roles'][0]['paths'] = ['/file8.txt']
with open(targets_path, 'wb') as file_object:
file_object.write(repo_lib._get_written_metadata(targets_metadata))
role1_path = os.path.join(self.client_metadata_current, 'role1.json')
role1_metadata = securesystemslib.util.load_json_file(role1_path)
role1_metadata['signed']['delegations']['roles'][0]['name'] = 'targets'
role1_metadata['signed']['delegations']['roles'][0]['paths'] = ['/file8.txt']
with open(role1_path, 'wb') as file_object:
file_object.write(repo_lib._get_written_metadata(role1_metadata))
role2_path = os.path.join(self.client_metadata_current, 'role2.json')
role2_metadata = securesystemslib.util.load_json_file(role2_path)
role2_metadata['signed']['delegations']['roles'] = role1_metadata['signed']['delegations']['roles']
role2_metadata['signed']['delegations']['roles'][0]['paths'] = ['/file8.txt']
with open(role2_path, 'wb') as file_object:
def import_publickey_from_file(keypath):
try:
key_metadata = securesystemslib.util.load_json_file(keypath)
# An RSA public key is saved to disk in PEM format (not JSON), so the
# load_json_file() call above can fail for this reason. Try to potentially
# load the PEM string in keypath if an exception is raised.
except securesystemslib.exceptions.Error:
key_metadata = securesystemslib.interface.import_rsa_publickey_from_file(
keypath)
key_object, junk = securesystemslib.keys.format_metadata_to_key(key_metadata)
if key_object['keytype'] not in SUPPORTED_KEY_TYPES:
raise tuf.exceptions.Error('Trying to import an unsupported key'
' type: ' + repr(key_object['keytype'] + '.'
' Supported key types: ' + repr(SUPPORTED_KEY_TYPES)))
else:
# derived key.
junk_old_salt, junk_old_iterations, symmetric_key = \
_generate_derived_key(password, salt, iterations)
# Verify the hmac to ensure the ciphertext is valid and has not been altered.
# See the encryption routine for why we use the encrypt-then-MAC approach.
# The decryption routine may verify a ciphertext without having to perform
# a decryption operation.
generated_hmac_object = \
cryptography.hazmat.primitives.hmac.HMAC(symmetric_key, hashes.SHA256(),
backend=default_backend())
generated_hmac_object.update(ciphertext)
generated_hmac = binascii.hexlify(generated_hmac_object.finalize())
if not securesystemslib.util.digests_are_equal(generated_hmac.decode(), hmac):
raise securesystemslib.exceptions.CryptoError('Decryption failed.')
# Construct a Cipher object, with the key and iv.
decryptor = Cipher(algorithms.AES(symmetric_key), modes.CTR(iv),
backend=default_backend()).decryptor()
# Decryption gets us the authenticated plaintext.
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext
securesystemslib.util.ensure_parent_dir(filepath)
# Write the public key (i.e., 'public', which is in PEM format) to
# '.pub'. (1) Create a temporary file, (2) write the contents of
# the public key, and (3) move to final destination.
file_object = tempfile.TemporaryFile()
file_object.write(public.encode('utf-8'))
# The temporary file is closed after the final move.
securesystemslib.util.persist_temp_file(file_object, filepath + '.pub')
# Write the private key in encrypted PEM format to ''.
# Unlike the public key file, the private key does not have a file
# extension.
file_object = tempfile.TemporaryFile()
file_object.write(private.encode('utf-8'))
securesystemslib.util.persist_temp_file(file_object, filepath)
return filepath
target_filepath:
The path to the target file on the repository. This will be relative to
the 'targets' (or equivalent) directory on a given mirror.
None.
None.
The hash of 'target_filepath'.
"""
return securesystemslib.util.get_target_hash(target_filepath)
# At this point the tuf.keydb and tuf.roledb stores must be fully
# populated, otherwise write() throwns a 'tuf.Repository' exception if
# any of the project roles are missing signatures, keys, etc.
# Write the metadata files of all the delegated roles of the project.
delegated_rolenames = tuf.roledb.get_delegated_rolenames(self.project_name,
self.repository_name)
for delegated_rolename in delegated_rolenames:
delegated_filename = os.path.join(self.metadata_directory,
delegated_rolename + METADATA_EXTENSION)
# Ensure the parent directories of 'metadata_filepath' exist, otherwise an
# IO exception is raised if 'metadata_filepath' is written to a
# sub-directory.
securesystemslib.util.ensure_parent_dir(delegated_filename)
_generate_and_write_metadata(delegated_rolename, delegated_filename,
write_partial, self.targets_directory, prefix=self.prefix,
repository_name=self.repository_name)
# Generate the 'project_name' metadata file.
targets_filename = self.project_name + METADATA_EXTENSION
targets_filename = os.path.join(self.metadata_directory, targets_filename)
junk, targets_filename = _generate_and_write_metadata(self.project_name,
targets_filename, write_partial, self.targets_directory,
prefix=self.prefix, repository_name=self.repository_name)
# Save configuration information that is not stored in the project's
# metadata
_save_project_configuration(self.metadata_directory,