Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_websocket_bad_framework() -> None:
    """Run a WebSocket request through the mock H2 stream with ``bad_framework``.

    The stream is expected to answer with a ``200`` response followed by a
    single WebSocket close frame.
    """
    stream = MockH2WebsocketStream()
    stream.app = bad_framework
    request = h2.events.RequestReceived()
    request.headers = [
        (b":method", b"GET"),
        # Header values are bytes, matching every other entry in this list.
        (b":protocol", b"websocket"),
        (b":path", b"/accept"),
        (b":authority", b"hypercorn"),
        (b":scheme", b"https"),
        (b"sec-websocket-version", b"13"),
    ]
    await stream.handle_request(request, "https", ("127.0.0.1", 5000), ("remote", 5000))
    assert stream.sent_events == [
        Response(headers=[(b":status", b"200")]),
        # 0x88 = FIN + close opcode, 0x02 = payload length, 0x03e8 = code 1000.
        Data(data=b"\x88\x02\x03\xe8"),
    ]
async def test_http_asgi_send() -> None:
    """Check the events emitted when an app sends a two-part HTTP response.

    The response start message alone must not produce any event; the
    response headers are only emitted together with the first body message.
    """
    start_message = {
        "type": "http.response.start",
        "headers": [(b"X-Header", b"Value")],
        "status": 200,
    }
    body_messages = (
        {"type": "http.response.body", "body": b"a", "more_body": True},
        {"type": "http.response.body", "more_body": False},
    )

    http_stream = MockH2HTTPStream()
    http_stream.scope = {"method": "GET"}

    await http_stream.asgi_send(start_message)
    # Server must not send a response until the first body chunk is received.
    assert http_stream.sent_events == []

    for body_message in body_messages:
        await http_stream.asgi_send(body_message)

    assert http_stream.sent_events == [
        Response([(b":status", b"200"), (b"x-header", b"Value")]),
        Data(b"a"),
        EndStream(),
    ]
def data_received(self, data: bytes) -> None:
    """Pass received bytes to the wsproto connection and handle its events.

    Text and bytes messages are queued on ``self.to_app`` as
    ``websocket.receive`` messages and pings are answered via ``self.send``.
    A close event queues ``websocket.disconnect`` and stops processing any
    further events from this batch.
    """
    self.connection.receive_data(data)
    for ws_event in self.connection.events():
        if isinstance(ws_event, wsproto.events.TextMessage):
            self.to_app.put_nowait({"type": "websocket.receive", "text": ws_event.data})
            continue
        if isinstance(ws_event, wsproto.events.BytesMessage):
            self.to_app.put_nowait({"type": "websocket.receive", "bytes": ws_event.data})
            continue
        if isinstance(ws_event, wsproto.events.Ping):
            pong_bytes = self.connection.send(ws_event.response())
            self.send(Data(pong_bytes))
            continue
        if not isinstance(ws_event, wsproto.events.CloseConnection):
            # Any other event type is ignored.
            continue

        # Only reply to the close while the connection reports REMOTE_CLOSING.
        if self.connection.state == wsproto.connection.ConnectionState.REMOTE_CLOSING:
            close_bytes = self.connection.send(ws_event.response())
            self.send(Data(close_bytes))
        self.to_app.put_nowait({"type": "websocket.disconnect"})
        break
