Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory2, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory2, 'metadata.staged'),
os.path.join(self.repository_directory2, 'metadata'))
# Do we get the expected match for the two targetinfo that only differ
# by the custom field?
valid_targetinfo = multi_repo_updater.get_valid_targetinfo(
'file1.txt', match_custom_field=False)
# Verify the case where two repositories provide different targetinfo.
# Modify file1.txt so that different length and hashes are reported by the
# two repositories.
repository = repo_tool.load_repository(self.repository_directory2)
target1 = os.path.join(self.repository_directory2, 'targets', 'file1.txt')
with open(target1, 'ab') as file_object:
file_object.write(b'append extra text')
repository.targets.remove_target(os.path.basename(target1))
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory2, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory2, 'metadata.staged'),
os.path.join(self.repository_directory2, 'metadata'))
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
snapshot_filename = os.path.join(metadata_directory,
repo_lib.SNAPSHOT_FILENAME)
# Set valid generate_timestamp_metadata() arguments.
version = 1
expiration_date = '1985-10-21T13:20:00Z'
# Load a valid repository so that top-level roles exist in roledb and
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
repository = repo_tool.Repository(repository_directory, metadata_directory,
targets_directory, repository_name)
repository_junk = repo_tool.load_repository(repository_directory,
repository_name)
timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_filename,
version, expiration_date, repository_name)
self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata))
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.generate_timestamp_metadata, 3, version, expiration_date,
repository_name)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.generate_timestamp_metadata, snapshot_filename, '3',
expiration_date, repository_name)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.generate_timestamp_metadata, snapshot_filename, version, '3',
# the repository after modifying a target file of 'role1.json'.
# Backup 'role1.json' (the delegated role to be updated, and then inserted
# again for the mix-and-match attack.)
role1_path = os.path.join(self.repository_directory, 'metadata', 'role1.json')
backup_role1 = os.path.join(self.repository_directory, 'role1.json.backup')
shutil.copy(role1_path, backup_role1)
# Backup 'file3.txt', specified by 'role1.json'.
file3_path = os.path.join(self.repository_directory, 'targets', 'file3.txt')
shutil.copy(file3_path, file3_path + '.backup')
# Re-generate the required metadata on the remote repository. The affected
# metadata must be properly updated and signed with 'repository_tool.py',
# otherwise the client will reject them as invalid metadata. The resulting
# metadata should be valid metadata.
repository = repo_tool.load_repository(self.repository_directory)
# Load the signing keys so that newly generated metadata is properly signed.
timestamp_keyfile = os.path.join(self.keystore_directory, 'timestamp_key')
role1_keyfile = os.path.join(self.keystore_directory, 'delegation_key')
snapshot_keyfile = os.path.join(self.keystore_directory, 'snapshot_key')
timestamp_private = \
repo_tool.import_ed25519_privatekey_from_file(timestamp_keyfile, 'password')
role1_private = \
repo_tool.import_ed25519_privatekey_from_file(role1_keyfile, 'password')
snapshot_private = \
repo_tool.import_ed25519_privatekey_from_file(snapshot_keyfile, 'password')
repository.targets('role1').load_signing_key(role1_private)
repository.snapshot.load_signing_key(snapshot_private)
repository.timestamp.load_signing_key(timestamp_private)
def test_root_rotation_unmet_threshold(self):
repository = repo_tool.load_repository(self.repository_directory)
# Add verification keys
repository.root.add_verification_key(self.role_keys['root']['public'])
repository.root.add_verification_key(self.role_keys['role1']['public'])
repository.targets.add_verification_key(self.role_keys['targets']['public'])
repository.snapshot.add_verification_key(self.role_keys['snapshot']['public'])
repository.timestamp.add_verification_key(self.role_keys['timestamp']['public'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Add signing keys
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.root.load_signing_key(self.role_keys['role1']['private'])
# outdated version returned to the client. The repository tool removes
# obsolete metadata, so do *not* save the backup version in the
# repository's metadata directory.
timestamp_path = os.path.join(self.repository_directory, 'metadata',
'timestamp.json')
backup_timestamp = os.path.join(self.repository_directory,
'timestamp.json.backup')
shutil.copy(timestamp_path, backup_timestamp)
# The fileinfo of the previous version is saved to verify that it is indeed
# accepted by the non-TUF client.
length, hashes = securesystemslib.util.get_file_details(backup_timestamp)
previous_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Modify the timestamp file on the remote repository.
repository = repo_tool.load_repository(self.repository_directory)
key_file = os.path.join(self.keystore_directory, 'timestamp_key')
timestamp_private = repo_tool.import_ed25519_privatekey_from_file(key_file,
'password')
repository.timestamp.load_signing_key(timestamp_private)
# Set an arbitrary expiration so that the repository tool generates a new
# version.
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 12)
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Save the fileinfo of the new version generated to verify that it is
# Empty the existing (old) staged metadata directory (relatively fast).
print(' Deleting ' + os.path.join(repo_dir, 'metadata.staged'))
if os.path.exists(os.path.join(repo_dir, 'metadata.staged')):
shutil.rmtree(os.path.join(repo_dir, 'metadata.staged'))
# Atomically move the new metadata into place.
print(' Moving backup to ' + os.path.join(repo_dir, 'metadata.staged'))
os.rename(os.path.join(repo_dir, 'metadata.backup'),
os.path.join(repo_dir, 'metadata.staged'))
# Re-load the repository from the restored metadata.staged directory.
# (We're using a temp variable here, so we have to assign the new reference
# to both the temp and the source variable.)
print(' Reloading repository from backup ' + repo_dir)
director_service_instance.vehicle_repositories[vin] = rt.load_repository(
repo_dir)
# Load the new signing keys to write metadata. The root key is unchanged,
# but must be reloaded because load_repository() was called.
valid_root_private_key = demo.import_private_key('directorroot')
director_service_instance.vehicle_repositories[vin].root.load_signing_key(
valid_root_private_key)
# Copy the staged metadata to a temp directory, which we'll move into place
# atomically in a moment.
shutil.copytree(os.path.join(repo_dir, 'metadata.staged'),
os.path.join(repo_dir, 'metadata.livetemp'))
# Empty the existing (old) live metadata directory (relatively fast).
print(' Deleting live hosted dir:' + os.path.join(repo_dir, 'metadata'))
if os.path.exists(os.path.join(repo_dir, 'metadata')):
def add_verification_key(parsed_arguments):
if not parsed_arguments.pubkeys:
raise tuf.exceptions.Error('--pubkeys must be given with --trust.')
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
for keypath in parsed_arguments.pubkeys:
imported_pubkey = import_publickey_from_file(keypath)
if parsed_arguments.role not in ('root', 'targets', 'snapshot', 'timestamp'):
raise tuf.exceptions.Error('The given --role is not a top-level role.')
elif parsed_arguments.role == 'root':
repository.root.add_verification_key(imported_pubkey)
elif parsed_arguments.role == 'targets':
repository.targets.add_verification_key(imported_pubkey)
elif parsed_arguments.role == 'snapshot':
repository.snapshot.add_verification_key(imported_pubkey)
'--delegatee must be set to perform the delegation.')
if parsed_arguments.delegatee in ('root', 'snapshot', 'timestamp', 'targets'):
raise tuf.exceptions.Error(
'Cannot delegate to the top-level role: ' + repr(parsed_arguments.delegatee))
if not parsed_arguments.pubkeys:
raise tuf.exceptions.Error(
'--pubkeys must be set to perform the delegation.')
public_keys = []
for public_key in parsed_arguments.pubkeys:
imported_pubkey = import_publickey_from_file(public_key)
public_keys.append(imported_pubkey)
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
if parsed_arguments.role == 'targets':
repository.targets.delegate(parsed_arguments.delegatee, public_keys,
parsed_arguments.delegate, parsed_arguments.threshold,
parsed_arguments.terminating, list_of_targets=None,
path_hash_prefixes=None)
targets_private = import_privatekey_from_file(
os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME),
parsed_arguments.targets_pw)
repository.targets.load_signing_key(targets_private)
# A delegated (non-top-level-Targets) role.
else:
def revoke(parsed_arguments):
repository = repo_tool.load_repository(
os.path.join(parsed_arguments.path, REPO_DIR))
if parsed_arguments.role == 'targets':
repository.targets.revoke(parsed_arguments.delegatee)
targets_private = import_privatekey_from_file(
os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME),
parsed_arguments.targets_pw)
repository.targets.load_signing_key(targets_private)
# A non-top-level role.
else:
repository.targets(parsed_arguments.role).revoke(parsed_arguments.delegatee)
role_privatekey = import_privatekey_from_file(parsed_arguments.sign)