Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
tuf.FormatError, if any of the arguments are improperly formatted.
tuf.Error, if there was an error while building the targets file.
The targets metadata file is written to a file.
The path for the written targets metadata file.
"""
# Do the arguments have the correct format?
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATH_SCHEMA.check_match(delegated_targets_directory)
tuf.formats.KEYIDS_SCHEMA.check_match(delegated_keyids)
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
tuf.formats.PATH_SCHEMA.check_match(delegation_metadata_directory)
tuf.formats.NAME_SCHEMA.check_match(delegation_role_name)
# Check if 'targets_directory' and 'metadata_directory' are valid.
targets_directory = check_directory(delegated_targets_directory)
metadata_directory = check_directory(metadata_directory)
repository_directory, junk = os.path.split(metadata_directory)
repository_directory_length = len(repository_directory)
# Get the list of targets.
targets = []
for root, directories, files in os.walk(targets_directory):
for target_file in files:
# Note: '+1' in the line below is there to remove '/'.
filename = os.path.join(root, target_file)[repository_directory_length+1:]
A dictionary containing the expected filenames of the top-level
metadata files, such as 'root.txt' and 'release.txt'.
"""
# Does 'metadata_directory' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
if metadata_directory is None:
metadata_directory = '.'
# Does 'metadata_directory' have the correct format?
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
# Store the filepaths of the top-level roles, including the
# 'metadata_directory' for each one.
filenames = {}
filenames[ROOT_FILENAME] = \
os.path.join(metadata_directory, ROOT_FILENAME)
filenames[TARGETS_FILENAME] = \
os.path.join(metadata_directory, TARGETS_FILENAME)
filenames[RELEASE_FILENAME] = \
os.path.join(metadata_directory, RELEASE_FILENAME)
filenames[TIMESTAMP_FILENAME] = \
os.path.join(metadata_directory, TIMESTAMP_FILENAME)
tuf.FormatError
if the arguments are not correctly formatted
uptane.Error
if director_repo_name is not a known repository based on the
map/pinning file (pinned.json)
None.
"""
# Check arguments:
tuf.formats.PATH_SCHEMA.check_match(full_client_dir)
tuf.formats.REPOSITORY_NAME_SCHEMA.check_match(director_repo_name)
tuf.formats.ISO8601_DATETIME_SCHEMA.check_match(time)
uptane.formats.VIN_SCHEMA.check_match(vin)
uptane.formats.ECU_SERIAL_SCHEMA.check_match(ecu_serial)
tuf.formats.ANYKEY_SCHEMA.check_match(timeserver_public_key)
tuf.formats.ANYKEY_SCHEMA.check_match(primary_key)
# TODO: Should also check that primary_key is a private key, not a
# public key.
self.vin = vin
self.ecu_serial = ecu_serial
self.full_client_dir = full_client_dir
# TODO: Consider removing time from [time] here and starting with an empty
# list, or setting time to 0 to start by default.
self.all_valid_timeserver_times = [time]
self.all_valid_timeserver_attestations = []
tuf.UnsignedMetadataError, if a targets metadata file cannot be generated
with a valid threshold of signatures.
The targets metadata object is written to a file.
None.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if the check fails.
tuf.formats.PATH_SCHEMA.check_match(repository_directory)
tuf.formats.PATH_SCHEMA.check_match(targets_directory)
tuf.formats.ROLENAME_SCHEMA.check_match(rolename)
tuf.formats.ROLEDB_SCHEMA.check_match(roleinfo)
tuf.formats.BOOLEAN_SCHEMA.check_match(write_partial)
# The metadata version number. Clients use the version number to determine
# if the downloaded version is newer than the one currently trusted.
version = roleinfo['version']
# The expiration date, in UTC, of the metadata file, conformant to
# 'tuf.formats.TIME_SCHEMA'.
expiration = roleinfo['expires']
# The corresponding keyids of the signing keys that generate the signatures
# of the delegated metadata file.
keyids = roleinfo['signing_keyids']
tuf.FormatError, if a valid 'signable' object could not be generated.
tuf.Error, if an invalid keytype was found in the keystore.
None.
A signable object conformant to 'tuf.formats.SIGNABLE_SCHEMA'.
"""
# Does 'keyids' and 'filename' have the correct format?
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.KEYIDS_SCHEMA.check_match(keyids)
tuf.formats.PATH_SCHEMA.check_match(filename)
# Make sure the metadata is in 'signable' format. That is,
# it contains a 'signatures' field containing the result
# of signing the 'signed' field of 'metadata' with each
# keyid of 'keyids'.
signable = tuf.formats.make_signable(metadata)
# Sign the metadata with each keyid in 'keyids'.
for keyid in keyids:
# Load the signing key.
key = tuf.repo.keystore.get_key(keyid)
logger.info('Signing '+repr(filename)+' with '+key['keyid'])
# Create a new signature list. If 'keyid' is encountered,
# do not add it to new list.
signatures = []
tuf.Error, if 'filepath' is not found under the repository's targets
directory.
Adds 'filepath' to this role's list of targets. This role's
'tuf.roledb.py' is also updated.
None.
"""
# Does 'filepath' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATH_SCHEMA.check_match(filepath)
filepath = os.path.abspath(filepath)
# Ensure 'filepath' is found under the repository's targets directory.
if not os.path.commonprefix([self._targets_directory, filepath]) == \
self._targets_directory:
message = repr(filepath)+' is not under the Repository\'s targets '+\
'directory: '+repr(self._targets_directory)
raise tuf.Error(message)
# TODO: Ensure 'filepath' is an allowed target path according to the
# parent's delegation.
# Add 'filepath' (i.e., relative to the targets directory) to the role's
# list of targets.
if os.path.isfile(filepath):
Any other runtime (e.g., IO) exception.
The 'filename' (or the compressed filename) file is created or overwritten
if it exists.
The file path of the written metadata.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if the check fails.
tuf.formats.SIGNABLE_SCHEMA.check_match(metadata)
tuf.formats.PATH_SCHEMA.check_match(filename)
tuf.formats.COMPRESSION_SCHEMA.check_match(compression)
# Verify the directory of 'filename' and convert 'filename' to its absolute
# path.
_check_directory(os.path.dirname(filename))
filename = os.path.abspath(filename)
file_object = None
# We may modify the filename, depending on the compression algorithm, so we
# store it separately.
filename_with_compression = filename
# Take care of compression by opening the appropriate file object and updating
# 'filename_with_compression', if necessary.
if not len(compression):
tuf.FormatError, if the generated root metadata object could not
be generated with the correct format.
tuf.Error, if an error is encountered while generating the root
metadata object.
'config_filepath' is read and its contents stored.
A root 'signable' object conformant to 'tuf.formats.SIGNABLE_SCHEMA'.
"""
# Does 'config_filepath' have the correct format?
# Raise 'tuf.FormatError' if the match fails.
tuf.formats.PATH_SCHEMA.check_match(config_filepath)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
# 'tuf.Error' raised if 'config_filepath' cannot be read.
config = read_config_file(config_filepath)
# The role and key dictionaries to be saved in the root metadata object.
roledict = {}
keydict = {}
# Extract the role, threshold, and keyid information from the config.
# The necessary role metadata is generated from this information.
for rolename in ['root', 'targets', 'snapshot', 'timestamp']:
# If a top-level role is missing from the config, raise an exception.
if rolename not in config:
raise tuf.Error('No '+rolename+' section found in config file.')
keyids = []
# store it separately.
filename_with_compression = filename
# Take care of compression.
if compression is None:
logger.info('No compression for '+str(filename))
file_object = open(filename_with_compression, 'w')
elif compression == 'gz':
logger.info('gzip compression for '+str(filename))
filename_with_compression += '.gz'
file_object = gzip.open(filename_with_compression, 'w')
else:
raise tuf.FormatError('Unknown compression algorithm: '+str(compression))
try:
tuf.formats.PATH_SCHEMA.check_match(filename_with_compression)
logger.info('Writing to '+str(filename_with_compression))
# The metadata object is saved to 'file_object'. The keys
# of the objects are sorted and indentation is used.
json.dump(metadata, file_object, indent=1, sort_keys=True)
file_object.write('\n')
except:
# Raise any runtime exception.
raise
else:
# Otherwise, return the written filename.
return filename_with_compression
finally:
# Always close the file.
file_object.close()
tuf.Error, if there was an error while processing the push.
The 'config_filepath' file is read and its contents stored, the files
in the targets directory (specified in the config file) are copied,
and the copied targets transfered to a specified host.
None.
"""
# Do the arguments have the correct format?
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATH_SCHEMA.check_match(config_filepath)
# Is the path to the configuration file valid?
if not os.path.isfile(config_filepath):
message = 'The configuration file path is invalid.'
raise tuf.Error(message)
config_filepath = os.path.abspath(config_filepath)
# Retrieve the push configuration settings required by the transfer
# modules. Raise ('tuf.FormatError', 'tuf.Error') if a valid
# configuration file cannot be retrieved.
config_dict = tuf.pushtools.pushtoolslib.read_config_file(config_filepath, 'push')
# Extract the transfer module identified in the configuration file.
transfer_module = config_dict['general']['transfer_module']
# 'scp' is the only transfer module currently supported. Perform