Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
(ASGIWebsocketState.CONNECTED, "websocket.http.response.start"),
(ASGIWebsocketState.CONNECTED, "websocket.http.response.body"),
(ASGIWebsocketState.CLOSED, "websocket.send"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.start"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.body"),
],
)
async def test_asgi_send_invalid_message_given_state(
    state: ASGIWebsocketState, message_type: str
) -> None:
    """Check that asgi_send raises UnexpectedMessage for this state/message pair.

    A MockWebsocket is put directly into ``state`` and is then sent a
    message whose only key is ``type``, set to ``message_type``.
    """
    websocket = MockWebsocket()
    websocket.state = state
    message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await websocket.asgi_send(message)
(ASGIWebsocketState.CONNECTED, "websocket.http.response.body"),
(ASGIWebsocketState.CLOSED, "websocket.send"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.start"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.body"),
],
)
async def test_asgi_send_invalid_message_given_state(
    state: ASGIWebsocketState, message_type: str
) -> None:
    """Check that asgi_send raises UnexpectedMessage for this state/message pair.

    A MockWebsocket is put directly into ``state`` and is then sent a
    message whose only key is ``type``, set to ``message_type``.
    """
    websocket = MockWebsocket()
    websocket.state = state
    message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await websocket.asgi_send(message)
(ASGIWebsocketState.CONNECTED, "websocket.http.response.body"),
(ASGIWebsocketState.CLOSED, "websocket.send"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.start"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.body"),
],
)
async def test_websocket_asgi_send_invalid_message_given_state(
    state: ASGIWebsocketState, message_type: str
) -> None:
    """Check that the H2 stream's asgi_send raises UnexpectedMessage.

    A MockH2WebsocketStream is put directly into ``state`` and is then sent
    a message whose only key is ``type``, set to ``message_type``.
    """
    h2_stream = MockH2WebsocketStream()
    h2_stream.state = state
    message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await h2_stream.asgi_send(message)
(ASGIWebsocketState.CONNECTED, "websocket.http.response.start"),
(ASGIWebsocketState.CONNECTED, "websocket.http.response.body"),
(ASGIWebsocketState.CLOSED, "websocket.send"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.start"),
(ASGIWebsocketState.CLOSED, "websocket.http.response.body"),
],
)
async def test_websocket_asgi_send_invalid_message_given_state(
    state: ASGIWebsocketState, message_type: str
) -> None:
    """Check that the H2 stream's asgi_send raises UnexpectedMessage.

    A MockH2WebsocketStream is put directly into ``state`` and is then sent
    a message whose only key is ``type``, set to ``message_type``.
    """
    h2_stream = MockH2WebsocketStream()
    h2_stream.state = state
    message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await h2_stream.asgi_send(message)
async def test_websocket_asgi_send_invalid_message(data_bytes: Any, data_text: Any) -> None:
    """Check that asgi_send raises TypeError or ValueError for this payload.

    A connected MockH2WebsocketStream with a default Config is sent a
    "websocket.send" message carrying the supplied ``bytes`` and ``text``
    values.
    """
    h2_stream = MockH2WebsocketStream()
    h2_stream.config = Config()
    h2_stream.state = ASGIWebsocketState.CONNECTED
    message = {"type": "websocket.send", "bytes": data_bytes, "text": data_text}
    expected_errors = (TypeError, ValueError)
    with pytest.raises(expected_errors):
        await h2_stream.asgi_send(message)
async def test_asgi_send_invalid_message(data_bytes: Any, data_text: Any) -> None:
    """Check that asgi_send raises TypeError or ValueError for this payload.

    A WebsocketMixin with a default Config, a zero start time and the
    CONNECTED state is sent a "websocket.send" message carrying the
    supplied ``bytes`` and ``text`` values.
    """
    websocket = WebsocketMixin()
    websocket.config = Config()
    websocket.start_time = 0.0
    websocket.state = ASGIWebsocketState.CONNECTED
    message = {"type": "websocket.send", "bytes": data_bytes, "text": data_text}
    expected_errors = (TypeError, ValueError)
    with pytest.raises(expected_errors):
        await websocket.asgi_send(message)
