Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Module-level fixtures for the keystore tests: a scratch directory path and
# three RSA keys with matching passwords, plus snapshots of the key contents.

# The json module is needed for testing the internal '_encrypt()' and
# '_decrypt()' functions.
json = tuf.util.import_json()

# Override the keystore's PBKDF2 iteration count (presumably to keep key
# derivation fast while testing -- confirm).
tuf.repo.keystore._PBKDF2_ITERATIONS = 1000

# Build the path of the 'test_keystore' directory inside the current directory.
_CURRENT_DIR = os.getcwd()
_DIR = os.path.join(_CURRENT_DIR, 'test_keystore')

# Abort if '_DIR' already exists.
if os.path.exists(_DIR):
    msg = ('\''+_DIR+'\' directory already exists,'+
           ' please change '+'\'_DIR\''+' to something else.')
    raise tuf.Error(msg)

KEYSTORE = tuf.repo.keystore
RSAKEYS = []
PASSWDS = []
temp_keys_info = []
temp_keys_vals = []

for i in range(3):
    # Populate the original 'RSAKEYS' and 'PASSWDS' lists.
    RSAKEYS.append(tuf.keys.generate_rsa_key())
    PASSWDS.append('passwd_'+str(i))

    # Save copies of the key contents so 'RSAKEYS' and 'PASSWDS' can be
    # repopulated at the start of every test.  list() forces a real snapshot
    # even where dict.values() returns a live view instead of a list.
    temp_keys_info.append(list(RSAKEYS[i].values()))
    temp_keys_vals.append(list(RSAKEYS[i]['keyval'].values()))
