Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
await chan.accept(SSHUNIXStreamSession, b'\xff')
except asyncssh.ChannelOpenError:
stdout.channel.exit(1)
elif action == 'late_auth_banner':
try:
self._conn.send_auth_banner('auth banner')
except OSError:
stdin.channel.exit(1)
elif action == 'invalid_open_confirm':
stdin.channel.send_packet(MSG_CHANNEL_OPEN_CONFIRMATION,
UInt32(0), UInt32(0), UInt32(0))
elif action == 'invalid_open_failure':
stdin.channel.send_packet(MSG_CHANNEL_OPEN_FAILURE,
UInt32(0), String(''), String(''))
elif action == 'env':
value = stdin.channel.get_environment().get('TEST', '')
stdout.write(value + '\n')
elif action == 'term':
chan = stdin.channel
info = str((chan.get_terminal_type(), chan.get_terminal_size(),
chan.get_terminal_mode(asyncssh.PTY_OP_OSPEED)))
stdout.write(info + '\n')
elif action == 'xon_xoff':
stdin.channel.set_xon_xoff(True)
elif action == 'no_xon_xoff':
stdin.channel.set_xon_xoff(False)
elif action == 'signals':
try:
await stdin.readline()
except asyncssh.BreakReceived as exc:
self.assertTrue(self.pubkey.verify(data, sig))
badsig = bytearray(sig)
badsig[-1] ^= 0xff
badsig = bytes(badsig)
with self.subTest('Bad signature'):
self.assertFalse(self.pubkey.verify(data, badsig))
with self.subTest('Missing signature'):
self.assertFalse(self.pubkey.verify(
data, String(self.pubkey.algorithm)))
with self.subTest('Empty signature'):
self.assertFalse(self.pubkey.verify(
data, String(self.pubkey.algorithm) + String(b'')))
with self.subTest('Sign with bad algorithm'):
with self.assertRaises(ValueError):
self.privkey.sign(data, 'xxx')
with self.subTest('Verify with bad algorithm'):
self.assertFalse(self.pubkey.verify(
data, String('xxx') + String('')))
with self.subTest('Sign with public key'):
with self.assertRaises(ValueError):
self.pubkey.sign(data, self.pubkey.algorithm)
with self.subTest('Certificate principal mismatch'):
cert = self.privca.generate_x509_user_certificate(
self.pubkey, 'OU=user', 'OU=root', principals=['name'])
with self.assertRaises(ValueError):
self.validate_x509(cert, 'name2')
for fmt in ('rfc4716', 'xxx'):
with self.subTest('Invalid certificate export format', fmt=fmt):
with self.assertRaises(asyncssh.KeyExportError):
self.userx509.export_certificate(fmt)
with self.subTest('Empty certificate chain'):
with self.assertRaises(asyncssh.KeyImportError):
decode_ssh_certificate(String('x509v3-ssh-rsa') +
UInt32(0) + UInt32(0))
valid_before=0xffffffffffffffff, options=None,
extensions=None, bad_signature=False):
"""Construct an SSH certificate"""
keydata = key.encode_ssh_public()
principals = b''.join((String(p) for p in principals))
options = _encode_options(options) if options else b''
extensions = _encode_options(extensions) if extensions else b''
signing_keydata = b''.join((String(signing_key.algorithm),
signing_key.encode_ssh_public()))
data = b''.join((String(cert_version), String(os.urandom(32)), keydata,
UInt64(0), UInt32(cert_type), String(key_id),
String(principals), UInt64(valid_after),
UInt64(valid_before), String(options),
String(extensions), String(''), String(signing_keydata)))
if bad_signature:
data += String('')
else:
data += String(signing_key.sign(data, signing_key.algorithm))
return b''.join((cert_version.encode('ascii'), b' ',
binascii.b2a_base64(data)))
async def test_forced_exec(self):
    """Check a session opened with a certificate that forces a command"""

    user_key = asyncssh.read_private_key('ckey')

    # The certificate carries a 'force-command' option set to 'echo'.
    # NOTE(review): the same key is passed twice to make_certificate --
    # presumably once as the certified key and once as the signing key;
    # confirm against make_certificate's signature.
    forced_options = {'force-command': String('echo')}
    user_cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
                                 CERT_TYPE_USER, user_key, user_key,
                                 ['ckey'], options=forced_options)

    credentials = [(user_key, user_cert)]

    async with self.connect(username='ckey',
                            client_keys=credentials) as conn:
        await self._check_session(conn)