def __init__(self, app: ASGIFramework, config: Config, asend: Callable) -> None:
    """Store the collaborators and set up the initial connection state.

    Args:
        app: The ASGI application object, kept as ``self.app``.
        config: The configuration object, kept as ``self.config``.
        asend: Callable kept as ``self.asend``.

    The instance starts in the CONNECTED state with no response, scope or
    wsproto connection. A trio memory channel is opened with a buffer
    size of 10 and its two ends are stored as ``app_send_channel`` and
    ``app_receive_channel``.
    """
    # Collaborators handed in by the caller.
    self.app = app
    self.config = config
    self.asend = asend  # type: ignore

    # Per-connection state; the optional fields start out unset.
    self.state = ASGIWebsocketState.CONNECTED
    self.connection: Optional[wsproto.connection.Connection] = None
    self.response: Optional[dict] = None
    self.scope: Optional[dict] = None

    channel_pair = trio.open_memory_channel(10)
    self.app_send_channel, self.app_receive_channel = channel_pair
def __init__(self, app: ASGIFramework, config: Config, asend: Callable, send: Callable) -> None:
    """Store the collaborators and set up the initial connection state.

    Args:
        app: The ASGI application object, kept as ``self.app``.
        config: The configuration object, kept as ``self.config``.
        asend: Callable kept as ``self.asend``.
        send: Callable kept as ``self.send``.

    The instance starts in the CONNECTED state with no response, scope or
    wsproto connection, and a fresh ``asyncio.Queue`` is created as
    ``self.to_app``.
    """
    # Collaborators handed in by the caller.
    self.app = app
    self.config = config
    self.asend = asend  # type: ignore
    self.send = send

    # Per-connection state; the optional fields start out unset.
    self.state = ASGIWebsocketState.CONNECTED
    self.connection: Optional[wsproto.connection.Connection] = None
    self.response: Optional[dict] = None
    self.scope: Optional[dict] = None

    app_queue: asyncio.Queue = asyncio.Queue()
    self.to_app = app_queue
# Run the ASGI application for this websocket connection.
#
# Records the start time, queues the initial "websocket.connect" message
# through asgi_put, then awaits invoke_asgi with this object's app, scope,
# asgi_receive and asgi_send. `event` is not referenced in the lines
# visible here.
#
# NOTE(review): leading indentation was lost in this copy, so it cannot be
# determined from here whether the state checks below belong to the
# `except Exception` handler or run after the try/except - confirm against
# the original file before relying on either reading.
async def handle_asgi_app(self, event: Request) -> None:
self.start_time = time()
await self.asgi_put({"type": "websocket.connect"})
try:
await invoke_asgi(self.app, self.scope, self.asgi_receive, self.asgi_send)
except asyncio.CancelledError:
# Cancellation is swallowed here rather than propagated.
pass
except Exception:
# Any other exception is logged, but only when an error logger is configured.
if self.config.error_logger is not None:
self.config.error_logger.exception("Error in ASGI Framework")
# A connection still in CONNECTED is sent a CloseConnection with the
# ABNORMAL_CLOSURE code and is then marked CLOSED.
if self.state == ASGIWebsocketState.CONNECTED:
await self.asend(CloseConnection(code=CloseReason.ABNORMAL_CLOSURE))
self.state = ASGIWebsocketState.CLOSED
# If the application hasn't accepted the connection (or sent a
# response) send a 500 for it. Otherwise if the connection
# hasn't been closed then close it.
# NOTE(review): the "otherwise ... close it" branch described above is not
# visible in this excerpt - the function may continue beyond these lines.
if self.state == ASGIWebsocketState.HANDSHAKE:
await self.send_http_error(500)
# After the 500 response the state is recorded as HTTPCLOSED.
self.state = ASGIWebsocketState.HTTPCLOSED
async def handle_asgi_app(self) -> None:
self.start_time = time()
await self.asgi_put({"type": "websocket.connect"})
try:
await invoke_asgi(self.app, self.scope, self.asgi_receive, self.asgi_send)
except asyncio.CancelledError:
pass
except Exception:
if self.config.error_logger is not None:
self.config.error_logger.exception("Error in ASGI Framework")
if self.state == ASGIWebsocketState.CONNECTED:
await self.asend(
Data(
self.connection.send(
wsproto.events.CloseConnection(
code=wsproto.frame_protocol.CloseReason.ABNORMAL_CLOSURE
)
)
)
)
self.state = ASGIWebsocketState.CLOSED
# If the application hasn't accepted the connection (or sent a
# response) send a 500 for it. Otherwise if the connection
# hasn't been closed then close it.
if self.state == ASGIWebsocketState.HANDSHAKE:
await self.send_http_error(500)