Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ec_explicit(self):
    """Test importing EC keys generated with explicit curve parameters"""

    if _openssl_available: # pragma: no branch
        for curve_name in ('secp256r1', 'secp384r1', 'secp521r1'):
            with self.subTest('Import EC key with explicit parameters',
                              curve=curve_name):
                # Have OpenSSL write a fresh key to 'priv' using
                # '-param_enc explicit', then make sure it can be read back
                genkey_cmd = ('openssl ecparam -out priv -noout -genkey -name %s '
                              '-param_enc explicit' % curve_name)
                run(genkey_cmd)

                asyncssh.read_private_key('priv')
async def test_disallowed_address(self):
    """Test that a certificate with a disallowed source address is refused"""

    ckey_priv = asyncssh.read_private_key('ckey')
    skey_priv = asyncssh.read_private_key('skey')

    # The only address the certificate permits is 0.0.0.0
    cert_options = {'source-address': String('0.0.0.0')}
    restricted_cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
                                       CERT_TYPE_USER, skey_priv, ckey_priv,
                                       ['ckey'], options=cert_options)

    with self.assertRaises(asyncssh.PermissionDenied):
        await self.connect(username='ckey',
                           client_keys=[(skey_priv, restricted_cert)])
def check_private(self, format_name, passphrase=None):
    """Check that the key stored in 'new' matches the expected private key

       The key is read from the file 'new' (decrypting it with passphrase
       when one is given) and compared, along with its hash, against
       self.privkey. Key pairs are then loaded from the key and from the
       resulting key pair, and their properties are checked against the
       key which was read.

       The format_name argument is accepted but not used here.

    """

    loaded_key = asyncssh.read_private_key('new', passphrase)

    expected_alg = loaded_key.get_algorithm()

    # The exported data isn't inspected; the export only has to succeed
    loaded_key.export_private_key()

    expected_pub = loaded_key.public_data

    self.assertEqual(loaded_key, self.privkey)
    self.assertEqual(hash(loaded_key), hash(self.privkey))

    # Load a key pair directly from the key object
    pair = asyncssh.load_keypairs(loaded_key, passphrase)[0]

    self.assertEqual(pair.get_key_type(), 'local')
    self.assertEqual(pair.get_algorithm(), expected_alg)
    self.assertEqual(pair.public_data, expected_pub)
    self.assertIsNotNone(pair.get_agent_private_key())

    # Loading from a list holding an existing key pair should also work
    reloaded_pair = asyncssh.load_keypairs([pair])[0]

    self.assertEqual(reloaded_pair.public_data, expected_pub)
"""Check export of a PKCS#8 private key"""
format_name = 'pkcs8-%s' % fmt
self.privkey.write_private_key('privout', format_name,
select_passphrase(cipher, pbe_version),
cipher, hash_alg, pbe_version)
if self.use_openssl and openssl_ok: # pragma: no branch
if cipher:
run('openssl pkcs8 -in privout -inform %s -out new '
'-outform pem -passin pass:passphrase' % fmt)
else:
run('openssl pkcs8 -nocrypt -in privout -inform %s -out new '
'-outform pem' % fmt)
else: # pragma: no cover
priv = asyncssh.read_private_key('privout',
select_passphrase(cipher,
pbe_version))
priv.write_private_key('new', format_name)
self.check_private(format_name)
async def test_allowed_address(self):
    """Test that a certificate with an allowed source address is accepted"""

    ckey_priv = asyncssh.read_private_key('ckey')
    skey_priv = asyncssh.read_private_key('skey')

    # The source-address option lists both 0.0.0.0/0 and ::/0
    cert_options = {'source-address': String('0.0.0.0/0,::/0')}
    open_cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
                                 CERT_TYPE_USER, skey_priv, ckey_priv,
                                 ['ckey'], options=cert_options)

    credentials = [(skey_priv, open_cert)]

    # Opening and closing the connection without an error is the test
    async with self.connect(username='ckey', client_keys=credentials):
        pass
