Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def start_ssh_server(q):
    """Start the local SSH/SFTP server and report the port it is bound to.

    Creates an asyncssh server on 127.0.0.1 using the module-level
    ``server_port`` with SFTP and SCP enabled. If the returned listener
    exposes at least one socket, the port of the first one is put on the
    module-level ``server_q``.

    Note: the ``q`` argument is not read here; the port is always published
    through ``server_q``.
    """
    global server_port, server_q

    host_key_path = os.path.join('tests', 'ssh_host_key')
    listener = await asyncssh.create_server(
        MySSHServer,
        '127.0.0.1',
        server_port,
        server_host_keys=[host_key_path],
        process_factory=handle_client,
        sftp_factory=True,
        allow_scp=True,
    )

    # Only the first listening socket is reported; nothing is queued when
    # the listener has no sockets.
    first_socket = next(iter(listener.sockets), None)
    if first_socket is not None:
        server_q.put(first_socket.getsockname()[1])
:return:
'''
await asyncssh.listen(host='',
port=sftp_port,
authorized_client_keys=authorized_client_keys,
server_host_keys=[server_host_keys],
sftp_factory=sftp_factory,
**kwargs)
# 'reuse_address' probably also has to be `True` in this case
if kwargs.get('reuse_port') and not ssh_port:
ssh_port = sftp_port
# Don't create the ssh server unless there is a port available for it
if ssh_port:
await asyncssh.create_server(
host='',
server_factory=server_factory,
port=ssh_port,
authorized_client_keys=authorized_client_keys,
server_host_keys=[server_host_keys],
)
async def test_no_server_host_keys(self):
    """Test that creating a server with an empty host key list fails"""

    # An empty key list together with GSS disabled must be rejected.
    server_options = {'server_host_keys': [], 'gss_host': None}

    with self.assertRaises(ValueError):
        await asyncssh.create_server(Server, **server_options)
Example that starts the REPL through an SSH server.
"""
loop = asyncio.get_event_loop()
# Namespace exposed in the REPL.
environ = {'hello': 'world'}
# Start SSH server.
def create_server():
    """Return a new MySSHServer built with a callable that yields ``environ``."""

    def current_environ():
        # Looked up at call time, so the server always sees the live mapping.
        return environ

    return MySSHServer(current_environ)
print('Listening on :%i' % port)
print('To connect, do "ssh localhost -p %i"' % port)
loop.run_until_complete(
asyncssh.create_server(create_server, '', port,
server_host_keys=['/etc/ssh/ssh_host_dsa_key']))
# Run eventloop.
loop.run_forever()
def main(port=8222):
    """Serve ``interact`` sessions over SSH and run the event loop forever.

    Sets the root logger to DEBUG, starts an asyncssh server on ``port``
    (host given as the empty string) whose server objects are
    ``PromptToolkitSSHServer(interact)``, then blocks in ``run_forever``.

    :param port: TCP port the SSH server listens on.
    """
    # Set up logging.
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    def session_server_factory():
        # A fresh server object is built for every incoming connection.
        return PromptToolkitSSHServer(interact)

    event_loop = asyncio.get_event_loop()
    server_startup = asyncssh.create_server(
        session_server_factory,
        "",
        port,
        server_host_keys=["/etc/ssh/ssh_host_ecdsa_key"],
    )
    event_loop.run_until_complete(server_startup)
    event_loop.run_forever()
async def interact():
nonlocal uis, permenantly_closed
if permenantly_closed:
return
ui = FullNodeUI(store, blockchain, server, port, ui_close_cb)
assert ui.app
uis.append(ui)
try:
await ui.app.run_async()
except ConnectionError:
log.info("Connection error in ssh UI, exiting.")
ui.close()
raise
ssh_server = await asyncssh.create_server(
lambda: PromptToolkitSSHServer(interact),
"",
ssh_port,
server_host_keys=[ssh_key_filename],
reuse_address=True,
)
return await_all_closed, ui_close_cb
async def start_server():
    """Start a MySSHServer listener on port 8022.

    The server presents the host key in ``ssh_host_key`` and is given
    ``ssh_user_ca`` as its authorized client keys.
    """
    server_options = {
        'server_host_keys': ['ssh_host_key'],
        'authorized_client_keys': 'ssh_user_ca',
    }
    await asyncssh.create_server(MySSHServer, '', 8022, **server_options)