targets_public = import_ed25519_publickey_from_file(targets_key_file + '.pub')
delegation_public = import_ed25519_publickey_from_file(delegation_key_file + '.pub')
# Import the private keys. These private keys are needed to generate the
# signatures included in metadata.
project_private = import_rsa_privatekey_from_file(project_key_file, 'password')
targets_private = import_ed25519_privatekey_from_file(targets_key_file, 'password')
delegation_private = import_ed25519_privatekey_from_file(delegation_key_file, 'password')
os.mkdir("project")
os.mkdir("project/targets")
# Create the target files (downloaded by clients) whose file size and digest
# are specified in the 'targets.json' file.
target1_filepath = 'project/targets/file1.txt'
securesystemslib.util.ensure_parent_dir(target1_filepath)
target2_filepath = 'project/targets/file2.txt'
securesystemslib.util.ensure_parent_dir(target2_filepath)
target3_filepath = 'project/targets/file3.txt'
securesystemslib.util.ensure_parent_dir(target3_filepath)
if not options.dry_run:
  with open(target1_filepath, 'wt') as file_object:
    file_object.write('This is an example target file.')
  with open(target2_filepath, 'wt') as file_object:
    file_object.write('This is another example target file.')
  with open(target3_filepath, 'wt') as file_object:
    file_object.write('This is role1\'s target file.')
project = create_new_project("test-flat", 'project/test-flat', 'prefix', 'project/targets')
repository.root.add_verification_key(root_public)
repository.targets.add_verification_key(targets_public)
repository.snapshot.add_verification_key(snapshot_public)
repository.timestamp.add_verification_key(timestamp_public)
# Load the signing keys, previously imported, for the top-level roles so that
# valid metadata can be written.
repository.root.load_signing_key(root_private)
repository.targets.load_signing_key(targets_private)
repository.snapshot.load_signing_key(snapshot_private)
repository.timestamp.load_signing_key(timestamp_private)
# Create the target files (downloaded by clients) whose file size and digest
# are specified in the 'targets.json' file.
target1_filepath = 'repository/targets/file1.txt'
securesystemslib.util.ensure_parent_dir(target1_filepath)
target2_filepath = 'repository/targets/file2.txt'
securesystemslib.util.ensure_parent_dir(target2_filepath)
target3_filepath = 'repository/targets/file3.txt'
securesystemslib.util.ensure_parent_dir(target3_filepath)
if not options.dry_run:
  with open(target1_filepath, 'wt') as file_object:
    file_object.write('This is an example target file.')
  with open(target2_filepath, 'wt') as file_object:
    file_object.write('This is another example target file.')
  with open(target3_filepath, 'wt') as file_object:
    file_object.write('This is role1\'s target file.')
# Add target files to the top-level 'targets.json' role. These target files
# should already exist. 'target1_filepath' contains additional information
# about the target (i.e., file permissions in octal format.)
octal_file_permissions = oct(os.stat(target1_filepath).st_mode)[4:]
file_permissions = {'file_permissions': octal_file_permissions}
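# A hedged sketch, not part of the original snippet: the target files prepared
# above would typically be registered with the top-level 'targets' role next.
# This assumes the tuf.repository_tool API (add_targets(), add_target() with a
# custom-fileinfo dict, and writeall()); exact signatures may differ by release.
repository.targets.add_targets([target2_filepath, target3_filepath])
repository.targets.add_target(target1_filepath, file_permissions)
# With verification and signing keys loaded for every top-level role, the
# staged metadata can then be signed and written to disk.
repository.writeall()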
root_signable = securesystemslib.util.load_json_file(root_filepath)
# _generate_and_write_metadata() expects the top-level roles
# (specifically 'snapshot') and keys to be available in 'tuf.roledb'.
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'],
    repository_name)
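# A hedged aside, not in the original snippet: besides the role database, the
# key database is normally populated from the same root metadata. This assumes
# tuf.keydb exposes the analogous helper, create_keydb_from_root_metadata().
tuf.keydb.create_keydb_from_root_metadata(root_signable['signed'],
    repository_name)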
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
targets_directory = os.path.join(temporary_directory, 'targets')
os.mkdir(targets_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
    repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_metadata = os.path.join('repository_data', 'repository', 'metadata',
    'targets.json')
obsolete_metadata = os.path.join(metadata_directory, 'obsolete_role.json')
securesystemslib.util.ensure_parent_dir(obsolete_metadata)
shutil.copyfile(targets_metadata, obsolete_metadata)
# Verify the handling of obsolete metadata (a metadata file that exists on
# disk, but whose role is unavailable in 'tuf.roledb'). First add the obsolete
# role to 'tuf.roledb' so that its metadata file can be written to disk.
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
targets_roleinfo['version'] = 1
expiration = \
    tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
expiration = expiration.isoformat() + 'Z'
targets_roleinfo['expires'] = expiration
tuf.roledb.add_role('obsolete_role', targets_roleinfo,
    repository_name=repository_name)
repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata,
    targets_directory, metadata_directory, consistent_snapshot=False,
def test_B2_ensure_parent_dir(self):
  existing_parent_dir = self.make_temp_directory()
  non_existing_parent_dir = os.path.join(existing_parent_dir, 'a', 'b')
  for parent_dir in [existing_parent_dir, non_existing_parent_dir, 12, [3]]:
    if isinstance(parent_dir, six.string_types):
      securesystemslib.util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt'))
      self.assertTrue(os.path.isdir(parent_dir))
    else:
      self.assertRaises(securesystemslib.exceptions.FormatError,
          securesystemslib.util.ensure_parent_dir, parent_dir)
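# For reference, a minimal standalone sketch (not taken from the test above)
# of what ensure_parent_dir() does: it creates any missing parent directories
# of the given filename so the file itself can then be written.
import os
import tempfile
import securesystemslib.util

filepath = os.path.join(tempfile.mkdtemp(), 'a', 'b', 'file.txt')
securesystemslib.util.ensure_parent_dir(filepath)  # Creates the 'a/b' subdirectories.
assert os.path.isdir(os.path.dirname(filepath))    # The file itself is not created.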
def test_generate_targets_metadata(self):
  # Test normal case.
  temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
  targets_directory = os.path.join(temporary_directory, 'targets')
  file1_path = os.path.join(targets_directory, 'file.txt')
  securesystemslib.util.ensure_parent_dir(file1_path)
  with open(file1_path, 'wt') as file_object:
    file_object.write('test file.')
  # Set valid generate_targets_metadata() arguments. Add a custom field for
  # the 'target_files' target set below.
  version = 1
  datetime_object = datetime.datetime(2030, 1, 1, 12, 0)
  expiration_date = datetime_object.isoformat() + 'Z'
  file_permissions = oct(os.stat(file1_path).st_mode)[4:]
  target_files = {'file.txt': {'file_permission': file_permissions}}
  delegations = {"keys": {
    "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": {
      "keytype": "ed25519",
      "keyval": {
"""
# Get the 'current' and 'previous' full file paths for 'metadata_role'
metadata_filepath = metadata_role + '.json'
previous_filepath = os.path.join(self.metadata_directory['previous'],
    metadata_filepath)
current_filepath = os.path.join(self.metadata_directory['current'],
    metadata_filepath)
# Remove the previous path if it exists.
if os.path.exists(previous_filepath):
  os.remove(previous_filepath)
# Move the current path to the previous path.
if os.path.exists(current_filepath):
  securesystemslib.util.ensure_parent_dir(previous_filepath)
  os.rename(current_filepath, previous_filepath)
# RSA key.
else:
  keypath = securesystemslib.interface.generate_and_write_rsa_keypair(
      parsed_arguments.filename, password=parsed_arguments.pw)
# If a filename is not given, the generated keypair is saved to the current
# working directory. By default, the keypair is written to <KEYID>.pub
# and <KEYID> (private key).
if not parsed_arguments.filename:
  privkey_repo_path = os.path.join(parsed_arguments.path,
      KEYSTORE_DIR, os.path.basename(keypath))
  pubkey_repo_path = os.path.join(parsed_arguments.path,
      KEYSTORE_DIR, os.path.basename(keypath + '.pub'))
  securesystemslib.util.ensure_parent_dir(privkey_repo_path)
  securesystemslib.util.ensure_parent_dir(pubkey_repo_path)
  # Move them from the CWD to the repo's keystore.
  shutil.move(keypath, privkey_repo_path)
  shutil.move(keypath + '.pub', pubkey_repo_path)
# The metadata has been verified. Move the metadata file into place.
# First, move the 'current' metadata file to the 'previous' directory
# if it exists.
current_filepath = os.path.join(self.metadata_directory['current'],
    metadata_filename)
current_filepath = os.path.abspath(current_filepath)
securesystemslib.util.ensure_parent_dir(current_filepath)
previous_filepath = os.path.join(self.metadata_directory['previous'],
    metadata_filename)
previous_filepath = os.path.abspath(previous_filepath)
if os.path.exists(current_filepath):
  # Previous metadata might not exist, say when delegations are added.
  securesystemslib.util.ensure_parent_dir(previous_filepath)
  shutil.move(current_filepath, previous_filepath)
# Next, move the verified updated metadata file to the 'current' directory.
metadata_file_object.seek(0)
metadata_signable = \
    securesystemslib.util.load_json_string(metadata_file_object.read().decode('utf-8'))
securesystemslib.util.persist_temp_file(metadata_file_object, current_filepath)
# Extract the metadata object so we can store it to the metadata store.
# 'current_metadata_object' set to 'None' if there is not an object
# stored for 'metadata_role'.
updated_metadata_object = metadata_signable['signed']
current_metadata_object = self.metadata['current'].get(metadata_role)
self._verify_root_chain_link(metadata_role, current_metadata_object,