with self.assertRaises(asyncssh.KeyImportError):
asyncssh.import_certificate('\u0080\n')
with self.subTest('Invalid SSH format'):
with self.assertRaises(asyncssh.KeyImportError):
asyncssh.import_certificate('xxx\n')
with self.subTest('Invalid certificate packetization'):
with self.assertRaises(asyncssh.KeyImportError):
asyncssh.import_certificate(
b'xxx ' + binascii.b2a_base64(b'\x00'))
with self.subTest('Invalid certificate algorithm'):
with self.assertRaises(asyncssh.KeyImportError):
asyncssh.import_certificate(
b'xxx ' + binascii.b2a_base64(String(b'xxx')))
with self.subTest('Invalid certificate critical option'):
with self.assertRaises(asyncssh.KeyImportError):
cert = self.make_certificate(cert_type, self.pubkey,
self.privca, ('name',),
options={b'xxx': b''})
asyncssh.import_certificate(cert)
with self.subTest('Ignored certificate extension'):
cert = self.make_certificate(cert_type, self.pubkey,
self.privca, ('name',),
extensions={b'xxx': b''})
self.assertIsNotNone(asyncssh.import_certificate(cert))
with self.subTest('Invalid certificate signature'):
with self.assertRaises(asyncssh.KeyImportError):
async def _malformed_ok_response(self, pkttype, pktid, packet):
    """Reply with an FX_OK status whose text is not valid Unicode"""

    # pylint: disable=unused-argument

    # The byte 0xff can never appear in well-formed UTF-8, so the first
    # string of the status payload is deliberately undecodable. It is
    # followed by an empty string.
    status_fields = (UInt32(pktid), UInt32(FX_OK),
                     String(b'\xff'), String(''))

    self.send_packet(FXP_STATUS, pktid, *status_fields)
alg = b'none'
kdf = b'none'
kdf_data = b''
block_size = 8
mac = b''
pad = len(data) % block_size
if pad: # pragma: no branch
data = data + bytes(range(1, block_size + 1 - pad))
if cipher:
data, mac = cipher.encrypt_packet(0, b'', data)
else:
mac = b''
data = b''.join((_OPENSSH_KEY_V1, String(alg), String(kdf),
String(kdf_data), UInt32(nkeys),
String(self.public_data), String(data), mac))
return (b'-----BEGIN OPENSSH PRIVATE KEY-----\n' +
_wrap_base64(data, _OPENSSH_WRAP_LEN) +
b'-----END OPENSSH PRIVATE KEY-----\n')
else:
raise KeyExportError('Unknown export format')
def _send_request(self, request, *args, want_reply=False):
    """Send a channel request packet

       The packet is a MSG_CHANNEL_REQUEST made up of the request
       name encoded as a string, the want_reply flag encoded as a
       boolean, and then any extra arguments passed through as-is.

    """

    header = (String(request), Boolean(want_reply))

    self.send_packet(MSG_CHANNEL_REQUEST, *header, *args)
def _encode(cls, key, serial, cert_type, key_id, principals,
            valid_after, valid_before, options, extensions):
    """Build the encoded body of a version 01 SSH certificate

       The result is the concatenation, in order, of: a string
       holding 32 fresh random bytes, the key's SSH public encoding,
       the serial number, the certificate type, the key ID, the
       principals, the validity window (after, before), the options,
       the extensions, and a final empty string.

    """

    # Fields are listed in the exact order they appear in the output
    fields = [
        String(os.urandom(32)),
        key.encode_ssh_public(),
        UInt64(serial),
        UInt32(cert_type),
        String(key_id),
        String(principals),
        UInt64(valid_after),
        UInt64(valid_before),
        String(options),
        String(extensions),
        String(''),
    ]

    return b''.join(fields)