Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
keydata = newkey.export_private_key()
pubdata = newkey.public_data
self.assertEqual(newkey, self.privkey)
self.assertEqual(hash(newkey), hash(self.privkey))
keypair = asyncssh.load_keypairs(newkey, passphrase)[0]
self.assertEqual(keypair.get_key_type(), 'local')
self.assertEqual(keypair.get_algorithm(), algorithm)
self.assertEqual(keypair.public_data, pubdata)
self.assertIsNotNone(keypair.get_agent_private_key())
keypair = asyncssh.load_keypairs([keypair])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(keydata)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs('new', passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([newkey])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(newkey, None)])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([keydata])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(keydata, None)])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(keydata, None)])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(['new'], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([('new', None)], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(Path('new'), passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([Path('new')], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(Path('new'), None)], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keylist = asyncssh.load_keypairs([])
self.assertEqual(keylist, [])
if passphrase:
with self.assertRaises((asyncssh.KeyEncryptionError,
asyncssh.KeyImportError)):
asyncssh.load_keypairs('new', 'xxx')
else:
newkey.write_private_key('list', format_name)
newkey.append_private_key('list', format_name)
keylist = asyncssh.load_keypairs('list')
self.assertEqual(keylist[0].public_data, pubdata)
self.assertEqual(keylist[1].public_data, pubdata)
keypair = asyncssh.load_keypairs(['new'], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([('new', None)], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(Path('new'), passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([Path('new')], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(Path('new'), None)], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keylist = asyncssh.load_keypairs([])
self.assertEqual(keylist, [])
if passphrase:
with self.assertRaises((asyncssh.KeyEncryptionError,
asyncssh.KeyImportError)):
asyncssh.load_keypairs('new', 'xxx')
else:
newkey.write_private_key('list', format_name)
newkey.append_private_key('list', format_name)
keylist = asyncssh.load_keypairs('list')
self.assertEqual(keylist[0].public_data, pubdata)
self.assertEqual(keylist[1].public_data, pubdata)
newkey.write_private_key(Path('list'), format_name)
newkey.append_private_key(Path('list'), format_name)
async def test_client_key_keypairs(self):
"""Test client keys passed in as a list of SSHKeyPairs"""
keys = asyncssh.load_keypairs('ckey')
async with self.connect(username='ckey', client_keys=keys):
pass
self.assertEqual(newkey, self.privkey)
self.assertEqual(hash(newkey), hash(self.privkey))
keypair = asyncssh.load_keypairs(newkey, passphrase)[0]
self.assertEqual(keypair.get_key_type(), 'local')
self.assertEqual(keypair.get_algorithm(), algorithm)
self.assertEqual(keypair.public_data, pubdata)
self.assertIsNotNone(keypair.get_agent_private_key())
keypair = asyncssh.load_keypairs([keypair])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(keydata)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs('new', passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([newkey])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(newkey, None)])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([keydata])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(keydata, None)])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(['new'], passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
newkey = asyncssh.read_private_key('new', passphrase)
algorithm = newkey.get_algorithm()
keydata = newkey.export_private_key()
pubdata = newkey.public_data
self.assertEqual(newkey, self.privkey)
self.assertEqual(hash(newkey), hash(self.privkey))
keypair = asyncssh.load_keypairs(newkey, passphrase)[0]
self.assertEqual(keypair.get_key_type(), 'local')
self.assertEqual(keypair.get_algorithm(), algorithm)
self.assertEqual(keypair.public_data, pubdata)
self.assertIsNotNone(keypair.get_agent_private_key())
keypair = asyncssh.load_keypairs([keypair])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(keydata)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs('new', passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([newkey])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([(newkey, None)])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([keydata])[0]
self.assertEqual(keypair.public_data, pubdata)
chain = SSHX509CertificateChain.construct_from_certs([cert])
cert.write_certificate('new_cert')
keypair = asyncssh.load_keypairs(('new', 'new_cert'), passphrase)[0]
self.assertEqual(keypair.public_data, chain.public_data)
self.assertIsNotNone(keypair.get_agent_private_key())
newkey.write_private_key('new_bundle', format_name, passphrase)
cert.append_certificate('new_bundle', 'pem')
keypair = asyncssh.load_keypairs('new_bundle', passphrase)[0]
self.assertEqual(keypair.public_data, chain.public_data)
with self.assertRaises(OSError):
asyncssh.load_keypairs(('new', 'not_found'), passphrase)
def check_private(self, format_name, passphrase=None):
"""Check for a private key match"""
newkey = asyncssh.read_private_key('new', passphrase)
algorithm = newkey.get_algorithm()
keydata = newkey.export_private_key()
pubdata = newkey.public_data
self.assertEqual(newkey, self.privkey)
self.assertEqual(hash(newkey), hash(self.privkey))
keypair = asyncssh.load_keypairs(newkey, passphrase)[0]
self.assertEqual(keypair.get_key_type(), 'local')
self.assertEqual(keypair.get_algorithm(), algorithm)
self.assertEqual(keypair.public_data, pubdata)
self.assertIsNotNone(keypair.get_agent_private_key())
keypair = asyncssh.load_keypairs([keypair])[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs(keydata)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs('new', passphrase)[0]
self.assertEqual(keypair.public_data, pubdata)
keypair = asyncssh.load_keypairs([newkey])[0]
self.assertEqual(keypair.public_data, pubdata)