async def test_forwarding_not_allowed(self):
    """Test an X11 request made with a certificate which forbids X11"""

    key = asyncssh.read_private_key('ckey')

    # Self-signed user certificate with X11 forwarding switched off
    no_x11_cert = key.generate_user_certificate(
        key, 'name', principals=['ckey'], permit_x11_forwarding=False)

    credentials = [(key, no_x11_cert)]

    async with self.connect(username='ckey', client_keys=credentials) as conn:
        with self.assertRaises(asyncssh.ChannelOpenError):
            await _create_x11_process(conn, 'connect l')
async def test_cert_principals(self):
    """Test connecting with a certificate listing 'ckey' as a principal"""

    key = asyncssh.read_private_key('ckey')

    # The same key is passed for both key arguments of make_certificate
    principal_cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
                                      CERT_TYPE_USER, key, key, ['ckey'])

    credentials = [(key, principal_cert)]

    # Opening and closing the connection without an error is the test
    async with self.connect(username='ckey', client_keys=credentials):
        pass
async def setup(spawner):
    """Block the spawn when the user's home directory is over quota.

    Connects over SSH to the remote host as the spawner's user,
    authenticating with the key and certificate found under ``/certs``,
    and runs ``myquota -c $HOME`` there. A non-zero exit status is treated
    as "over quota" and reported as an HTTP 507 error carrying an HTML
    message for the user.

    The check is best-effort: if the SSH connection or the command fails,
    a warning (with the traceback) is logged and the user is let through.
    Cancellation and interpreter-exit signals are no longer swallowed by
    that fallback; they propagate to the caller.

    Args:
        spawner: object providing ``user.name`` and a ``log`` logger.

    Raises:
        web.HTTPError: status 507 when the quota command exits non-zero.
    """
    username = spawner.user.name
    remote_host = "corijupyter.nersc.gov"
    keyfile = "/certs/{username}.key".format(username=username) # NEED to have in NERSCSpawner now
    certfile = keyfile + "-cert.pub"

    # Reading the credentials is deliberately outside the try block, as in
    # the original: a missing key or certificate is an error, not a skip.
    k = asyncssh.read_private_key(keyfile)
    c = asyncssh.read_certificate(certfile)

    try:
        # NOTE(review): known_hosts=None turns off server host key
        # validation in asyncssh -- confirm this is acceptable here.
        async with asyncssh.connect(remote_host, username=username,
                client_keys=[(k,c)], known_hosts=None) as conn:
            result = await conn.run("myquota -c $HOME")
            retcode = result.exit_status
    except Exception:
        # Catch Exception rather than using a bare except so that
        # asyncio.CancelledError, KeyboardInterrupt and SystemExit are not
        # mistaken for a connection problem. exc_info keeps the cause.
        spawner.log.warning(f"Problem connecting to {remote_host} to check quota oh well",
                            exc_info=True)
        retcode = 0

    if retcode:
        # NOTE(review): Jinja2 3.1 removed Markup from the jinja2 namespace
        # in favour of markupsafe.Markup -- confirm the installed version.
        from jinja2 import Markup
        e = web.HTTPError(507, reason="Insufficient Storage")
        e.jupyterhub_message = Markup("<br>Your home directory is over quota! " +
                "Try moving or archiving and deleting some files from there, " +
                "then come back and try again.<br>")
        raise e
def client_keys(self):
    """Load the configured private key, paired with its certificate if any.

    The private key is always read from ``self.private_key_path``.

    Returns:
        A one-element list holding a ``(private_key, certificate)`` tuple
        when ``self.certificate_path`` is set; otherwise the private key
        object on its own (not wrapped in a list).
    """
    key = asyncssh.read_private_key(self.private_key_path)

    # Without a certificate the key is handed back unwrapped
    if not self.certificate_path:
        return key

    cert = asyncssh.read_certificate(self.certificate_path)
    return [(key, cert)]