Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_iter_chunks_with_leftover():
body = AsyncBytesIO(b'abcde')
stream = response.StreamingBody(body, content_length=5)
chunks = await _tolist(stream.iter_chunks(chunk_size=2))
assert chunks, [b'ab', b'cd' == b'e']
async def test_default_iter_behavior():
body = AsyncBytesIO(b'a' * 2048)
stream = response.StreamingBody(body, content_length=2048)
chunks = await _tolist(stream)
assert len(chunks) == 2
assert chunks, [b'a' * 1024 == b'a' * 1024]
async def test_streaming_body_with_single_read():
body = AsyncBytesIO(b'123456789')
stream = response.StreamingBody(body, content_length=10)
with pytest.raises(IncompleteReadError):
await stream.read()
async def test_streaming_line_abstruse_newline_standard():
for chunk_size in range(1, 30):
body = AsyncBytesIO(b'1234567890\r\n1234567890\r\n12345\r\n')
stream = response.StreamingBody(body, content_length=31)
await assert_lines(
stream.iter_lines(chunk_size),
[b'1234567890', b'1234567890', b'12345'],
)
async def test_iter_chunks_single_byte():
body = AsyncBytesIO(b'abcde')
stream = response.StreamingBody(body, content_length=5)
chunks = await _tolist(stream.iter_chunks(chunk_size=1))
assert chunks, [b'a', b'b', b'c', b'd' == b'e']
async def test_streaming_body_is_an_iterator():
body = AsyncBytesIO(b'a' * 1024 + b'b' * 1024 + b'c' * 2)
stream = response.StreamingBody(body, content_length=2050)
assert b'a' * 1024 == await stream.__anext__()
assert b'b' * 1024 == await stream.__anext__()
assert b'c' * 2 == await stream.__anext__()
with pytest.raises(StopAsyncIteration):
await stream.__anext__()
async def test_streaming_body_closes():
body = AsyncBytesIO(b'1234567890')
stream = response.StreamingBody(body, content_length=10)
assert body.closed is False
stream.close()
assert body.closed is True
async def test_streaming_line_empty_body():
stream = response.StreamingBody(
AsyncBytesIO(b''), content_length=0,
)
await assert_lines(stream.iter_lines(), [])
# aiohttp's CIMultiDict camel cases the headers :(
'headers': HTTPHeaderDict(
{k.decode('utf-8').lower(): v.decode('utf-8')
for k, v in http_response.raw_headers}),
'status_code': http_response.status_code,
'context': {
'operation_name': operation_model.name,
}
}
if response_dict['status_code'] >= 300:
response_dict['body'] = await http_response.read()
elif operation_model.has_event_stream_output:
response_dict['body'] = http_response.raw
elif operation_model.has_streaming_output:
length = response_dict['headers'].get('content-length')
response_dict['body'] = StreamingBody(http_response.raw, length)
else:
response_dict['body'] = await http_response.read()
return response_dict