Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_receive_data_when_closed() -> None:
client = Connection(CLIENT)
client._state = ConnectionState.CLOSED
with pytest.raises(LocalProtocolError):
client.receive_data(b"something")
def test_unsolicited_pong() -> None:
client = Connection(CLIENT)
server = Connection(SERVER)
payload = b"x" * 23
server.receive_data(client.send(Pong(payload=payload)))
event = next(server.events())
assert isinstance(event, Pong)
assert event.payload == payload
def test_unsolicited_pong() -> None:
client = Connection(CLIENT)
server = Connection(SERVER)
payload = b"x" * 23
server.receive_data(client.send(Pong(payload=payload)))
event = next(server.events())
assert isinstance(event, Pong)
assert event.payload == payload
(":scheme", "https"),
("sec-websocket-version", "13"),
],
)
await server.reader.send(h2_client.data_to_send()) # type: ignore
events = h2_client.receive_data(await server.writer.receive()) # type: ignore
await server.reader.send(h2_client.data_to_send()) # type: ignore
events = h2_client.receive_data(await server.writer.receive()) # type: ignore
events = h2_client.receive_data(await server.writer.receive()) # type: ignore
assert isinstance(events[0], h2.events.ResponseReceived)
assert events[0].headers == [
(b":status", b"200"),
(b"date", b"Thu, 01 Jan 1970 01:23:20 GMT"),
(b"server", b"hypercorn-h2"),
]
client = wsproto.connection.Connection(wsproto.ConnectionType.CLIENT)
h2_client.send_data(stream_id, client.send(wsproto.events.BytesMessage(data=SANITY_BODY)))
await server.reader.send(h2_client.data_to_send()) # type: ignore
events = h2_client.receive_data(await server.writer.receive()) # type: ignore
client.receive_data(events[0].data)
assert list(client.events()) == [wsproto.events.TextMessage(data="Hello & Goodbye")]
h2_client.send_data(stream_id, client.send(wsproto.events.CloseConnection(code=1000)))
await server.reader.send(h2_client.data_to_send()) # type: ignore
events = h2_client.receive_data(await server.writer.receive()) # type: ignore
client.receive_data(events[0].data)
assert list(client.events()) == [wsproto.events.CloseConnection(code=1000, reason="")]
await server.reader.send(b"") # type: ignore
def test_ping_pong(client_sends: bool) -> None:
client = Connection(CLIENT)
server = Connection(SERVER)
if client_sends:
local = client
remote = server
else:
local = server
remote = client
payload = b"x" * 23
remote.receive_data(local.send(Ping(payload=payload)))
event = next(remote.events())
assert isinstance(event, Ping)
assert event.payload == payload
local.receive_data(remote.send(event.response()))
event = next(local.events())
def test_close_whilst_closing() -> None:
client = Connection(CLIENT)
client.send(CloseConnection(code=CloseReason.NORMAL_CLOSURE))
with pytest.raises(LocalProtocolError):
client.send(CloseConnection(code=CloseReason.NORMAL_CLOSURE))
self.state = ASGIWebsocketState.CONNECTED
extensions: List[str] = []
for name, value in self.scope["headers"]:
if name == b"sec-websocket-extensions":
extensions = split_comma_header(value)
supported_extensions = [wsproto.extensions.PerMessageDeflate()]
accepts = server_extensions_handshake(extensions, supported_extensions)
headers = [(b":status", b"200")]
headers.extend(build_and_validate_headers(message.get("headers", [])))
raise_if_subprotocol_present(headers)
if message.get("subprotocol") is not None:
headers.append((b"sec-websocket-protocol", message["subprotocol"].encode()))
if accepts:
headers.append((b"sec-websocket-extensions", accepts))
await self.asend(Response(headers))
self.connection = wsproto.connection.Connection(
wsproto.connection.ConnectionType.SERVER, supported_extensions
)
self.config.access_logger.access(
self.scope, {"status": 200, "headers": []}, time() - self.start_time
)
elif (
message["type"] == "websocket.http.response.start"
and self.state == ASGIWebsocketState.HANDSHAKE
):
self.response = message
self.config.access_logger.access(self.scope, self.response, time() - self.start_time)
elif message["type"] == "websocket.http.response.body" and self.state in {
ASGIWebsocketState.HANDSHAKE,
ASGIWebsocketState.RESPONSE,
}:
await self._asgi_send_rejection(message)
not suppress_body(self.scope["method"], self.response["status"])
and message.get("body", b"") != b""
):
await self.asend(Data(bytes(message.get("body", b""))))
if not message.get("more_body", False):
if self.state != ASGIHTTPState.CLOSED:
await self.asend(EndStream())
else:
raise UnexpectedMessage(self.state, message["type"])
class H2WebsocketStreamMixin:
app: ASGIFramework
asend: Callable
config: Config
connection: wsproto.connection.Connection
response: dict
state: ASGIWebsocketState
async def asgi_put(self, message: dict) -> None:
"""Called by the ASGI server to put a message to the ASGI instance.
See asgi_receive as the get to this put.
"""
pass
async def asgi_receive(self) -> dict:
"""Called by the ASGI instance to receive a message."""
pass
async def handle_request(
self,
def __init__(self, app: ASGIFramework, config: Config, asend: Callable, send: Callable) -> None:
self.app = app
self.config = config
self.response: Optional[dict] = None
self.scope: Optional[dict] = None
self.state = ASGIWebsocketState.CONNECTED
self.connection: Optional[wsproto.connection.Connection] = None
self.asend = asend # type: ignore
self.send = send
self.to_app: asyncio.Queue = asyncio.Queue()