Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
writer2.write('b\n')
yield from writer2.drain()
data = yield from reader.readline()
assert data == b'b\n'
assert stderr.getvalue() == 'b\n'
writer2.stream = Mock(spec={})
yield from writer2.drain()
data = yield from reader.read(2)
assert data == b'c\n'
assert reader.at_eof() == False
if compat.PY35:
assert (yield from reader.__aiter__()) == reader
assert (yield from reader.__anext__()) == b'd\n'
with pytest.raises(StopAsyncIteration):
yield from reader.__anext__()
else:
assert (yield from reader.read()) == b'd\n'
assert (yield from reader.read()) == b''
assert reader.at_eof() == True
def test_interact_traceback(event_loop, monkeypatch):
with stdcontrol(event_loop, monkeypatch) as (reader, writer):
banner = "A BANNER"
writer.write('1/0\n')
writer.stream.close()
yield from interact(banner=banner, stop=False)
# Check stderr
yield from assert_stream(reader, banner)
yield from assert_stream(
reader, sys.ps1 + 'Traceback (most recent call last):')
# Skip 3 (or 5) lines
for _ in range(3 if compat.PY35 else 5):
yield from reader.readline()
# Check stderr
yield from assert_stream(reader, "ZeroDivisionError: division by zero")
yield from assert_stream(reader, sys.ps1)
def test_apython_with_ainput(capfd, use_readline):
input_string = "{} ainput()\nhello\n".format(
'await' if compat.PY35 else 'yield from')
with patch('sys.stdin', new=io.StringIO(input_string)):
with pytest.raises(SystemExit):
apython.run_apython(['--banner=test'] + use_readline)
out, err = capfd.readouterr()
assert out == ''
assert err == "test\n>>> 'hello'\n>>> \n"
"""Provide an asynchronous equivalent *to exec*."""
import ast
import codeop
import asyncio
from . import compat
CORO_NAME = '__corofn'
CORO_DEF = 'def {}(): '.format(CORO_NAME)
if compat.PY35:
CORO_DEF = 'async ' + CORO_DEF
CORO_CODE = CORO_DEF + 'return (None, locals())\n'
def make_arg(key, annotation=None):
"""Make an ast function argument."""
arg = ast.arg(key, annotation)
arg.lineno, arg.col_offset = 0, 0
return arg
def full_update(dct, values):
"""Fully update a dictionary."""
dct.clear()
dct.update(values)
import codeop
import signal
import asyncio
import functools
import traceback
from . import stream
from . import compat
from . import execute
EXTRA_MESSAGE = """\
---
This console is running in an asyncio event loop.
It allows you to wait for coroutines using the '{0}' syntax.
Try: {0} asyncio.sleep(1, result=3)
---""".format('await' if compat.PY35 else 'yield from')
current_task = (
asyncio.current_task if compat.PY37 else asyncio.Task.current_task)
class AsynchronousCompiler(codeop.CommandCompiler):
def __init__(self):
self.compiler = functools.partial(
execute.compile_for_aexec,
dont_imply_dedent=True)
class AsynchronousConsole(code.InteractiveConsole):
def readline(self):
data = yield from self.loop.run_in_executor(None, self.stream.readline)
if isinstance(data, str):
data = data.encode()
self.eof = not data
return data
@asyncio.coroutine
def read(self, n=-1):
data = yield from self.loop.run_in_executor(None, self.stream.read, n)
if isinstance(data, str):
data = data.encode()
self.eof = not data
return data
if compat.PY35:
@asyncio.coroutine
def __aiter__(self):
return self
@asyncio.coroutine
def __anext__(self):
val = yield from self.readline()
if val == b'':
raise StopAsyncIteration
return val
class NonFileStreamWriter:
def __init__(self, stream, *, loop=None):
if loop is None: