Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
b'-----BEGIN OPENSSH PRIVATE KEY-----\n' +
binascii.b2a_base64(b''.join(
(b'openssh-key-v1\0', String(''), String(''), String(''),
UInt32(2), String(''), String('')))) +
b'-----END OPENSSH PRIVATE KEY-----'),
('Missing OpenSSH passphrase',
b'-----BEGIN OPENSSH PRIVATE KEY-----\n' +
binascii.b2a_base64(b''.join(
(b'openssh-key-v1\0', String('xxx'), String(''), String(''),
UInt32(1), String(''), String('')))) +
b'-----END OPENSSH PRIVATE KEY-----'),
('Mismatched OpenSSH check bytes',
b'-----BEGIN OPENSSH PRIVATE KEY-----\n' +
binascii.b2a_base64(b''.join(
(b'openssh-key-v1\0', String('none'), String(''), String(''),
UInt32(1), String(''), String(b''.join((UInt32(1),
UInt32(2))))))) +
b'-----END OPENSSH PRIVATE KEY-----'),
('Invalid OpenSSH algorithm',
b'-----BEGIN OPENSSH PRIVATE KEY-----\n' +
binascii.b2a_base64(b''.join(
(b'openssh-key-v1\0', String('none'), String(''), String(''),
UInt32(1), String(''), String(b''.join((UInt32(1), UInt32(1),
String('xxx'))))))) +
b'-----END OPENSSH PRIVATE KEY-----'),
('Invalid OpenSSH pad',
b'-----BEGIN OPENSSH PRIVATE KEY-----\n' +
binascii.b2a_base64(b''.join(
(b'openssh-key-v1\0', String('none'), String(''), String(''),
UInt32(1), String(''), String(b''.join((UInt32(1), UInt32(1),
String('ssh-dss'),
5*MPInt(0),
with self.subTest('Certificate principal mismatch'):
cert = self.privca.generate_x509_user_certificate(
self.pubkey, 'OU=user', 'OU=root', principals=['name'])
with self.assertRaises(ValueError):
self.validate_x509(cert, 'name2')
for fmt in ('rfc4716', 'xxx'):
with self.subTest('Invalid certificate export format', fmt=fmt):
with self.assertRaises(asyncssh.KeyExportError):
self.userx509.export_certificate(fmt)
with self.subTest('Empty certificate chain'):
with self.assertRaises(asyncssh.KeyImportError):
decode_ssh_certificate(String('x509v3-ssh-rsa') +
UInt32(0) + UInt32(0))
The height of the terminal in pixels
:type width: `int`
:type height: `int`
:type pixwidth: `int`
:type pixheight: `int`
"""
if pixwidth or pixheight:
self.logger.info('Sending window size change: %sx%s (%sx%s pixels)',
width, height, pixwidth, pixheight)
else:
self.logger.info('Sending window size change: %sx%s', width, height)
self._send_request(b'window-change', UInt32(width), UInt32(height),
UInt32(pixwidth), UInt32(pixheight))
def _process_token(self, token=None):
    """Run one step of the GSS exchange with the supplied token

       The token produced by the GSS step is saved in `self._token`.

       If the step raises `GSSError`, the error is reported to the
       peer before the key exchange is aborted: a server-side
       connection sends MSG_KEXGSS_ERROR carrying the major and minor
       codes, the error text and the default language tag, and any
       token attached to the error is sent in MSG_KEXGSS_CONTINUE.
       `KeyExchangeFailed` is then raised with the error text.

    """

    try:
        self._token = self._gss.step(token)
    except GSSError as gss_err:
        reason = str(gss_err)

        if self._conn.is_server():
            error_fields = (UInt32(gss_err.maj_code),
                            UInt32(gss_err.min_code),
                            String(reason), String(DEFAULT_LANG))

            self.send_packet(MSG_KEXGSS_ERROR, *error_fields)

        # NOTE(review): treated as independent of the server check
        # above -- confirm against the intended block nesting.
        error_token = gss_err.token

        if error_token:
            self.send_packet(MSG_KEXGSS_CONTINUE, String(error_token))

        raise KeyExchangeFailed(reason)
raise ValueError('If set, terminal size must be a tuple of '
'2 or 4 integers')
modes = b''
for mode, value in term_modes.items():
if mode <= PTY_OP_END or mode >= PTY_OP_RESERVED:
raise ValueError('Invalid pty mode: %s' % mode)
name = _pty_mode_names.get(mode, str(mode))
self.logger.debug2(' Mode %s: %s', name, value)
modes += Byte(mode) + UInt32(value)
modes += Byte(PTY_OP_END)
if not (await self._make_request(b'pty-req', String(term_type),
UInt32(width), UInt32(height),
UInt32(pixwidth),
UInt32(pixheight),
String(modes))):
self.close()
raise ChannelOpenError(OPEN_REQUEST_PTY_FAILED,
'PTY request failed')
if x11_forwarding:
self.logger.debug1(' X11 forwarding enabled')
try:
auth_proto, remote_auth, screen = \
await self._conn.attach_x11_listener(
self, x11_display, x11_auth_path, x11_single_connection)
except ValueError as exc:
raise ChannelOpenError(OPEN_REQUEST_X11_FORWARDING_FAILED,
def _process_token(self, token=None):
    """Run one step of the GSS exchange with the supplied token

       The token produced by the GSS step is saved in `self._token`.

       If the step raises `GSSError`, the error is reported to the
       peer before the key exchange is aborted: a server-side
       connection sends MSG_KEXGSS_ERROR carrying the major and minor
       codes, the error text and the default language tag, and any
       token attached to the error is sent in MSG_KEXGSS_CONTINUE.
       `KeyExchangeFailed` is then raised with the error text.

    """

    try:
        self._token = self._gss.step(token)
    except GSSError as gss_err:
        reason = str(gss_err)

        if self._conn.is_server():
            error_fields = (UInt32(gss_err.maj_code),
                            UInt32(gss_err.min_code),
                            String(reason), String(DEFAULT_LANG))

            self.send_packet(MSG_KEXGSS_ERROR, *error_fields)

        # NOTE(review): treated as independent of the server check
        # above -- confirm against the intended block nesting.
        error_token = gss_err.token

        if error_token:
            self.send_packet(MSG_KEXGSS_CONTINUE, String(error_token))

        raise KeyExchangeFailed(reason)
async def _open(self, chantype, *args):
    """Send a channel open request and wait for its outcome

       A new future is stored in `self._open_waiter` and a
       MSG_CHANNEL_OPEN packet is sent carrying the channel type, the
       local channel number, the initial receive window and the
       receive packet size, followed by any extra `args`. This channel
       is passed as the packet's handler. The result of awaiting the
       open waiter is returned.

       :raises: :exc:`OSError` if the send state is not `'closed'`

    """

    if self._send_state != 'closed':
        raise OSError('Channel already open')

    self._open_waiter = self._loop.create_future()

    self.logger.debug2('  Initial recv window %d, packet size %d',
                       self._recv_window, self._recv_pktsize)

    open_fields = (String(chantype), UInt32(self._recv_chan),
                   UInt32(self._recv_window), UInt32(self._recv_pktsize))

    self._conn.send_packet(MSG_CHANNEL_OPEN, *open_fields, *args,
                           handler=self)

    return await self._open_waiter
def _send_challenge(self, challenge):
    """Respond to the client based on a keyboard-interactive challenge

       A tuple or list challenge is unpacked as `(name, instruction,
       lang, prompts)`, where each prompt is a `(prompt, echo)` pair,
       and sent as a MSG_USERAUTH_INFO_REQUEST packet. Any other
       truthy value reports authentication success and any falsy
       value reports failure.

    """

    if isinstance(challenge, (tuple, list)):
        name, instruction, lang, prompt_pairs = challenge
        prompt_count = len(prompt_pairs)

        # Kept as a generator so the prompts are encoded only while
        # the packet arguments are being assembled.
        encoded_prompts = (String(text) + Boolean(echo)
                           for text, echo in prompt_pairs)

        self.send_packet(MSG_USERAUTH_INFO_REQUEST, String(name),
                         String(instruction), String(lang),
                         UInt32(prompt_count), *encoded_prompts)
        return

    respond = self.send_success if challenge else self.send_failure
    respond()