Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Generate the client and server keys and certificates and write them to
# files in the current directory.
# pylint: disable=too-many-statements
# Client RSA key: unencrypted copy, passphrase-protected copy, public key.
ckey = asyncssh.generate_private_key('ssh-rsa')
ckey.write_private_key('ckey')
ckey.write_private_key('ckey_encrypted', passphrase='passphrase')
ckey.write_public_key('ckey.pub')
# Client ECDSA (nistp256) key pair.
ckey_ecdsa = asyncssh.generate_private_key('ecdsa-sha2-nistp256')
ckey_ecdsa.write_private_key('ckey_ecdsa')
ckey_ecdsa.write_public_key('ckey_ecdsa.pub')
# User certificate for ckey, generated by ckey itself, principal 'ckey'.
ckey_cert = ckey.generate_user_certificate(ckey, 'name',
principals=['ckey'])
ckey_cert.write_certificate('ckey-cert.pub')
# Server RSA key pair.
skey = asyncssh.generate_private_key('ssh-rsa')
skey.write_private_key('skey')
skey.write_public_key('skey.pub')
# Server ECDSA (nistp256) key pair.
skey_ecdsa = asyncssh.generate_private_key('ecdsa-sha2-nistp256')
skey_ecdsa.write_private_key('skey_ecdsa')
skey_ecdsa.write_public_key('skey_ecdsa.pub')
# Host certificate for skey, generated by skey itself, principal 127.0.0.1.
skey_cert = skey.generate_host_certificate(skey, 'name',
principals=['127.0.0.1'])
skey_cert.write_certificate('skey-cert.pub')
# Host certificate with a validity window of '-2d' to '-1d' -- presumably
# relative to now, i.e. already expired (TODO confirm against asyncssh docs).
exp_cert = skey.generate_host_certificate(skey, 'name',
valid_after='-2d',
valid_before='-1d')
# The same server private key is written again under the exp_skey name so
# it sits next to the exp_skey-cert.pub certificate file.
skey.write_private_key('exp_skey')
exp_cert.write_certificate('exp_skey-cert.pub')
def test_non_root_ca(self):
    """Check that a non-root X.509 CA is rejected in authorized keys"""

    rsa_key = asyncssh.generate_private_key('ssh-rsa')

    # The two name arguments differ ('CN=a' vs 'CN=b'); presumably these
    # are subject and issuer, which is what makes this a non-root cert.
    x509_cert = rsa_key.generate_x509_user_certificate(
        rsa_key, 'CN=a', 'CN=b')

    exported = x509_cert.export_certificate().decode('ascii')
    entry = 'cert-authority ' + exported

    # Importing the entry as a certificate authority must fail
    with self.assertRaises(ValueError):
        asyncssh.import_authorized_keys(entry)
