Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Collect the cryptography keys of the four top-level roles (root, targets,
# snapshot, timestamp) plus one delegated role ('role1') in 'role_keys'.
# NOTE(review): this is an excerpt; 'keystore_directory', 'repo_tool' and
# 'EXPECTED_KEYFILE_PASSWORD' are not defined in the visible lines.
role_keys = {}
# Base path of each role's key file inside the keystore directory. The public
# key files are read from the same path with '.pub' appended (see below).
root_key_file = os.path.join(keystore_directory, 'root_key')
targets_key_file = os.path.join(keystore_directory, 'targets_key')
snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key')
timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key')
delegation_key_file = os.path.join(keystore_directory, 'delegation_key')
# One empty sub-dictionary per role; each is filled with 'public' and
# 'private' entries below. This rebinds the empty dict assigned above.
role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {},
'role1': {}}
# Import the top-level and delegated role public keys. The root key is loaded
# with the RSA importer; every other role uses the ed25519 importer.
role_keys['root']['public'] = \
repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub')
role_keys['targets']['public'] = \
repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub')
role_keys['snapshot']['public'] = \
repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub')
role_keys['timestamp']['public'] = \
repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub')
role_keys['role1']['public'] = \
repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub')
# Import the private keys of the top-level and delegated roles.
# EXPECTED_KEYFILE_PASSWORD is passed as the second argument of each importer
# (presumably the password protecting the key file -- confirm).
# Only the root and targets private keys are imported in the visible lines.
role_keys['root']['private'] = \
repo_tool.import_rsa_privatekey_from_file(root_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['targets']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(targets_key_file,
EXPECTED_KEYFILE_PASSWORD)
def test_key_conversion(self):
# Import some public keys.
ed_pub_fname = os.path.join(
os.getcwd(), 'repository_data', 'keystore', 'timestamp_key.pub')
rsa_pub_fname = os.path.join(
os.getcwd(), 'repository_data', 'keystore', 'root_key.pub')
ed_pub = repo_tool.import_ed25519_publickey_from_file(ed_pub_fname)
rsa_pub = repo_tool.import_rsa_publickey_from_file(rsa_pub_fname)
# Expected DER results from converting the keys:
ed_key_expected_der = (
b'0\x81\x94\x04 \x8a\x1cJ:\xc2\xd5\x15\xde\xc9\x82\xba\x99\x10\xc5\xfdy\xb9\x1a\xe5\x7fb[\x9c\xff%\xd0k\xf0\xa6\x1c\x17X\x1a\x07ed25519\x1a\x07ed255190L0J\x1a\x06public\x1a@82ccf6ac47298ff43bfa0cd639868894e305a99c723ff0515ae2e9856eb5bbf40\x10\x1a\x06sha256\x1a\x06sha512')
rsa_key_expected_der = (
b'0\x82\x02\xdd\x04 Nw}\xe0\xd2u\xf9\xd2\x85\x88\xdd\x9a\x16\x06\xcct\x8eT\x8f\x9e"\xb6y[|\xb3\xf6?\x98\x03_\xcb\x1a\x03rsa\x1a\x11rsassa-pss-sha2560\x82\x02\x8d0\x82\x02|\x1a\x06public\x1a\x82\x02p-----BEGIN PUBLIC KEY-----\nMIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA0GjPoVrjS9eCqzoQ8VRe\nPkC0cI6ktiEgqPfHESFzyxyjC490Cuy19nuxPcJuZfN64MC48oOkR+W2mq4pM51i\nxmdG5xjvNOBRkJ5wUCc8fDCltMUTBlqt9y5eLsf/4/EoBU+zC4SW1iPU++mCsity\nfQQ7U6LOn3EYCyrkH51hZ/dvKC4o9TPYMVxNecJ3CL1q02Q145JlyjBTuM3Xdqsa\nndTHoXSRPmmzgB/1dL/c4QjMnCowrKW06mFLq9RAYGIaJWfM/0CbrOJpVDkATmEc\nMdpGJYDfW/sRQvRdlHNPo24ZW7vkQUCqdRxvnTWkK5U81y7RtjLt1yskbWXBIbOV\nz94GXsgyzANyCT9qRjHXDDz2mkLq+9I2iKtEqaEePcWRu3H6RLahpM/TxFzw684Y\nR47weXdDecPNxWyiWiyMGStRFP4Cg9trcwAGnEm1w8R2ggmWphznCd5dXGhPNjfA\na82yNFY8ubnOUVJOf0nXGg3Edw9iY3xyjJb2+nrsk5f3AgMBAAE=\n-----END PUBLIC KEY-----0\x0b\x1a\x07private\x1a\x000\x10\x1a\x06sha256\x1a\x06sha512')
# Test by calling the helper functions directly.
self.conversion_check(
data=ed_pub,
datatype=asn1_defs.Key,
#expected_der=ed_key_expected_der,
to_asn1_func=asn1_convert._structlike_dict_to_asn1,
from_asn1_func=asn1_convert._structlike_dict_from_asn1)
self.conversion_check(
data=rsa_pub,
import unittest
import tuf
import tuf.log
import tuf.keydb
import tuf.roledb
import tuf.exceptions
import tuf.repository_tool as repo_tool
import tuf.unittest_toolbox as unittest_toolbox
import tuf.client.updater as updater
import securesystemslib
import six
logger = logging.getLogger('tuf.test_updater_root_rotation_integration')
repo_tool.disable_console_log_messages()
class TestUpdater(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# setUpClass() is called before tests in an individual class are executed.
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Launch a SimpleHTTPServer (serves files in the current directory). Test
# cases will request metadata and target files that have been pre-generated
# in 'tuf/tests/repository_data', which will be served by the
# NOTE(review): this is an excerpt; 'role_keys', the '*_key_file' paths,
# 'repo_tool' and 'EXPECTED_KEYFILE_PASSWORD' are set up outside the visible
# lines.
# Import the top-level and delegated role public keys. Each public key file is
# the role's key file path with '.pub' appended. The root key is loaded with
# the RSA importer; every other role uses the ed25519 importer.
role_keys['root']['public'] = \
repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub')
role_keys['targets']['public'] = \
repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub')
role_keys['snapshot']['public'] = \
repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub')
role_keys['timestamp']['public'] = \
repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub')
role_keys['role1']['public'] = \
repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub')
# Import the private keys of the top-level and delegated roles.
# EXPECTED_KEYFILE_PASSWORD is passed as the second argument of each importer
# (presumably the password protecting the key file -- confirm).
role_keys['root']['private'] = \
repo_tool.import_rsa_privatekey_from_file(root_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['targets']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(targets_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['snapshot']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['timestamp']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['role1']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(delegation_key_file,
EXPECTED_KEYFILE_PASSWORD)
# Hand back the mapping: role name -> {'public': key, 'private': key}.
return role_keys
import unittest
import tuf.exceptions
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool
import tuf.unittest_toolbox as unittest_toolbox
import tuf.roledb
import tuf.keydb
import securesystemslib
import six
# The repository tool is imported and logs console messages by default.
# Disable console log messages generated by this unit test.
repo_tool.disable_console_log_messages()
logger = logging.getLogger('tuf.test_mix_and_match_attack')
class TestMixAndMatchAttack(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# setUpClass() is called before any of the test cases are executed.
# Create a temporary directory to store the repository, metadata, and
# target files. 'temporary_directory' must be deleted in TearDownModule()
# so that temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Collect the cryptography keys of the four top-level roles (root, targets,
# snapshot, timestamp) plus one delegated role ('role1') in 'role_keys'.
# NOTE(review): this is an excerpt; 'keystore_directory', 'repo_tool' and
# 'EXPECTED_KEYFILE_PASSWORD' are not defined in the visible lines.
role_keys = {}
# Base path of each role's key file inside the keystore directory. The public
# key files are read from the same path with '.pub' appended (see below).
root_key_file = os.path.join(keystore_directory, 'root_key')
targets_key_file = os.path.join(keystore_directory, 'targets_key')
snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key')
timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key')
delegation_key_file = os.path.join(keystore_directory, 'delegation_key')
# One empty sub-dictionary per role; each is filled with 'public' and
# 'private' entries below. This rebinds the empty dict assigned above.
role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {},
'role1': {}}
# Import the top-level and delegated role public keys. The root key is loaded
# with the RSA importer; every other role uses the ed25519 importer.
role_keys['root']['public'] = \
repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub')
role_keys['targets']['public'] = \
repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub')
role_keys['snapshot']['public'] = \
repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub')
role_keys['timestamp']['public'] = \
repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub')
role_keys['role1']['public'] = \
repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub')
# Import the private keys of the top-level and delegated roles.
# EXPECTED_KEYFILE_PASSWORD is passed as the second argument of each importer
# (presumably the password protecting the key file -- confirm).
# Only the root and targets private keys are imported in the visible lines.
role_keys['root']['private'] = \
repo_tool.import_rsa_privatekey_from_file(root_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['targets']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(targets_key_file,
EXPECTED_KEYFILE_PASSWORD)
# NOTE(review): this is an excerpt of a test body; 'original_repository_path',
# 'repository_directory', 'repo_lib' and 'repo_tool' come from outside the
# visible lines.
# Work on a copy of the original repository so the test can add files to it.
shutil.copytree(original_repository_path, repository_directory)
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
# These two full paths are overwritten with bare role names further down,
# before generate_snapshot_metadata() is called.
root_filename = os.path.join(metadata_directory, repo_lib.ROOT_FILENAME)
targets_filename = os.path.join(metadata_directory,
repo_lib.TARGETS_FILENAME)
version = 1
expiration_date = '1985-10-21T13:20:00Z'
# Load a valid repository so that top-level roles exist in roledb and
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
# Neither 'repository' nor 'repository_junk' is referenced again in the
# visible lines; presumably the calls are made for their side effects.
repository = repo_tool.Repository(repository_directory, metadata_directory,
targets_directory)
repository_junk = repo_tool.load_repository(repository_directory)
# For testing purposes, store an invalid metadata file in the metadata directory
# to verify that it isn't loaded by generate_snapshot_metadata(). Unknown
# metadata file extensions should be ignored.
invalid_metadata_file = os.path.join(metadata_directory, 'role_file.xml')
with open(invalid_metadata_file, 'w') as file_object:
file_object.write('bad extension on metadata file')
# generate_snapshot_metadata() is called with the bare role names rather than
# the full paths computed above.
root_filename = 'root'
targets_filename = 'targets'
snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
expiration_date, root_filename,
targets_filename,
consistent_snapshot=False)
def update_json_signature(ber_signed_digest, json_signature):
    """
    Create a new signature over 'ber_signed_digest' and store its value in
    'json_signature', in place.

    The ed25519 private key is imported from the file whose path is the
    signature's 'keyid' value, using an empty password.

    Arguments:
      ber_signed_digest: the data handed to tuf.keys.create_signature().
      json_signature: mapping with a 'keyid' entry; its 'sig' entry is
        overwritten with the 'sig' entry of the newly created signature.

    Returns:
      None. The caller's 'json_signature' object is mutated.
    """
    key_path = json_signature['keyid']
    import_private_key = \
        tuf.repository_tool.import_ed25519_privatekey_from_file
    signing_key = import_private_key(key_path, password='')

    # NOTE: This deliberately writes into the original JSON signature object
    # rather than returning a new one.
    json_signature['sig'] = tuf.keys.create_signature(
        signing_key, ber_signed_digest)['sig']
# NOTE(review): this is an excerpt of a method body; 'vin', 'rt' and the
# 'self.key_dir*' attributes are defined outside the visible lines.
# Check 'vin' against the VIN schema before using it to build a path
# (presumably check_match() raises on a mismatch -- confirm).
uptane.formats.VIN_SCHEMA.check_match(vin)
# Repository Tool expects to use the current directory.
# Figure out if this is impactful and needs to be changed.
os.chdir(self.director_repos_dir) # TODO: Is messing with cwd a bad idea?
# Generates absolute path for a subdirectory with name equal to vin,
# in the current directory, making (relatively) sure that there isn't
# anything suspect like "../" in the VIN.
# Then I strip the common prefix back off the absolute path to get a
# relative path and keep the guarantees.
# TODO: Clumsy and hacky; fix.
vin = uptane.common.scrub_filename(vin, self.director_repos_dir)
vin = os.path.relpath(vin, self.director_repos_dir)
# Create the repository for this vehicle and remember it under the relative
# 'vin' computed above; 'this_repo' is a local alias for the same object.
self.vehicle_repositories[vin] = this_repo = rt.create_new_repository(
vin, repository_name=vin)
# Give each of the four top-level roles its public (verification) key.
this_repo.root.add_verification_key(self.key_dirroot_pub)
this_repo.timestamp.add_verification_key(self.key_dirtime_pub)
this_repo.snapshot.add_verification_key(self.key_dirsnap_pub)
this_repo.targets.add_verification_key(self.key_dirtarg_pub)
# Load the matching private (signing) key for each of the same four roles.
this_repo.root.load_signing_key(self.key_dirroot_pri)
this_repo.timestamp.load_signing_key(self.key_dirtime_pri)
this_repo.snapshot.load_signing_key(self.key_dirsnap_pri)
this_repo.targets.load_signing_key(self.key_dirtarg_pri)
def load_repo():
    """
    Load the repository named REPO_NAME and attach its keys.

    Steps, in order:
      1. Change the working directory to ROOT_PATH.
      2. Rebind the module-level 'repo' to the result of
         repotool.load_repository(REPO_NAME).
      3. Run import_all_keys(), add_top_level_keys_to_repo() and
         add_delegated_keys_to_repo().

    Returns:
      The module-level 'repo' object.
    """
    global repo

    # The process-wide working directory is changed and not restored.
    # NOTE(review): REPO_NAME is presumably resolved relative to ROOT_PATH --
    # confirm against repotool.load_repository().
    os.chdir(ROOT_PATH)
    repo = repotool.load_repository(REPO_NAME)

    # The global 'repo' is already rebound when these helpers run, so the
    # call order must be kept.
    import_all_keys()
    add_top_level_keys_to_repo()
    add_delegated_keys_to_repo()

    return repo