Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_dh_errors(self):
    """Unit test error conditions in DH key exchange

       A stubbed client/server connection pair is created for the
       diffie-hellman-group14-sha1 algorithm and each side is fed a
       packet or value it is expected to reject. Every sub-test
       asserts the specific asyncssh exception raised.

    """

    client_conn, server_conn = \
        _KexClientStub.make_pair(b'diffie-hellman-group14-sha1')

    host_key = server_conn.get_server_host_key()

    with self.subTest('Init sent to client'):
        with self.assertRaises(asyncssh.ProtocolError):
            client_conn.process_packet(Byte(MSG_KEXDH_INIT))

    with self.subTest('Reply sent to server'):
        with self.assertRaises(asyncssh.ProtocolError):
            server_conn.process_packet(Byte(MSG_KEXDH_REPLY))

    with self.subTest('Invalid e value'):
        with self.assertRaises(asyncssh.ProtocolError):
            server_conn.simulate_dh_init(0)

    with self.subTest('Invalid f value'):
        with self.assertRaises(asyncssh.ProtocolError):
            # The key exchange is started before the bad reply is injected
            client_conn.start()
            client_conn.simulate_dh_reply(host_key.public_data, 0, b'')

    with self.subTest('Invalid signature'):
        with self.assertRaises(asyncssh.KeyExchangeFailed):
            # Same reply as above but with f=1, so the empty signature
            # is the value expected to be rejected here
            client_conn.start()
            client_conn.simulate_dh_reply(host_key.public_data, 1, b'')

    client_conn.close()
def simulate_dh_gex_init(self, e):
    """Feed this connection a DH GEX init packet carrying the value e"""

    # Message type byte followed by e encoded as a multi-precision integer
    packet = Byte(MSG_KEX_DH_GEX_INIT) + MPInt(e)
    self.process_packet(packet)
async def test_dh_errors(self):
    """Check that invalid DH key exchange traffic raises the right errors"""

    client_conn, server_conn = _KexClientStub.make_pair(
        b'diffie-hellman-group14-sha1')
    host_key = server_conn.get_server_host_key()

    # Each side is handed the message type meant for its peer
    with self.subTest('Init sent to client'), \
            self.assertRaises(asyncssh.ProtocolError):
        client_conn.process_packet(Byte(MSG_KEXDH_INIT))

    with self.subTest('Reply sent to server'), \
            self.assertRaises(asyncssh.ProtocolError):
        server_conn.process_packet(Byte(MSG_KEXDH_REPLY))

    # Zero is passed as the public value in each direction
    with self.subTest('Invalid e value'), \
            self.assertRaises(asyncssh.ProtocolError):
        server_conn.simulate_dh_init(0)

    with self.subTest('Invalid f value'), \
            self.assertRaises(asyncssh.ProtocolError):
        client_conn.start()
        client_conn.simulate_dh_reply(host_key.public_data, 0, b'')

    # f=1 with an empty signature is expected to fail the exchange
    with self.subTest('Invalid signature'), \
            self.assertRaises(asyncssh.KeyExchangeFailed):
        client_conn.start()
        client_conn.simulate_dh_reply(host_key.public_data, 1, b'')

    # NOTE(review): only client_conn is closed in the visible code --
    # confirm whether server_conn needs cleanup as well
    client_conn.close()
def simulate_ecdh_reply(self, host_key_data, server_pub, sig):
    """Simulate receiving an ECDH reply packet"""

    # Message type byte, then host key, server public value and
    # signature, each encoded with String()
    fields = (
        Byte(MSG_KEX_ECDH_REPLY),
        String(host_key_data),
        String(server_pub),
        String(sig),
    )
    self.process_packet(b''.join(fields))
def simulate_gss_complete(self, f, sig):
    """Feed this connection a GSS complete packet built from f and sig"""

    # The trailing boolean field is always sent as False here
    fields = (
        Byte(MSG_KEXGSS_COMPLETE),
        MPInt(f),
        String(sig),
        Boolean(False),
    )
    self.process_packet(b''.join(fields))
