Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_decode_unicode(httpbin):
    """Stream a UTF-8 body as text and compare its size to Content-Length."""
    response = await get(httpbin + '/encoding/utf8', stream=True)
    async with finalize(response.iter_content(decode_unicode=True)) as stream:
        pieces = [piece async for piece in stream]
    # Checked after the stream has been fully drained and finalized.
    assert response.connection.closed
    # Re-encode the decoded text so the byte count can be compared
    # with the Content-Length header.
    encoded = ''.join(pieces).encode('utf-8')
    assert int(response.headers['content-length']) == len(encoded)
async def test_response_iter_lines(httpbin):
    """Drain a streamed response line by line and check the connection closes."""
    response = await get(httpbin + '/get', stream=True)
    async with finalize(response.iter_lines()) as stream:
        # The lines themselves are not inspected; collecting them
        # simply exhausts the stream.
        lines = [line async for line in stream]
    assert response.connection.closed
def _curio_init(lib: _AsyncLib):
    """Bind curio-backed implementations onto the attributes of ``lib``.

    curio and the curio-specific wrappers are imported lazily, inside the
    function, so they are only required when this initializer is called.
    """
    import curio
    from ._event_loop_wrappers import (
        curio_close,
        curio_recv,
        curio_sendall,
        curio_spawn,
    )

    # File and socket I/O.
    lib.aopen = curio.aopen
    lib.open_connection = curio.open_connection
    lib.sendall = curio_sendall
    lib.recv = curio_recv
    lib.sock_close = curio_close
    lib.wait_read = _low_level.wait_read_curio
    lib.wait_write = _low_level.wait_write_curio

    # Timing.
    lib.sleep = curio.sleep
    lib.timeout_after = curio.timeout_after

    # Tasks and task groups.
    lib.task_manager = curio.TaskGroup
    lib.spawn = curio_spawn
    lib.cancel_task_group = _event_loop_wrappers.curio_cancel
    lib.unwrap_taskgrouperror = lambda error: [task.next_exc for task in error.failed]
    lib.unwrap_result = lambda task: task.result
    lib.finalize_agen = curio.meta.finalize

    # Synchronization primitives.
    lib.Lock = curio.Lock
    lib.Semaphore = curio.BoundedSemaphore
    lib.Queue = curio.Queue
    lib.Event = curio.Event

    # Exception types.
    lib.Cancelled = curio.CancelledError
    lib.TaskTimeout = curio.TaskTimeout
    lib.TaskGroupError = curio.TaskGroupError
async def ping_observing_task(address):
    """Feed data read from ``address`` to a network-down observer.

    Every chunk yielded by ``tcp_connection(address)`` is pushed into a
    ``ThreadedMolerConnection``, which decodes it as UTF-8 and forwards it
    to a ``NetworkDownDetector``.  As soon as the detector reports it is
    done, its result is logged as a local ``HH:MM:SS`` time and the loop
    stops.
    """
    log = logging.getLogger('moler.user.app-code')

    # Lowest layer of Moler usage: every element is glued together by hand.
    # 1. The observer.
    detector = NetworkDownDetector('10.0.2.15')
    # 2. The proxy between the observer (works on str) and the curio
    #    connection (works on bytes).
    connection = ThreadedMolerConnection(decoder=lambda data: data.decode("utf-8"))
    # 3a. Proxy -> observer.
    connection.subscribe(detector.data_received)

    log.debug('waiting for data to observe')
    async with curio.meta.finalize(tcp_connection(address)) as incoming:
        async for received in incoming:
            # 3b. External I/O -> proxy: the client hands over what it read.
            connection.data_received(received)
            # 4. The observer's status has to be polled manually ...
            if not detector.done():
                continue
            # 5. ... and only once it is done may the result be requested.
            down_since = detector.result()
            clock_time = time.strftime("%H:%M:%S", time.localtime(down_since))
            log.debug('Network is down from {}'.format(clock_time))
            break
def finalise(cbl):
    """Wrap ``cbl`` for finalization.

    Returns ``curio.meta.finalize(cbl)`` when ``curio.meta`` can be
    imported, and ``_Finalize(cbl)`` otherwise.
    """
    # Only the import is guarded: an ImportError raised while *calling*
    # finalize() should propagate instead of silently selecting the fallback.
    try:
        from curio.meta import finalize
    except ImportError:
        return _Finalize(cbl)
    return finalize(cbl)
async def generate():
    """Yield the raw response body in chunks of ``chunk_size``.

    Closure over ``self`` and ``chunk_size`` from the enclosing scope.
    Low-level errors are translated as follows, with the original
    exception kept as the explicit cause:

    * ``ProtocolError``    -> ``ChunkedEncodingError``
    * ``DecodeError``      -> ``ContentDecodingError``
    * ``ReadTimeoutError`` -> ``ConnectionError``

    ``self._content_consumed`` is set once the stream has been exhausted
    without error.
    """
    async with self:
        async with finalize(self.raw.stream(chunk_size)) as gen:
            logger.debug(f'Iterate response body stream: {self}')
            try:
                async for trunk in gen:
                    yield trunk
            except ProtocolError as e:
                raise ChunkedEncodingError(e) from e
            except DecodeError as e:
                raise ContentDecodingError(e) from e
            except ReadTimeoutError as e:
                raise ConnectionError(e) from e
            self._content_consumed = True
async def response_handler(client, response):
    """Send ``response`` to ``client``, streaming the body if there is one.

    ``bytes(response)`` is always sent first.  It contains a body only when
    the response is not streamed; for a streamed response it contains just
    the headers.

    When ``response.stream`` is not ``None`` every item it produces is sent
    as one chunk (hexadecimal size, CRLF, data, CRLF), followed by the
    terminating zero-size chunk.  The stream may be an async generator, which
    is finalized through ``curio.meta.finalize``, or any plain iterable.

    Empty items are skipped: framing one would emit ``0\\r\\n\\r\\n``, which
    is the end-of-body marker and would cut the response short.
    """
    await client.sendall(bytes(response))

    # Read the attribute once so the object that is finalized is guaranteed
    # to be the same one that is iterated.
    stream = response.stream
    if stream is None:
        return

    async def send_chunk(data):
        # A zero-length chunk is reserved for terminating the body.
        if data:
            await client.sendall(b"%x\r\n%b\r\n" % (len(data), data))

    if isinstance(stream, AsyncGenerator):
        async with curio.meta.finalize(stream):
            async for data in stream:
                await send_chunk(data)
    else:
        for data in stream:
            await send_chunk(data)

    await client.sendall(b'0\r\n\r\n')