Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def start_management_console(local_vars: Dict = locals(),
                                   host: str = "localhost",
                                   port: int = 8211,
                                   banner: str = "hummingbot") -> asyncio.base_events.Server:
    """Start an aioconsole interactive server and return it.

    Args:
        local_vars: Namespace handed to every console created for a
            connection. NOTE: the default ``locals()`` is evaluated once,
            when this function is defined, so it refers to the namespace of
            the defining scope rather than the caller's.
        host: Interface the server is started on.
        port: TCP port the server is started on.
        banner: Banner passed through to aioconsole.

    Returns:
        Whatever ``aioconsole.start_interactive_server`` resolves to.
    """
    def build_console(*args, **kwargs):
        # Imported lazily, i.e. only when a console is actually built.
        from aioconsole.code import AsynchronousConsole
        return AsynchronousConsole(*args, locals=local_vars, **kwargs)

    server = await aioconsole.start_interactive_server(
        host=host,
        port=port,
        banner=banner,
        factory=build_console,
    )
    logger = logging.getLogger(__name__)
    logger.info(f"Started debug console at {host}:{port}.")
    return server
def init_console_server(host: str,
                        port: int,
                        locals: OptLocals,
                        loop: Loop) -> 'Future[Server]':
    """Submit an aioconsole interactive server to ``loop``.

    The server coroutine is handed to ``asyncio.run_coroutine_threadsafe``,
    so this function returns immediately with the resulting future instead
    of waiting for the server to come up.

    Args:
        host: Interface the server is started on.
        port: TCP port the server is started on.
        locals: Namespace passed to each console that is created.
        loop: Event loop that runs the server and its consoles.

    Returns:
        The future produced by ``asyncio.run_coroutine_threadsafe``.
    """
    def make_console(streams: Any = None) -> aioconsole.AsynchronousConsole:
        # One console per call, all sharing the same namespace and loop.
        return aioconsole.AsynchronousConsole(
            locals=locals,
            streams=streams,
            loop=loop,
        )

    server_coro = aioconsole.start_interactive_server(
        host=host,
        port=port,
        factory=make_console,
        loop=loop,
    )
    return asyncio.run_coroutine_threadsafe(server_coro, loop=loop)
def main(args=None):
    """Parse the arguments, set up the command line interface, run echo.

    When ``parse_args`` yields a truthy ``serve_cli`` (unpacked below as a
    host/port pair), the interface is started as an interactive server and
    reported through ``print_server``. Otherwise a single interface is
    scheduled to interact directly. In both cases the function finishes by
    returning the result of ``echo.run(host, port)``.
    """
    host, port, serve_cli = parse_args(args)
    if not serve_cli:
        # No address to serve on: schedule one local interface instead.
        asyncio.ensure_future(make_cli().interact())
    else:
        cli_host, cli_port = serve_cli
        server_coro = start_interactive_server(make_cli, cli_host, cli_port)
        event_loop = asyncio.get_event_loop()
        server = event_loop.run_until_complete(server_coro)
        print_server(server, 'command line interface')
    return echo.run(host, port)