self.response = message
self.config.access_logger.access(self.scope, self.response, time() - self.start_time)
elif message["type"] == "websocket.http.response.body" and self.state in {
ASGIWebsocketState.HANDSHAKE,
ASGIWebsocketState.RESPONSE,
}:
await self._asgi_send_rejection(message)
elif message["type"] == "websocket.send" and self.state == ASGIWebsocketState.CONNECTED:
event: wsproto.events.Event
if message.get("bytes") is not None:
event = wsproto.events.BytesMessage(data=bytes(message["bytes"]))
elif not isinstance(message["text"], str):
raise TypeError(f"{message['text']} should be a str")
else:
event = wsproto.events.TextMessage(data=message["text"])
await self.asend(Data(self.connection.send(event)))
elif message["type"] == "websocket.close" and self.state == ASGIWebsocketState.HANDSHAKE:
await self.send_http_error(403)
self.state = ASGIWebsocketState.HTTPCLOSED
elif message["type"] == "websocket.close":
data = self.connection.send(wsproto.events.CloseConnection(code=int(message["code"])))
await self.asend(Data(data))
self.state = ASGIWebsocketState.CLOSED
else:
raise UnexpectedMessage(self.state, message["type"])
headers = build_and_validate_headers(message["headers"])
await self.asend(ServerPush(message["path"], headers))
elif message["type"] == "http.response.body" and self.state in {
ASGIHTTPState.REQUEST,
ASGIHTTPState.RESPONSE,
}:
if self.state == ASGIHTTPState.REQUEST:
headers = [(b":status", b"%d" % self.response["status"])]
headers.extend(build_and_validate_headers(self.response["headers"]))
await self.asend(Response(headers))
self.state = ASGIHTTPState.RESPONSE
if (
not suppress_body(self.scope["method"], self.response["status"])
and message.get("body", b"") != b""
):
await self.asend(Data(bytes(message.get("body", b""))))
if not message.get("more_body", False):
if self.state != ASGIHTTPState.CLOSED:
await self.asend(EndStream())
else:
raise UnexpectedMessage(self.state, message["type"])
connection_state = self.connection.state_machine.state
stream_state = self.connection.streams[stream_id].state_machine.state
if (
connection_state == h2.connection.ConnectionState.CLOSED
or stream_state == h2.stream.StreamState.CLOSED
):
return
if isinstance(event, Response):
self.connection.send_headers(
stream_id, event.headers + self.response_headers() # type: ignore
)
await self.send()
elif isinstance(event, EndStream):
self.connection.end_stream(stream_id)
await self.send()
elif isinstance(event, Data):
await self.send_data(stream_id, event.data)
elif isinstance(event, ServerPush):
await self.server_push(stream_id, event.path, event.headers)
async def data_received(self, data: bytes) -> None:
    """Pass received bytes to the wsproto connection and handle its events.

    Text and bytes messages are sent on ``self.app_send_channel`` as
    ``websocket.receive`` messages and pings are answered via ``self.asend``.
    A close event sends ``websocket.disconnect`` on the channel and stops
    processing any further events from this batch.
    """
    self.connection.receive_data(data)
    for ws_event in self.connection.events():
        if isinstance(ws_event, wsproto.events.TextMessage):
            await self.app_send_channel.send(
                {"type": "websocket.receive", "text": ws_event.data}
            )
            continue
        if isinstance(ws_event, wsproto.events.BytesMessage):
            await self.app_send_channel.send(
                {"type": "websocket.receive", "bytes": ws_event.data}
            )
            continue
        if isinstance(ws_event, wsproto.events.Ping):
            pong_bytes = self.connection.send(ws_event.response())
            await self.asend(Data(pong_bytes))
            continue
        if not isinstance(ws_event, wsproto.events.CloseConnection):
            # Any other event type is ignored.
            continue

        # Only reply to the close while the connection reports REMOTE_CLOSING.
        if self.connection.state == wsproto.connection.ConnectionState.REMOTE_CLOSING:
            close_bytes = self.connection.send(ws_event.response())
            await self.asend(Data(close_bytes))
        await self.app_send_channel.send({"type": "websocket.disconnect"})
        break
connection_state = self.connection.state_machine.state
stream_state = self.connection.streams[stream_id].state_machine.state
if (
connection_state == h2.connection.ConnectionState.CLOSED
or stream_state == h2.stream.StreamState.CLOSED
):
return
if isinstance(event, Response):
self.connection.send_headers(
stream_id, event.headers + self.response_headers() # type: ignore
)
self.flush()
elif isinstance(event, EndStream):
self.connection.end_stream(stream_id)
self.flush()
elif isinstance(event, Data):
await self.send_data(stream_id, event.data)
elif isinstance(event, ServerPush):
self.server_push(stream_id, event.path, event.headers)
await self._asgi_send_rejection(message)
elif message["type"] == "websocket.send" and self.state == ASGIWebsocketState.CONNECTED:
event: wsproto.events.Event
if message.get("bytes") is not None:
event = wsproto.events.BytesMessage(data=bytes(message["bytes"]))
elif not isinstance(message["text"], str):
raise TypeError(f"{message['text']} should be a str")
else:
event = wsproto.events.TextMessage(data=message["text"])
await self.asend(Data(self.connection.send(event)))
elif message["type"] == "websocket.close" and self.state == ASGIWebsocketState.HANDSHAKE:
await self.send_http_error(403)
self.state = ASGIWebsocketState.HTTPCLOSED
elif message["type"] == "websocket.close":
data = self.connection.send(wsproto.events.CloseConnection(code=int(message["code"])))
await self.asend(Data(data))
self.state = ASGIWebsocketState.CLOSED
else:
raise UnexpectedMessage(self.state, message["type"])