How to use the aioconsole.stream.NonFileStreamWriter function in aioconsole

To help you get started, we’ve selected a few aioconsole examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

Source: vxgmichel/aioconsole, tests/test_interact.py (view on GitHub)
def stdcontrol(event_loop, monkeypatch):
    """Redirect the standard streams and yield handles to drive them.

    Written as a generator: everything before the ``yield`` is setup,
    the assertion after it runs when the generator is resumed.
    NOTE(review): presumably registered as a pytest fixture by a
    decorator that is not part of this snippet -- confirm.

    Setup performed through ``monkeypatch``:

    * ``sys.ps1`` is set to ``"[Hello!]"`` (``raising=False``, so the
      attribute does not need to exist beforehand).
    * ``sys.stdin`` becomes the read end of a fresh pipe; the write end
      is wrapped in a ``NonFileStreamWriter``.
    * ``sys.stdout`` becomes an in-memory ``io.StringIO``.
    * ``sys.stderr`` becomes the write end of a second pipe; the read
      end is wrapped in a ``NonFileStreamReader``.

    Yields:
        A ``(reader, writer)`` pair: ``reader`` wraps what is written to
        ``sys.stderr``, ``writer`` wraps the feed into ``sys.stdin``.
    """
    # Interactive prompt.
    monkeypatch.setattr('sys.ps1', "[Hello!]", raising=False)

    # Stdin: patch in the read end, keep the write end for the caller.
    stdin_rfd, stdin_wfd = os.pipe()
    fake_stdin = open(stdin_rfd)
    monkeypatch.setattr('sys.stdin', fake_stdin)
    stdin_feed = open(stdin_wfd, 'w')
    writer = NonFileStreamWriter(stdin_feed, loop=event_loop)

    # Stdout: captured in memory so it can be inspected after the yield.
    captured_stdout = io.StringIO()
    monkeypatch.setattr(sys, 'stdout', captured_stdout)

    # Stderr: patch in the write end, hand the read end to the caller.
    stderr_rfd, stderr_wfd = os.pipe()
    fake_stderr = open(stderr_wfd, 'w')
    monkeypatch.setattr('sys.stderr', fake_stderr)
    stderr_source = open(stderr_rfd)
    reader = NonFileStreamReader(stderr_source, loop=event_loop)

    yield reader, writer

    # Nothing may have been written to the current sys.stdout.
    assert sys.stdout.getvalue() == ''
Source: vxgmichel/aioconsole, aioconsole/stream.py (view on GitHub)
def create_standard_streams(stdin, stdout, stderr, *, loop=None):
    """Build stream objects for the three given standard streams.

    This is a generator that delegates with ``yield from``; its result
    is delivered as the generator's return value.
    NOTE(review): presumably wrapped as a coroutine by a decorator that
    is not part of this snippet -- confirm.

    Args:
        stdin: Object to read from.
        stdout: Object to write regular output to.
        stderr: Object to write error output to.
        loop: Optional event loop, forwarded unchanged to whichever
            constructor or helper is used.

    Returns:
        If every stream passes ``is_pipe_transport_compatible``, whatever
        ``open_stantard_pipe_connection`` produces. Otherwise a 3-tuple
        of ``NonFileStreamReader(stdin)``, ``NonFileStreamWriter(stdout)``
        and ``NonFileStreamWriter(stderr)``.
    """
    streams = (stdin, stdout, stderr)

    # Fallback first: a single incompatible stream rules out the pipe path.
    if not all(is_pipe_transport_compatible(stream) for stream in streams):
        return (
            NonFileStreamReader(stdin, loop=loop),
            NonFileStreamWriter(stdout, loop=loop),
            NonFileStreamWriter(stderr, loop=loop))

    # Helper name spelled as defined elsewhere in the project.
    connection = yield from open_stantard_pipe_connection(
        stdin, stdout, stderr, loop=loop)
    return connection
Source: vxgmichel/aioconsole, aioconsole/stream.py (view on GitHub)
def create_standard_streams(stdin, stdout, stderr, *, loop=None):
    """Build stream objects for the three given standard streams.

    This is a generator that delegates with ``yield from``; its result
    is delivered as the generator's return value.
    NOTE(review): presumably wrapped as a coroutine by a decorator that
    is not part of this snippet -- confirm.

    Args:
        stdin: Object to read from.
        stdout: Object to write regular output to.
        stderr: Object to write error output to.
        loop: Optional event loop, forwarded unchanged to whichever
            constructor or helper is used.

    Returns:
        If every stream passes ``is_pipe_transport_compatible``, whatever
        ``open_stantard_pipe_connection`` produces. Otherwise a 3-tuple
        of ``NonFileStreamReader(stdin)``, ``NonFileStreamWriter(stdout)``
        and ``NonFileStreamWriter(stderr)``.
    """
    streams = (stdin, stdout, stderr)

    # Fallback first: a single incompatible stream rules out the pipe path.
    if not all(is_pipe_transport_compatible(stream) for stream in streams):
        return (
            NonFileStreamReader(stdin, loop=loop),
            NonFileStreamWriter(stdout, loop=loop),
            NonFileStreamWriter(stderr, loop=loop))

    # Helper name spelled as defined elsewhere in the project.
    connection = yield from open_stantard_pipe_connection(
        stdin, stdout, stderr, loop=loop)
    return connection