Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_server(event_loop):
server = yield from start_console_server(
host="127.0.0.1", port=0, banner='test')
address = server.sockets[0].getsockname()
stream = io.StringIO()
print_server(server, "test console", file=stream)
expected = "The test console is being served on 127.0.0.1:{}\n"
assert stream.getvalue() == expected.format(address[1])
reader, writer = yield from asyncio.open_connection(*address)
assert (yield from reader.readline()) == b'test\n'
writer.write(b'1+1\n')
assert (yield from reader.readline()) == b'>>> 2\n'
writer.write_eof()
assert (yield from reader.readline()) == b'>>> \n'
writer.close()
if compat.PY37:
yield from writer.wait_closed()
server.close()
yield from server.wait_closed()
def test_uds_server(event_loop, tmpdir):
path = str(tmpdir / "test.uds")
server = yield from start_console_server(path=path, banner='test')
stream = io.StringIO()
print_server(server, "test console", file=stream)
expected = "The test console is being served on {}\n"
assert stream.getvalue() == expected.format(path)
address = server.sockets[0].getsockname()
reader, writer = yield from asyncio.open_unix_connection(address)
assert (yield from reader.readline()) == b'test\n'
writer.write(b'1+1\n')
assert (yield from reader.readline()) == b'>>> 2\n'
writer.write_eof()
assert (yield from reader.readline()) == b'>>> \n'
writer.close()
if compat.PY37:
yield from writer.wait_closed()
server.close()
yield from server.wait_closed()
if serve is None:
self.console = self.factory(None)
coro = self.console.interact(
banner, stop=True, handle_sigint=True)
self.console_task = asyncio.ensure_future(coro, loop=self)
# Serving console
else:
host, port = serve
coro = server.start_interactive_server(
self.factory,
host=host,
port=port,
banner=banner,
loop=self)
self.console_server = self.run_until_complete(coro)
server.print_server(self.console_server)
def main(args=None):
host, port, serve_cli = parse_args(args)
if serve_cli:
cli_host, cli_port = serve_cli
coro = start_interactive_server(make_cli, cli_host, cli_port)
server = asyncio.get_event_loop().run_until_complete(coro)
print_server(server, 'command line interface')
else:
asyncio.ensure_future(make_cli().interact())
return echo.run(host, port)