Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Verify that the expected metadata is written.
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
role_filepath = os.path.join(metadata_directory, role)
role_signable = securesystemslib.util.load_json_file(role_filepath)
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
# an invalid signable.
tuf.formats.check_signable_object_format(role_signable)
self.assertTrue(os.path.exists(role_filepath))
# Verify the 'role1.json' delegation is also written.
role1_filepath = os.path.join(metadata_directory, 'role1.json')
role1_signable = securesystemslib.util.load_json_file(role1_filepath)
tuf.formats.check_signable_object_format(role1_signable)
# Verify that an exception is *not* raised when repository.writeall() is
# called multiple times.
repository.writeall()
# Verify that status() does not raise an exception.
repository.status()
# Verify that status() does not raise
# 'tuf.exceptions.InsufficientKeysError' if a top-level role
# does not contain a threshold of keys.
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
old_threshold = targets_roleinfo['threshold']
targets_roleinfo['threshold'] = 10
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
repository_name=repository_name)
# expired version.
#
# A non-TUF client (without a way to detect when metadata has expired) is
# expected to download the same version, and thus the same outdated files.
# Verify that the downloaded 'timestamp.json' contains the same file size
# and hash as the one available locally.
timestamp_path = os.path.join(self.repository_directory, 'metadata',
'timestamp.json')
timestamp_metadata = securesystemslib.util.load_json_file(timestamp_path)
expiry_time = time.time() - 10
expires = tuf.formats.unix_timestamp_to_datetime(int(expiry_time))
expires = expires.isoformat() + 'Z'
timestamp_metadata['signed']['expires'] = expires
tuf.formats.check_signable_object_format(timestamp_metadata)
with open(timestamp_path, 'wb') as file_object:
# Explicitly specify the JSON separators for Python 2 + 3 consistency.
timestamp_content = \
json.dumps(timestamp_metadata, indent=1, separators=(',', ': '),
sort_keys=True).encode('utf-8')
file_object.write(timestamp_content)
client_timestamp_path = os.path.join(self.client_directory, 'timestamp.json')
shutil.copy(timestamp_path, client_timestamp_path)
length, hashes = securesystemslib.util.get_file_details(timestamp_path)
fileinfo = tuf.formats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json')
# Generate the signature using the appropriate signing method.
if key['keytype'] in SUPPORTED_KEY_TYPES:
if len(key['keyval']['private']):
signed = signable['signed']
signature = tuf.keys.create_signature(key, signed)
signable['signatures'].append(signature)
else:
logger.warn('Private key unset. Skipping: '+repr(keyid))
else:
raise tuf.Error('The keydb contains a key with an invalid key type.')
# Raise 'tuf.FormatError' if the resulting 'signable' is not formatted
# correctly.
tuf.formats.check_signable_object_format(signable)
return signable
along with their associated roles.
"""
# Determine the 'root.txt' filename. This metadata file is needed
# to extract the keyids belonging to the top-level roles.
filenames = tuf.repo.signerlib.get_metadata_filenames(metadata_directory)
root_filename = filenames['root']
# Load the root metadata file. The loaded object should conform to
# 'tuf.formats.SIGNABLE_SCHEMA'.
metadata_signable = tuf.util.load_json_file(root_filename)
# Ensure the loaded json object is properly formatted.
try:
tuf.formats.check_signable_object_format(metadata_signable)
except tuf.FormatError, e:
message = 'Invalid metadata format: '+repr(root_filename)+'.'
raise tuf.RepositoryError(message)
# Extract the 'signed' role object from 'metadata_signable'.
root_metadata = metadata_signable['signed']
# Extract the 'roles' dict, where the dict keys are top-level roles and the
# dict values are dictionaries containing a list of corresponding keyids and
# a threshold.
top_level_keyids = root_metadata['roles']
# Determine the keyids associated with all the targets roles.
try:
targets_keyids = tuf.repo.signerlib.get_target_keyids(metadata_directory)
except tuf.FormatError, e:
signature = securesystemslib.keys.create_signature(key, signed)
signable['signatures'].append(signature)
except Exception:
logger.warning('Unable to create signature for keyid: ' + repr(keyid))
else:
logger.debug('Private key unset. Skipping: ' + repr(keyid))
else:
raise securesystemslib.exceptions.Error('The keydb contains a key with'
' an invalid key type.' + repr(key['keytype']))
# Raise 'securesystemslib.exceptions.FormatError' if the resulting 'signable'
# is not formatted correctly.
tuf.formats.check_signable_object_format(signable)
return signable
# Copy the contents of pushpath to a temp directory. We don't want the
# user modifying the files we work with. The temp directory is only
# accessible by the calling process.
temporary_directory = tempfile.mkdtemp()
push_temporary_directory = os.path.join(temporary_directory, 'push')
shutil.copytree(pushpath, push_temporary_directory)
# Read the 'root' metadata of the current repository. 'root.txt'
# is needed to authorize the 'targets' metadata file.
root_metadatapath = os.path.join(metadata_directory, 'root.txt')
root_signable = tuf.util.load_json_file(root_metadatapath)
# Ensure 'root_signable' is properly formatted.
try:
tuf.formats.check_signable_object_format(root_signable)
except tuf.FormatError, e:
raise tuf.Error('The repository contains an invalid "root.txt".')
# Extract the metadata object and load the key and role databases.
# The keys and roles are needed to verify the signatures of the
# metadata files.
root_metadata = root_signable['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
tuf.roledb.create_roledb_from_root_metadata(root_metadata)
# Determine the name of the targets metadata file that was pushed.
# The required 'info' file should list the metadata file that was
# pushed by the developer. Only 'targets.txt' is currently supported
# (i.e., no delegated roles are accepted).
new_targets_metadata_file = None
try:
Raise 'tuf.RepositoryError' if 'metadata_filename' cannot be read or
validated.
"""
# If 'metadata_filename' does not exist on the repository, this means
# it will be newly created and thus version 1 of the file.
if not os.path.exists(metadata_filename):
return 1
# Open 'metadata_filename', extract the version number, and return it
# incremented by 1. A metadata file's version is used to distinguish newer
# metadata from older. The client should only accept newer metadata.
try:
signable = tuf.repo.signerlib.read_metadata_file(metadata_filename)
tuf.formats.check_signable_object_format(signable)
except (tuf.FormatError, tuf.Error), e:
message = repr(metadata_filename)+' could not be opened or is invalid.'+\
' Backup or replace it and try again.'
raise tuf.RepositoryError(message)
current_version = signable['signed']['version']
return current_version+1
project.threshold = project_configuration['threshold']
project.prefix = project_configuration['prefix']
project.layout_type = project_configuration['layout_type']
# Traverse the public keys and add them to the project.
keydict = project_configuration['public_keys']
for keyid in keydict:
key, junk = securesystemslib.keys.format_metadata_to_key(keydict[keyid])
project.add_verification_key(key)
# Load the project's metadata.
targets_metadata_path = os.path.join(project_directory, metadata_directory,
project_filename)
signable = tuf.encoding.util.deserialize_file(targets_metadata_path)
tuf.formats.check_signable_object_format(signable)
targets_metadata = signable['signed']
# Remove the prefix from the metadata.
targets_metadata = _strip_prefix_from_targets_metadata(targets_metadata,
prefix)
for signature in signable['signatures']:
project.add_signature(signature)
# Update roledb.py containing the loaded project attributes.
roleinfo = tuf.roledb.get_roleinfo(project_name, repository_name)
roleinfo['signatures'].extend(signable['signatures'])
roleinfo['version'] = targets_metadata['version']
roleinfo['paths'] = targets_metadata['targets']
roleinfo['delegations'] = targets_metadata['delegations']
roleinfo['partial_loaded'] = False