Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_if_read_json_fails_content_type_header_is_checked_non_json_gives_invalid_operation():
    """Check that `request.json()` raises InvalidOperation for a bad body with a non-JSON type.

    The request carries a truncated JSON payload declared as ``text/html``,
    so both the body and the content type are wrong.
    """
    # Truncated JSON payload paired with a non-JSON content type.
    malformed_content = Content(b'text/html', b'{"hello":')

    request = Request('POST', b'/', [])
    request.with_content(malformed_content)

    with pytest.raises(InvalidOperation):
        await request.json()
def pretty_json(data: Any, status: int = 200, dumps=JSON.dumps, indent: int = 4):
    """Create a response carrying `data` serialized as indented application/json.

    Args:
        data: The object handed to `dumps` for serialization.
        status: HTTP status of the response (200 by default).
        dumps: Serializer invoked as ``dumps(data, indent=indent)``; its
            result is encoded as UTF-8 to form the body.
        indent: Indentation value forwarded to `dumps` (4 by default).

    Returns:
        A `Response` with the given status, no extra headers, and the
        encoded JSON as its content.
    """
    serialized = dumps(data, indent=indent)
    payload = serialized.encode('utf8')
    body = Content(b'application/json', payload)
    return Response(status, None, body)
break
yield chunk
yield b''
content = StreamedContent(content_type, data_provider)
elif callable(value):
# value is treated as an async generator
async def data_provider():
async for chunk in value():
yield chunk
yield b''
content = StreamedContent(content_type, data_provider)
elif isinstance(value, bytes):
content = Content(content_type, value)
elif isinstance(value, bytearray):
content = Content(content_type, bytes(value))
else:
raise ValueError('Invalid value, expected one of: Callable, str, bytes, bytearray, io.BytesIO')
return Response(200, [(b'Content-Disposition', content_disposition_value.encode())], content)
yield chunk
yield b''
content = StreamedContent(content_type, data_provider)
elif callable(value):
# value is treated as an async generator
async def data_provider():
async for chunk in value():
yield chunk
yield b''
content = StreamedContent(content_type, data_provider)
elif isinstance(value, bytes):
content = Content(content_type, value)
elif isinstance(value, bytearray):
content = Content(content_type, bytes(value))
else:
raise ValueError('Invalid value, expected one of: Callable, str, bytes, bytearray, io.BytesIO')
return Response(200, [(b'Content-Disposition', content_disposition_value.encode())], content)
def get_response(html: str):
    """Build an HTTP 200 response serving `html` with caching disabled.

    Args:
        html: Markup to send; it is encoded as UTF-8 and labelled
            ``text/html; charset=utf-8``.

    Returns:
        Whatever `Response.with_content` returns for the new response,
        which carries a ``Cache-Control: no-cache`` header.
    """
    no_cache_headers = [(b'Cache-Control', b'no-cache')]
    response = Response(200, no_cache_headers)
    page = Content(b'text/html; charset=utf-8', html.encode('utf8'))
    return response.with_content(page)
def text(value: str, status: int = 200):
    """Create a text/plain response from `value`.

    Args:
        value: Text to send; encoded as UTF-8 for the body.
        status: HTTP status of the response (200 by default).

    Returns:
        A `Response` with the given status, no extra headers, and the
        encoded text as ``text/plain; charset=utf-8`` content.
    """
    encoded = value.encode('utf8')
    body = Content(b'text/plain; charset=utf-8', encoded)
    return Response(status, None, body)