Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_streaming_body_with_zero_read():
    """Check that ``read(0)`` returns ``b''`` and a later ``read()`` returns all ten bytes."""
    raw = AsyncBytesIO(b'1234567890')
    wrapped = response.StreamingBody(raw, content_length=10)

    # A zero-length read is expected to hand back an empty bytes object ...
    empty = await wrapped.read(0)
    assert empty == b''

    # ... and the following unbounded read must still yield the full payload.
    remainder = await wrapped.read()
    assert remainder == b'1234567890'
async def test_streaming_wrapper_validates_content_length():
    """Check that a full read succeeds when ``content_length`` equals the payload size."""
    source = AsyncBytesIO(b'1234567890')
    wrapped = response.StreamingBody(source, content_length=10)

    # The payload is ten bytes and content_length is 10, so the read
    # is expected to return everything.
    data = await wrapped.read()
    assert data == b'1234567890'