Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_asgi_send_invalid_message_given_state(
    state: ASGIHTTPState, message_type: str
) -> None:
    """Expect ``UnexpectedMessage`` for a message type that is invalid in ``state``.

    A ``MockH11`` instance is placed in ``state`` and then asked to send an
    ASGI message whose only key is ``"type": message_type``.

    Args:
        state: The HTTP state assigned to the mock server before sending.
        message_type: The ASGI message type to send.

    NOTE(review): the arguments are presumably supplied by a parametrize
    decorator that is not visible here - confirm. A second function with this
    exact name appears later in this chunk; if both live in the same module
    the later definition shadows this one.
    """
    h11_server = MockH11()
    h11_server.state = state
    invalid_message = {"type": message_type}
    # Sending must be rejected rather than silently accepted.
    with pytest.raises(UnexpectedMessage):
        await h11_server.asgi_send(invalid_message)
async def test_websocket_asgi_send_invalid_message_given_state(
    state: ASGIWebsocketState, message_type: str
) -> None:
    """Expect ``UnexpectedMessage`` for a message type that is invalid in ``state``.

    A ``MockH2WebsocketStream`` instance is placed in ``state`` and then asked
    to send an ASGI message whose only key is ``"type": message_type``.

    Args:
        state: The websocket state assigned to the mock stream before sending.
        message_type: The ASGI message type to send.

    NOTE(review): the arguments are presumably supplied by a parametrize
    decorator that is not visible here - confirm.
    """
    ws_stream = MockH2WebsocketStream()
    ws_stream.state = state
    invalid_message = {"type": message_type}
    # Sending must be rejected rather than silently accepted.
    with pytest.raises(UnexpectedMessage):
        await ws_stream.asgi_send(invalid_message)
async def test_asgi_send_invalid_message_given_state(
    state: ASGIWebsocketState, message_type: str
) -> None:
    """Expect ``UnexpectedMessage`` for a message type that is invalid in ``state``.

    A ``MockWebsocket`` instance is placed in ``state`` and then asked to send
    an ASGI message whose only key is ``"type": message_type``.

    Args:
        state: The websocket state assigned to the mock server before sending.
        message_type: The ASGI message type to send.

    NOTE(review): the arguments are presumably supplied by a parametrize
    decorator that is not visible here - confirm. An earlier function in this
    chunk has this exact name; if both live in the same module this definition
    shadows the earlier one.
    """
    ws_server = MockWebsocket()
    ws_server.state = state
    invalid_message = {"type": message_type}
    # Sending must be rejected rather than silently accepted.
    with pytest.raises(UnexpectedMessage):
        await ws_server.asgi_send(invalid_message)
async def test_http_asgi_send_invalid_message_given_state(
    state: ASGIHTTPState, message_type: str
) -> None:
    """Expect ``UnexpectedMessage`` for a message type that is invalid in ``state``.

    A ``MockH2HTTPStream`` instance is placed in ``state`` and then asked to
    send an ASGI message whose only key is ``"type": message_type``.

    Args:
        state: The HTTP state assigned to the mock stream before sending.
        message_type: The ASGI message type to send.

    NOTE(review): the arguments are presumably supplied by a parametrize
    decorator that is not visible here - confirm.
    """
    http_stream = MockH2HTTPStream()
    http_stream.state = state
    invalid_message = {"type": message_type}
    # Sending must be rejected rather than silently accepted.
    with pytest.raises(UnexpectedMessage):
        await http_stream.asgi_send(invalid_message)
if message.get("bytes") is not None:
event = wsproto.events.BytesMessage(data=bytes(message["bytes"]))
elif not isinstance(message["text"], str):
raise TypeError(f"{message['text']} should be a str")
else:
event = wsproto.events.TextMessage(data=message["text"])
await self.asend(Data(self.connection.send(event)))
elif message["type"] == "websocket.close" and self.state == ASGIWebsocketState.HANDSHAKE:
await self.send_http_error(403)
self.state = ASGIWebsocketState.HTTPCLOSED
elif message["type"] == "websocket.close":
data = self.connection.send(wsproto.events.CloseConnection(code=int(message["code"])))
await self.asend(Data(data))
self.state = ASGIWebsocketState.CLOSED
else:
raise UnexpectedMessage(self.state, message["type"])
}:
if self.state == ASGIHTTPState.REQUEST:
headers = [(b":status", b"%d" % self.response["status"])]
headers.extend(build_and_validate_headers(self.response["headers"]))
await self.asend(Response(headers))
self.state = ASGIHTTPState.RESPONSE
if (
not suppress_body(self.scope["method"], self.response["status"])
and message.get("body", b"") != b""
):
await self.asend(Data(bytes(message.get("body", b""))))
if not message.get("more_body", False):
if self.state != ASGIHTTPState.CLOSED:
await self.asend(EndStream())
else:
raise UnexpectedMessage(self.state, message["type"])
elif message["type"] == "websocket.send" and self.state == ASGIWebsocketState.CONNECTED:
data: Union[bytes, str]
if message.get("bytes") is not None:
await self.asend(BytesMessage(data=bytes(message["bytes"])))
elif not isinstance(message["text"], str):
raise TypeError(f"{message['text']} should be a str")
else:
await self.asend(TextMessage(data=message["text"]))
elif message["type"] == "websocket.close" and self.state == ASGIWebsocketState.HANDSHAKE:
await self.send_http_error(403)
self.state = ASGIWebsocketState.HTTPCLOSED
elif message["type"] == "websocket.close":
await self.asend(CloseConnection(code=int(message["code"])))
self.state = ASGIWebsocketState.CLOSED
else:
raise UnexpectedMessage(self.state, message["type"])
)
self.state = ASGIHTTPState.RESPONSE
if (
not suppress_body(self.scope["method"], int(self.response["status"]))
and message.get("body", b"") != b""
):
await self.asend(h11.Data(data=bytes(message["body"])))
if not message.get("more_body", False):
if self.state != ASGIHTTPState.CLOSED:
await self.asend(h11.EndOfMessage())
await self.asgi_put({"type": "http.disconnect"})
self.state = ASGIHTTPState.CLOSED
else:
raise UnexpectedMessage(self.state, message["type"])