Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def open_stantard_pipe_connection(pipe_in, pipe_out, pipe_err, *, loop=None):
    """Attach three pipes to an event loop and wrap them as stream objects.

    This is a generator-based coroutine body (it uses ``yield from``), so it
    must be driven by the event loop rather than called for its result.
    NOTE(review): the ``@asyncio.coroutine`` decorator is presumably applied
    outside this view -- confirm.

    Args:
        pipe_in: pipe passed to ``loop.connect_read_pipe``.
        pipe_out: pipe passed to ``loop.connect_write_pipe`` for the first
            writer.
        pipe_err: pipe passed to ``loop.connect_write_pipe`` for the second
            writer.
        loop: event loop to use; ``asyncio.get_event_loop()`` when ``None``.

    Returns:
        A ``(in_reader, out_writer, err_writer)`` tuple.
    """
    loop = asyncio.get_event_loop() if loop is None else loop

    # A single reader/protocol pair is shared by all three pipe connections.
    reader = StandardStreamReader(loop=loop)
    shared_protocol = StandardStreamReaderProtocol(reader, loop=loop)

    def protocol_factory():
        return shared_protocol

    yield from loop.connect_read_pipe(protocol_factory, pipe_in)

    # Connect the output pipe first, then the error pipe; each writer is
    # built right after its own transport becomes available.
    writers = []
    for write_pipe in (pipe_out, pipe_err):
        transport, _ = yield from loop.connect_write_pipe(
            protocol_factory, write_pipe
        )
        writers.append(
            StandardStreamWriter(transport, shared_protocol, reader, loop)
        )

    out_writer, err_writer = writers
    return reader, out_writer, err_writer
def open_stantard_pipe_connection(pipe_in, pipe_out, pipe_err, *, loop=None):
    """Connect stdin/stdout/stderr-style pipes and return stream wrappers.

    The body uses ``yield from``, i.e. it is written as a generator-based
    coroutine and has to be scheduled on the event loop.
    NOTE(review): any coroutine decorator would sit above this definition,
    outside the visible source -- confirm.

    Args:
        pipe_in: pipe registered for reading via ``loop.connect_read_pipe``.
        pipe_out: pipe registered for writing; backs the first writer.
        pipe_err: pipe registered for writing; backs the second writer.
        loop: event loop; defaults to ``asyncio.get_event_loop()``.

    Returns:
        Tuple ``(in_reader, out_writer, err_writer)``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    # Reader side: one protocol instance is reused for every connection.
    stream_reader = StandardStreamReader(loop=loop)
    stream_protocol = StandardStreamReaderProtocol(stream_reader, loop=loop)

    def make_protocol():
        return stream_protocol

    yield from loop.connect_read_pipe(make_protocol, pipe_in)

    # Writer for the output pipe.
    out_transport, _ = yield from loop.connect_write_pipe(
        make_protocol, pipe_out
    )
    out_writer = StandardStreamWriter(
        out_transport, stream_protocol, stream_reader, loop
    )

    # Writer for the error pipe.
    err_transport, _ = yield from loop.connect_write_pipe(
        make_protocol, pipe_err
    )
    err_writer = StandardStreamWriter(
        err_transport, stream_protocol, stream_reader, loop
    )

    return stream_reader, out_writer, err_writer