The parent targets metadata file is modified. The 'delegations' field of
that file is added or updated.
None.
"""
# Verify the 'keystore_directory' argument.
keystore_directory = _check_directory(keystore_directory)

# Get the metadata directory.  A 'tuf.FormatError' or 'tuf.Error' raised by
# the lookup is re-raised as 'tuf.RepositoryError' with the same message.
try:
    metadata_directory = _get_metadata_directory()
except (tuf.FormatError, tuf.Error) as e:
    message = str(e)+'\n'
    raise tuf.RepositoryError(message)

# Get the delegated role's target paths, which should be located within
# the repository's targets directory.  We need these directory/file paths to
# generate the delegated role's metadata file.
prompt = ('\nThe paths entered below should be located within the '+
          'repository\'s targets directory.\nEnter the directory, directories, or '+
          'any number of file paths containing the delegated role\'s target files: ')
delegated_targets_input = _prompt(prompt, str)

# The answer is split on whitespace, so every whitespace-separated token is
# treated as one path.
delegated_targets_input = delegated_targets_input.split()

# Verify the format of the delegated targets specified by the user.
# The paths in 'delegated_targets_input' will be verified in
# the _make_delegated_metadata() call.
try:
    # Inspect each line of the 'info' file, searching for the line that
    # specifies the targets metadata file.  Raise an exception if all
    # the lines are processed without finding the 'metadata=' line.
    for line in file_object:
        # Search 'info' for a 'metadata=.../targets.txt' line.  partition()
        # splits on the first '=' only, so a path that itself contains '='
        # is kept whole, and a line without '=' cannot cause an IndexError.
        name, separator, value = line.strip().partition('=')
        if name != 'metadata' or not separator:
            continue

        metadata_basename = os.path.basename(value)
        if metadata_basename != 'targets.txt':
            message = 'No support yet for pushing delegated targets metadata.'
            raise tuf.Error(message)

        new_targets_metadata_file = value
        break
    else:
        # The loop finished without hitting 'break'.
        raise tuf.Error('No "metadata=" line in push info file.')
finally:
    file_object.close()

# Read the new targets metadata that was pushed.
# NOTE(review): 'new_targets_metadata_file' is taken verbatim from the pushed
# 'info' file.  os.path.join() does not stop an absolute path or '..'
# components from escaping 'push_temporary_directory' -- confirm the value is
# validated elsewhere.
new_targets_metadatapath = os.path.join(push_temporary_directory,
                                        new_targets_metadata_file)
new_targets_signable = tuf.util.load_json_file(new_targets_metadatapath)

# Ensure 'new_targets_signable' is properly formatted.
try:
    tuf.formats.check_signable_object_format(new_targets_signable)
except tuf.FormatError:
    raise tuf.Error('The pushed targets metadata file is invalid.')

# Read the existing targets metadata from the repository.
targets_metadatapath = os.path.join(metadata_directory, 'targets.txt')
tuf.formats.TIME_SCHEMA.check_match(expiration_date)
filedict = {}
repository_directory = check_directory(repository_directory)
# Generate the file info for all the target files listed in 'target_files'.
for target in target_files:
# Strip 'targets/' from from 'target' and keep the rest (e.g.,
# 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt'
relative_targetpath = os.path.sep.join(target.split(os.path.sep)[1:])
target_path = os.path.join(repository_directory, target)
if not os.path.exists(target_path):
message = repr(target_path)+' could not be read. Unable to generate '+\
'targets metadata.'
raise tuf.Error(message)
filedict[relative_targetpath] = get_metadata_file_info(target_path)
# Generate the targets metadata object.
targets_metadata = tuf.formats.TargetsFile.make_metadata(version,
expiration_date,
filedict)
return tuf.formats.make_signable(targets_metadata)
# Create a new signature list. If 'keyid' is encountered,
# do not add it to new list.
signatures = []
for signature in signable['signatures']:
if not keyid == signature['keyid']:
signatures.append(signature)
signable['signatures'] = signatures
# Generate the signature using the appropriate signing method.
if key['keytype'] == 'rsa':
signed = signable['signed']
signature = tuf.sig.generate_rsa_signature(signed, key)
signable['signatures'].append(signature)
else:
raise tuf.Error('The keystore contains a key with an invalid key type')
# Raise 'tuf.FormatError' if the resulting 'signable' is not formatted
# correctly.
tuf.formats.check_signable_object_format(signable)
return signable
file_object = open(os.path.join(pushpath, 'receive.result'), 'w')
except IOError, e:
raise tuf.Error('Unable to open "receive.result" file: '+str(e))
try:
file_object.write('SUCCESS')
file_object.write('\n')
finally:
file_object.close()
return True
except tuf.Error, e:
# Write the '{pushpath}/receive.result' file that indicates FAILURE.
try:
file_object = open(os.path.join(pushpath, 'receive.result'), 'w')
except IOError, e:
raise tuf.Error('Unable to open "receive.result" file: '+str(e))
try:
file_object.write("FAILURE")
file_object.write('\n')
finally:
file_object.close()
# Log the error message to {pushpath}/receive.log
# The developer may later search this log file for specific
# error messages on failed push attempts.
try:
file_object = open(os.path.join(pushpath, 'receive.log'), 'a')
except IOError, e:
raise tuf.Error('Unable to open receive log file: '+str(e))
try:
file_object.write(str(e))
file_object.write('\n')
for attempt in range(MAX_INPUT_ATTEMPTS):
message = '\nEnter the password for the '+key+' role ('+keyid+'): '
password = _get_password(message)
loaded_key = load_key(keystore_directory, [keyid], [password])
if not loaded_key or keyid not in loaded_key:
message = 'Could not load keyid: '+keyid
logger.error(message)
continue
role_keyids.append(keyid)
break
# Ensure we loaded all the keyids.
for keyid in value['keyids']:
if keyid not in role_keyids:
raise tuf.Error('Could not load a required role key')
if not role_keyids:
raise tuf.Error('Could not load the required keys for '+role)
return role_keyids
# Work on a private copy of the push so that another user modifying the
# pushed files cannot affect the files we work with.  The directory returned
# by tempfile.mkdtemp() is accessible only to the user that created it.
temporary_directory = tempfile.mkdtemp()
push_temporary_directory = os.path.join(temporary_directory, 'push')
shutil.copytree(pushpath, push_temporary_directory)

# Read the 'root' metadata of the current repository.  'root.txt'
# is needed to authorize the 'targets' metadata file.
root_metadatapath = os.path.join(metadata_directory, 'root.txt')
root_signable = tuf.util.load_json_file(root_metadatapath)

# Ensure 'root_signable' is properly formatted.
try:
    tuf.formats.check_signable_object_format(root_signable)
except tuf.FormatError:
    raise tuf.Error('The repository contains an invalid "root.txt".')

# Extract the metadata object and load the key and role databases.
# The keys and roles are needed to verify the signatures of the
# metadata files.
root_metadata = root_signable['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
tuf.roledb.create_roledb_from_root_metadata(root_metadata)

# Determine the name of the targets metadata file that was pushed.
# The required 'info' file should list the metadata file that was
# pushed by the developer.  Only 'targets.txt' is currently supported
# (i.e., no delegated roles are accepted).
new_targets_metadata_file = None
try:
file_object = open(os.path.join(push_temporary_directory, 'info'), 'r')
except IOError, e:
targets_directory = _check_directory(targets_directory)
# Generate the fileinfo of all the target files listed in 'target_files'.
for target in target_files:
# The root-most folder of the targets directory should not be included.
# (e.g., 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt')
relative_targetpath = target
target_path = os.path.join(targets_directory, target)
# Ensure all target files listed in 'target_files' exist. If just one of
# these files does not exist, raise an exception.
if not os.path.exists(target_path):
message = repr(target_path)+' cannot be read. Unable to generate '+ \
'targets metadata.'
raise tuf.Error(message)
filedict[relative_targetpath] = get_metadata_file_info(target_path)
# Generate the targets metadata object.
targets_metadata = tuf.formats.TargetsFile.make_metadata(version,
expiration_date,
filedict,
delegations)
return targets_metadata