Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Provide an interactive event loop class."""
import asyncio
import functools
from . import code
from . import compat
from . import server
class InteractiveEventLoop(asyncio.SelectorEventLoop):
"""Event loop running a python console."""
console_class = code.AsynchronousConsole
def __init__(self, *, selector=None, locals=None, banner=None, serve=None,
prompt_control=None):
self.console = None
self.console_task = None
self.console_server = None
super().__init__(selector=selector)
# Factory
self.factory = lambda streams: self.console_class(
streams, locals=locals, prompt_control=prompt_control, loop=self)
# Local console
if serve is None:
self.console = self.factory(None)
coro = self.console.interact(
banner, stop=True, handle_sigint=True)
self.console_task = asyncio.ensure_future(coro, loop=self)
"""Provide an asynchronous equivalent to the python console."""
import sys
import random
import asyncio
import argparse
import shlex
from . import code
class AsynchronousCli(code.AsynchronousConsole):
def __init__(self, commands, streams=None, *, prog=None,
prompt_control=None, loop=None):
super().__init__(
streams=streams, prompt_control=prompt_control, loop=loop)
self.prog = prog
self.commands = dict(commands)
self.commands['help'] = (
self.help_command,
argparse.ArgumentParser(
description='Display the help message.'))
self.commands['list'] = (
self.list_command,
argparse.ArgumentParser(
description='Display the command list.'))
self.commands['exit'] = (
factory = lambda streams: code.AsynchronousConsole(
streams, locals, filename, prompt_control=prompt_control)
server = yield from start_interactive_server(
def start_interactive_server(factory=code.AsynchronousConsole,
host='localhost', port=8000, banner=None,
*, loop=None):
callback = lambda reader, writer: handle_connect(
reader, writer, factory, banner)
server = yield from asyncio.start_server(callback, host, port, loop=loop)
return server