Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
host="127.0.0.1", port=0, banner='test')
address = server.sockets[0].getsockname()
stream = io.StringIO()
print_server(server, "test console", file=stream)
expected = "The test console is being served on 127.0.0.1:{}\n"
assert stream.getvalue() == expected.format(address[1])
reader, writer = yield from asyncio.open_connection(*address)
assert (yield from reader.readline()) == b'test\n'
writer.write(b'1+1\n')
assert (yield from reader.readline()) == b'>>> 2\n'
writer.write_eof()
assert (yield from reader.readline()) == b'>>> \n'
writer.close()
if compat.PY37:
yield from writer.wait_closed()
server.close()
yield from server.wait_closed()
def test_apython_with_prompt_control_and_ainput(capfd):
input_string = "{} ainput()\nhello\n".format(
'await' if compat.PY35 else 'yield from')
with patch('sys.stdin', new=io.StringIO(input_string)):
with pytest.raises(SystemExit):
apython.run_apython(
['--no-readline', '--banner=test', '--prompt-control=â–²'])
out, err = capfd.readouterr()
assert out == ''
assert err == "test\n▲>>> ▲▲▲'hello'\n▲>>> ▲\n"
def compile_for_aexec(source, filename="", mode="single",
dont_imply_dedent=False, local={}):
"""Return a list of (coroutine object, abstract base tree)."""
flags = ast.PyCF_ONLY_AST
if dont_imply_dedent:
flags |= codeop.PyCF_DONT_IMPLY_DEDENT
if compat.PY35:
# Avoid a syntax error by wrapping code with `async def`
indented = '\n'.join(line and ' ' * 4 + line
for line in source.split('\n'))
coroutine = CORO_DEF + '\n' + indented + '\n'
interactive = compile(coroutine, filename, mode, flags).body[0]
# Check EOF errors
try:
compile(source, filename, mode, flags)
except SyntaxError as exc:
if exc.msg == 'unexpected EOF while parsing':
raise
else:
interactive = compile(source, filename, mode, flags)
return [make_tree(statement, filename, mode) for
statement in interactive.body]
import traceback
from . import stream
from . import compat
from . import execute
EXTRA_MESSAGE = """\
---
This console is running in an asyncio event loop.
It allows you to wait for coroutines using the '{0}' syntax.
Try: {0} asyncio.sleep(1, result=3)
---""".format('await' if compat.PY35 else 'yield from')
current_task = (
asyncio.current_task if compat.PY37 else asyncio.Task.current_task)
class AsynchronousCompiler(codeop.CommandCompiler):
def __init__(self):
self.compiler = functools.partial(
execute.compile_for_aexec,
dont_imply_dedent=True)
class AsynchronousConsole(code.InteractiveConsole):
def __init__(self, streams=None, locals=None, filename="",
prompt_control=None, *, loop=None):
super().__init__(locals, filename)
# Process arguments
"""Provide a readline wrapper to control a subprocess."""
import sys
import ctypes
import signal
import builtins
import subprocess
from concurrent.futures import ThreadPoolExecutor
from . import compat
if compat.platform == 'darwin':
import fcntl
def rlwrap_process(args, prompt_control, use_stderr=False):
assert len(prompt_control) == 1
# Start process
process = subprocess.Popen(
args,
bufsize=0,
universal_newlines=True,
stdin=subprocess.PIPE,
**{'stderr' if use_stderr else 'stdout': subprocess.PIPE})
# Readline wrapping
return _rlwrap(process, prompt_control, use_stderr)