def test_keys(self):
"""Check keys and certificates"""
# Run one sub-test per (algorithm name, keyword arguments) pair listed
# in self.generate_args.
for alg_name, kwargs in self.generate_args:
with self.subTest(alg_name=alg_name, **kwargs):
# Key under test: private key with a comment attached, written to
# 'priv' in the format named by self.base_format.
self.privkey = asyncssh.generate_private_key(
alg_name, comment='comment', **kwargs)
self.privkey.write_private_key('priv', self.base_format)
# Matching public key, written in the base format and as 'openssh'.
self.pubkey = self.privkey.convert_to_public()
self.pubkey.write_public_key('pub', self.base_format)
self.pubkey.write_public_key('sshpub', 'openssh')
# Second key pair of the same algorithm, used below to generate the
# user certificate.
self.privca = asyncssh.generate_private_key(alg_name, **kwargs)
self.privca.write_private_key('privca', self.base_format)
self.pubca = self.privca.convert_to_public()
self.pubca.write_public_key('pubca', self.base_format)
# User certificate for self.pubkey generated from self.privca.
self.usercert = self.privca.generate_user_certificate(
self.pubkey, 'name')
self.usercert.write_certificate('usercert')
# NOTE(review): no assertions are visible in this excerpt; the checks that
# use these attributes and files presumably follow -- confirm in full file.
# X.509 test material, created only when x509_available is true.
if x509_available: # pragma: no branch
# X.509 user certificate generated by ckey_ecdsa for itself. The
# 'ckey_x509_self' file gets the private key and then the certificate
# appended in PEM form; the certificate is also written on its own as
# PEM and in the default format.
ckey_x509_self = ckey_ecdsa.generate_x509_user_certificate(
ckey_ecdsa, 'OU=name', principals=['ckey'])
ckey_ecdsa.write_private_key('ckey_x509_self')
ckey_x509_self.append_certificate('ckey_x509_self', 'pem')
ckey_x509_self.write_certificate('ckey_x509_self.pem', 'pem')
ckey_x509_self.write_certificate('ckey_x509_self.pub')
# X.509 host certificate generated by skey_ecdsa for itself, stored
# the same way (key plus appended certificate, and a standalone PEM copy).
skey_x509_self = skey_ecdsa.generate_x509_host_certificate(
skey_ecdsa, 'OU=name', principals=['127.0.0.1'])
skey_ecdsa.write_private_key('skey_x509_self')
skey_x509_self.append_certificate('skey_x509_self', 'pem')
skey_x509_self.write_certificate('skey_x509_self.pem', 'pem')
# Root CA: new RSA key and a CA certificate generated by that key for
# itself, named 'OU=RootCA', with ca_path_len=1.
root_ca_key = asyncssh.generate_private_key('ssh-rsa')
root_ca_key.write_private_key('root_ca_key')
root_ca_cert = root_ca_key.generate_x509_ca_certificate(
root_ca_key, 'OU=RootCA', ca_path_len=1)
root_ca_cert.write_certificate('root_ca_cert.pem', 'pem')
root_ca_cert.write_certificate('root_ca_cert.pub')
# Intermediate CA: new RSA key whose CA certificate is generated by the
# root CA key, with names 'OU=IntCA' and 'OU=RootCA' (presumably subject
# and issuer -- confirm against asyncssh docs) and ca_path_len=0.
int_ca_key = asyncssh.generate_private_key('ssh-rsa')
int_ca_key.write_private_key('int_ca_key')
int_ca_cert = root_ca_key.generate_x509_ca_certificate(
int_ca_key, 'OU=IntCA', 'OU=RootCA', ca_path_len=0)
int_ca_cert.write_certificate('int_ca_cert.pem', 'pem')
# User certificate for ckey generated by the intermediate CA key.
# NOTE(review): ckey_x509_chain is not used within this excerpt; the
# code that writes it out presumably follows.
ckey_x509_chain = int_ca_key.generate_x509_user_certificate(
ckey, 'OU=name', 'OU=IntCA', principals=['ckey'])
async def test_add_remove_keys(self, agent):
    """Add keys to the agent and remove them again, checking the count"""

    async def fetch_and_check(expected):
        """Fetch the agent's keys and assert how many it holds"""

        held = await agent.get_keys()
        self.assertEqual(len(held), expected)
        return held

    # add_keys() with no arguments is expected to leave the agent empty
    await agent.add_keys()
    await fetch_and_check(0)

    rsa_key = asyncssh.generate_private_key('ssh-rsa')

    # Remove through the agent, passing back the key list it returned
    await agent.add_keys([rsa_key])
    held = await fetch_and_check(1)
    await agent.remove_keys(held)
    await fetch_and_check(0)

    # Remove through the returned key object's own remove() method
    await agent.add_keys([rsa_key])
    held = await fetch_and_check(1)
    await held[0].remove()
    await fetch_and_check(0)