def _send_request(self, request, *args, want_reply=False):
    """Send a channel request, altering selected requests first

       An 'env' request whose second argument equals String('invalid')
       has that argument replaced by a String holding the single byte
       0xff. A 'pty-req' request whose sixth argument carries a
       PTY_OP_PARTIAL or PTY_OP_NO_END marker has that argument
       rebuilt from a shortened slice of itself. All other requests
       are forwarded to the parent class unchanged.

    """

    if request == b'env':
        if args[1] == String('invalid'):
            args = args[:1] + (String(b'\xff'),)
    elif request == b'pty-req':
        modes = args[5]
        marker = modes[-6:-5]

        # presumably the 4 leading bytes skipped below are the length
        # prefix of the encoded modes string -- TODO confirm
        if marker == Byte(PTY_OP_PARTIAL):
            args = args[:5] + (String(modes[4:-5]),)
        elif marker == Byte(PTY_OP_NO_END):
            args = args[:5] + (String(modes[4:-6]),)

    super()._send_request(request, *args, want_reply=want_reply)
if exc.errno in (errno.ENOENT, errno.ENOTDIR):
code = FX_NO_SUCH_FILE
elif exc.errno == errno.EACCES:
code = FX_PERMISSION_DENIED
else:
code = FX_FAILURE
result = (UInt32(code) + String(exc.strerror or str(exc)) +
String(DEFAULT_LANG))
except Exception as exc: # pragma: no cover
return_type = FXP_STATUS
result = (UInt32(FX_FAILURE) +
String('Uncaught exception: %s' % str(exc)) +
String(DEFAULT_LANG))
self.send_packet(Byte(return_type), UInt32(pktid), result)
def _send_request(self, pkttype, *args, waiter=None):
    """Allocate a packet ID and send an SFTP request

       Raises SFTPError with FX_NO_CONNECTION when no writer is set.
       The waiter is recorded under the new packet ID before the
       packet is sent. A bytes pkttype is sent as an FXP_EXTENDED
       request with pkttype as its name.

    """

    if not self._writer:
        raise SFTPError(FX_NO_CONNECTION, 'Connection not open')

    pktid = self._next_pktid

    # Packet IDs wrap around at 32 bits
    self._next_pktid = (pktid + 1) & 0xffffffff
    self._requests[pktid] = waiter

    hdr = (Byte(FXP_EXTENDED) + UInt32(pktid) + String(pkttype)
           if isinstance(pkttype, bytes)
           else Byte(pkttype) + UInt32(pktid))

    self.send_packet(hdr, *args)
def _send_request(self, pkttype, *args, waiter=None):
    """Send an SFTP request under a freshly assigned packet ID

       The request fails with SFTPError(FX_NO_CONNECTION) if there is
       no writer. Otherwise the waiter is stored in the pending
       request table, keyed by packet ID, and the packet is sent.

    """

    if not self._writer:
        raise SFTPError(FX_NO_CONNECTION, 'Connection not open')

    pktid = self._next_pktid
    self._next_pktid = (self._next_pktid + 1) & 0xffffffff
    self._requests[pktid] = waiter

    # A bytes packet type names an extended request: it goes out as
    # FXP_EXTENDED with the name appended after the packet ID
    extended = isinstance(pkttype, bytes)

    hdr = Byte(FXP_EXTENDED if extended else pkttype) + UInt32(pktid)

    if extended:
        hdr += String(pkttype)

    self.send_packet(hdr, *args)
async def sign(self, data):
"""Use ssh-keysign to sign a block of data with this key"""
proc = await asyncio.create_subprocess_exec(
self._keysign_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, pass_fds=[self._sock_fd])
request = String(Byte(KEYSIGN_VERSION) + UInt32(self._sock_fd) +
String(data))
stdout, stderr = await proc.communicate(request)
if stderr:
error = stderr.decode().strip()
raise ValueError(error)
try:
packet = SSHPacket(stdout)
resp = packet.get_string()
packet.check_end()
packet = SSHPacket(resp)
version = packet.get_byte()
sig = packet.get_string()
packet.check_end()