Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
(ASGIHTTPState.CLOSED, "http.response.start"),
(ASGIHTTPState.CLOSED, "http.response.body"),
],
)
async def test_asgi_send_invalid_message_given_state(
    state: ASGIHTTPState, message_type: str
) -> None:
    """Check that ``MockH11.asgi_send`` rejects a message that is invalid for ``state``.

    The mock's ``state`` attribute is forced to the parametrized value, and
    sending a message of ``message_type`` must then raise ``UnexpectedMessage``.
    """
    protocol = MockH11()
    protocol.state = state
    invalid_message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await protocol.asgi_send(invalid_message)
(ASGIHTTPState.CLOSED, "http.response.body"),
],
)
async def test_http_asgi_send_invalid_message_given_state(
    state: ASGIHTTPState, message_type: str
) -> None:
    """Check that ``MockH2HTTPStream.asgi_send`` rejects a message invalid for ``state``.

    The stream's ``state`` attribute is forced to the parametrized value, and
    sending a message of ``message_type`` must then raise ``UnexpectedMessage``.
    """
    http_stream = MockH2HTTPStream()
    http_stream.state = state
    invalid_message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await http_stream.asgi_send(invalid_message)
def __init__(self, app: ASGIFramework, config: Config, asend: Callable) -> None:
    """Store the collaborators and reset the per-request bookkeeping.

    Args:
        app: The ASGI application this object is created for.
        config: The configuration object, stored as ``self.config``.
        asend: Callable stored as ``self.asend``; ``asgi_send`` awaits it
            with the events it produces.
    """
    # Collaborators handed in by the caller.
    self.app = app
    self.config = config
    self.asend = asend  # type: ignore

    # Per-request bookkeeping: nothing received or sent yet.
    self.state = ASGIHTTPState.REQUEST
    self.scope: Optional[dict] = None
    self.response: Optional[dict] = None

    # Queue of messages destined for the application (judging by the name;
    # the consumers are not visible in this block).
    self.to_app: asyncio.Queue = asyncio.Queue()
async def asgi_send(self, message: dict) -> None:
    """Called by the ASGI instance to send a message.

    ``http.response.start`` is only remembered; the h11 ``Response`` event
    is emitted lazily together with the first ``http.response.body`` message.

    Args:
        message: The ASGI message dict; ``message["type"]`` selects the
            handling together with the current ``self.state``.

    Raises:
        UnexpectedMessage: If the message type is not valid for the current
            state (for example any response message once the state is
            ``CLOSED``, or a second ``http.response.start``).
    """
    if message["type"] == "http.response.start" and self.state == ASGIHTTPState.REQUEST:
        # Defer sending: status and headers go out with the first body message.
        self.response = message
    elif message["type"] == "http.response.body" and self.state in {
        ASGIHTTPState.REQUEST,
        ASGIHTTPState.RESPONSE,
    }:
        if self.state == ASGIHTTPState.REQUEST:
            # First body message: send the stored start message's status and
            # headers, extended with the headers from response_headers().
            headers = build_and_validate_headers(self.response["headers"])
            headers.extend(self.response_headers())
            await self.asend(
                h11.Response(status_code=int(self.response["status"]), headers=headers)
            )
            self.state = ASGIHTTPState.RESPONSE

        # Body bytes are forwarded only when suppress_body() is falsy for this
        # method/status pair and the body is non-empty.
        if (
            not suppress_body(self.scope["method"], int(self.response["status"]))
            and message.get("body", b"") != b""
        ):
            await self.asend(h11.Data(data=bytes(message["body"])))

        if not message.get("more_body", False):
            # Re-check the state: it may have changed while awaiting above.
            if self.state != ASGIHTTPState.CLOSED:
                await self.asend(h11.EndOfMessage())
    else:
        # Previously an out-of-sequence message was silently dropped here;
        # reject it explicitly so the application error surfaces.
        raise UnexpectedMessage(self.state, message["type"])
async def asgi_send(self, message: dict) -> None:
    """Handle a message sent by the ASGI application on this stream.

    ``http.response.start`` is only remembered; the ``Response`` event is
    emitted together with the first ``http.response.body`` message.
    ``http.response.push`` is forwarded as a ``ServerPush`` event regardless
    of the current state.

    Args:
        message: The ASGI message dict; ``message["type"]`` selects the
            handling together with the current ``self.state``.

    Raises:
        TypeError: If a push message's ``path`` is not a ``str``.
        UnexpectedMessage: If the message type is not valid for the current
            state.
    """
    kind = message["type"]

    if kind == "http.response.start" and self.state == ASGIHTTPState.REQUEST:
        # Defer sending: status and headers go out with the first body message.
        self.response = message
        return

    if kind == "http.response.push":
        if not isinstance(message["path"], str):
            raise TypeError(f"{message['path']} should be a str")
        push_headers = build_and_validate_headers(message["headers"])
        await self.asend(ServerPush(message["path"], push_headers))
        return

    sendable_states = {ASGIHTTPState.REQUEST, ASGIHTTPState.RESPONSE}
    if kind != "http.response.body" or self.state not in sendable_states:
        raise UnexpectedMessage(self.state, kind)

    if self.state == ASGIHTTPState.REQUEST:
        # First body message: emit the stored status followed by the
        # validated headers from the start message.
        status_header = (b":status", b"%d" % self.response["status"])
        response_headers = [
            status_header,
            *build_and_validate_headers(self.response["headers"]),
        ]
        await self.asend(Response(response_headers))
        self.state = ASGIHTTPState.RESPONSE

    # suppress_body() is always consulted first, exactly as before; data is
    # forwarded only when it is falsy and the body is non-empty.
    body_permitted = not suppress_body(self.scope["method"], self.response["status"])
    if body_permitted and message.get("body", b"") != b"":
        await self.asend(Data(bytes(message.get("body", b""))))

    # Final body message: end the stream unless the state became CLOSED
    # while awaiting above.
    if not message.get("more_body", False) and self.state != ASGIHTTPState.CLOSED:
        await self.asend(EndStream())