async def asyncSetUpClass(cls):
"""Set up keys, an SSH server, and an SSH agent for the tests to use"""
# NOTE(review): only the key-generation steps are visible in this excerpt;
# the server and agent setup named in the docstring presumably follow.
# pylint: disable=too-many-statements
# Client RSA key: unencrypted copy, passphrase-protected copy, public key.
ckey = asyncssh.generate_private_key('ssh-rsa')
ckey.write_private_key('ckey')
ckey.write_private_key('ckey_encrypted', passphrase='passphrase')
ckey.write_public_key('ckey.pub')
# Client ECDSA (nistp256) key pair.
ckey_ecdsa = asyncssh.generate_private_key('ecdsa-sha2-nistp256')
ckey_ecdsa.write_private_key('ckey_ecdsa')
ckey_ecdsa.write_public_key('ckey_ecdsa.pub')
# User certificate for ckey, generated by ckey itself, principal 'ckey'.
ckey_cert = ckey.generate_user_certificate(ckey, 'name',
principals=['ckey'])
ckey_cert.write_certificate('ckey-cert.pub')
# Server RSA key pair.
skey = asyncssh.generate_private_key('ssh-rsa')
skey.write_private_key('skey')
skey.write_public_key('skey.pub')
def test_generate_errors(self):
"""Test errors in private key and certificate generation"""
for alg_name, kwargs in (('xxx', {}),
('ssh-dss', {'xxx': 0}),
('ssh-rsa', {'xxx': 0}),
('ecdsa-sha2-nistp256', {'xxx': 0}),
('ssh-ed25519', {'xxx': 0}),
('ssh-ed448', {'xxx': 0})):
with self.subTest(alg_name=alg_name, **kwargs):
with self.assertRaises(asyncssh.KeyGenerationError):
asyncssh.generate_private_key(alg_name, **kwargs)
privkey = asyncssh.generate_private_key('ssh-rsa')
pubkey = privkey.convert_to_public()
privca = asyncssh.generate_private_key('ssh-rsa')
with self.assertRaises(asyncssh.KeyGenerationError):
privca.generate_user_certificate(pubkey, 'name', version=0)
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after=())
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after='xxx')
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after='now',
def test_generate_errors(self):
"""Test errors in private key and certificate generation"""
# Key generation: every (algorithm, kwargs) pair below must raise
# KeyGenerationError. The first uses the algorithm name 'xxx'; the rest
# pass a keyword argument named 'xxx' to an otherwise named algorithm.
for alg_name, kwargs in (('xxx', {}),
('ssh-dss', {'xxx': 0}),
('ssh-rsa', {'xxx': 0}),
('ecdsa-sha2-nistp256', {'xxx': 0}),
('ssh-ed25519', {'xxx': 0}),
('ssh-ed448', {'xxx': 0})):
with self.subTest(alg_name=alg_name, **kwargs):
with self.assertRaises(asyncssh.KeyGenerationError):
asyncssh.generate_private_key(alg_name, **kwargs)
# Certificate generation: valid RSA keys for the subject and the signer,
# so the failures below come from the remaining arguments.
privkey = asyncssh.generate_private_key('ssh-rsa')
pubkey = privkey.convert_to_public()
privca = asyncssh.generate_private_key('ssh-rsa')
# version=0 is expected to raise KeyGenerationError.
with self.assertRaises(asyncssh.KeyGenerationError):
privca.generate_user_certificate(pubkey, 'name', version=0)
# An empty tuple as valid_after is expected to raise ValueError.
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after=())
# The string 'xxx' as valid_after is expected to raise ValueError.
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after='xxx')
# valid_after='now' with valid_before='-1m' is expected to raise
# ValueError -- presumably because the window ends before it starts.
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after='now',
valid_before='-1m')
async def asyncSetUpClass(cls):
"""Set up keys, an SSH server, and an SSH agent for the tests to use"""
# pylint: disable=too-many-statements
ckey = asyncssh.generate_private_key('ssh-rsa')
ckey.write_private_key('ckey')
ckey.write_private_key('ckey_encrypted', passphrase='passphrase')
ckey.write_public_key('ckey.pub')
ckey_ecdsa = asyncssh.generate_private_key('ecdsa-sha2-nistp256')
ckey_ecdsa.write_private_key('ckey_ecdsa')
ckey_ecdsa.write_public_key('ckey_ecdsa.pub')
ckey_cert = ckey.generate_user_certificate(ckey, 'name',
principals=['ckey'])
ckey_cert.write_certificate('ckey-cert.pub')
skey = asyncssh.generate_private_key('ssh-rsa')
skey.write_private_key('skey')
skey.write_public_key('skey.pub')
skey_ecdsa = asyncssh.generate_private_key('ecdsa-sha2-nistp256')
skey_ecdsa.write_private_key('skey_ecdsa')
skey_ecdsa.write_public_key('skey_ecdsa.pub')
skey_cert = skey.generate_host_certificate(skey, 'name',
def test_generate_errors(self):
"""Test errors in private key and certificate generation"""
for alg_name, kwargs in (('xxx', {}),
('ssh-dss', {'xxx': 0}),
('ssh-rsa', {'xxx': 0}),
('ecdsa-sha2-nistp256', {'xxx': 0}),
('ssh-ed25519', {'xxx': 0}),
('ssh-ed448', {'xxx': 0})):
with self.subTest(alg_name=alg_name, **kwargs):
with self.assertRaises(asyncssh.KeyGenerationError):
asyncssh.generate_private_key(alg_name, **kwargs)
privkey = asyncssh.generate_private_key('ssh-rsa')
pubkey = privkey.convert_to_public()
privca = asyncssh.generate_private_key('ssh-rsa')
with self.assertRaises(asyncssh.KeyGenerationError):
privca.generate_user_certificate(pubkey, 'name', version=0)
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after=())
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after='xxx')
with self.assertRaises(ValueError):
privca.generate_user_certificate(pubkey, 'name', valid_after='now',
valid_before='-1m')
with self.assertRaises(ValueError):
privca.generate_x509_user_certificate(pubkey, 'OU=user',