Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
stream = protocol.stream
await protocol.handle(Closed())
stream.handle.assert_called()
assert stream.handle.call_args_list == [
call(
Request(
stream_id=1,
headers=[(b"host", b"hypercorn"), (b"connection", b"close")],
http_version="1.1",
method="GET",
raw_path=b"/",
)
),
call(EndBody(stream_id=1)),
call(StreamClosed(stream_id=1)),
]
async def test_closure(stream: WSStream) -> None:
    """Check that closing a stream twice leaves it closed and notifies the app once."""
    assert not stream.closed

    # Deliver the close event twice; the stream must report closed after
    # each delivery.
    for _ in range(2):
        await stream.handle(StreamClosed(stream_id=1))
        assert stream.closed

    # It is important that the disconnect message has only been sent
    # once, despite the two close events above.
    expected_puts = [call({"type": "websocket.disconnect", "code": 1006})]
    assert stream.app_put.call_args_list == expected_puts
async def test_send_app_error(stream: HTTPStream) -> None:
    """Check that passing ``None`` to ``app_send`` after a request yields a 500.

    The stream is expected to send an empty 500 response, end the body,
    close the stream, and write an access log entry.
    """
    request = Request(stream_id=1, http_version="2", headers=[], raw_path=b"/?a=b", method="GET")
    await stream.handle(request)
    await stream.app_send(None)

    stream.send.assert_called()
    error_response = Response(
        stream_id=1,
        headers=[(b"content-length", b"0"), (b"connection", b"close")],
        status_code=500,
    )
    expected_sends = [
        call(error_response),
        call(EndBody(stream_id=1)),
        call(StreamClosed(stream_id=1)),
    ]
    assert stream.send.call_args_list == expected_sends

    # The failed request must still be recorded in the access log.
    stream.config._log.access.assert_called()
event.stream_id,
[(b":status", b"%d" % event.status_code)]
+ event.headers
+ self.config.response_headers("h2"),
)
await self._flush()
elif isinstance(event, (Body, Data)):
self.priority.unblock(event.stream_id)
await self.has_data.set()
await self.stream_buffers[event.stream_id].push(event.data)
elif isinstance(event, (EndBody, EndData)):
self.stream_buffers[event.stream_id].set_complete()
self.priority.unblock(event.stream_id)
await self.has_data.set()
await self.stream_buffers[event.stream_id].drain()
elif isinstance(event, StreamClosed):
await self._close_stream(event.stream_id)
await self.send(Updated())
elif isinstance(event, Request):
await self._create_server_push(event.stream_id, event.raw_path, event.headers)
except h2.exceptions.ProtocolError:
# Connection has closed whilst blocked on flow control or
# connection has advanced ahead of the last emitted event.
return
self.buffer.extend(event)
except FrameTooLarge:
await self._send_wsproto_event(
CloseConnection(code=CloseReason.MESSAGE_TOO_BIG)
)
break
if event.message_finished:
await self.app_put(self.buffer.to_message())
self.buffer.clear()
elif isinstance(event, Ping):
await self._send_wsproto_event(event.response())
elif isinstance(event, CloseConnection):
if self.connection.state == ConnectionState.REMOTE_CLOSING:
await self._send_wsproto_event(event.response())
await self.send(StreamClosed(stream_id=self.stream_id))
else:
await self._send_h11_event(
h11.InformationalResponse(
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
elif isinstance(event, Body):
await self._send_h11_event(h11.Data(data=event.data))
elif isinstance(event, EndBody):
await self._send_h11_event(h11.EndOfMessage())
elif isinstance(event, Data):
await self.send(RawData(data=event.data))
elif isinstance(event, EndData):
pass
elif isinstance(event, StreamClosed):
await self._maybe_recycle()
if (
not suppress_body(self.scope["method"], int(self.response["status"]))
and message.get("body", b"") != b""
):
await self.send(
Body(stream_id=self.stream_id, data=bytes(message.get("body", b"")))
)
if not message.get("more_body", False):
if self.state != ASGIHTTPState.CLOSED:
self.state = ASGIHTTPState.CLOSED
await self.config.log.access(
self.scope, self.response, time() - self.start_time
)
await self.send(EndBody(stream_id=self.stream_id))
await self.send(StreamClosed(stream_id=self.stream_id))
else:
raise UnexpectedMessage(self.state, message["type"])
"headers": event.headers,
"client": self.client,
"server": self.server,
"subprotocols": self.handshake.subprotocols or [],
"extensions": {"websocket.http.response": {}},
}
self.start_time = time()
if not self.handshake.is_valid():
await self._send_error_response(400)
else:
self.app_put = await self.spawn_app(self.scope, self.app_send)
await self.app_put({"type": "websocket.connect"})
elif isinstance(event, (Body, Data)):
self.connection.receive_data(event.data)
await self._handle_events()
elif isinstance(event, StreamClosed) and not self.closed:
self.closed = True
if self.app_put is not None:
if self.state in {ASGIWebsocketState.HTTPCLOSED, ASGIWebsocketState.CLOSED}:
code = CloseReason.NORMAL_CLOSURE.value
else:
code = CloseReason.ABNORMAL_CLOSURE.value
await self.app_put({"type": "websocket.disconnect", "code": code})
async def stream_send(self, event: StreamEvent) -> None:
    """Translate a stream event into calls on the underlying connection.

    * ``Response``: sends the headers, made up of a ``:status``
      pseudo-header, the event's own headers and the configured "h3"
      response headers, then flushes via ``self.send()``.
    * ``Body`` / ``Data``: sends the event's data with the final flag
      ``False``, then flushes.
    * ``EndBody`` / ``EndData``: sends an empty payload with the final
      flag ``True``, then flushes.
    * ``StreamClosed``: nothing is done.
    * ``Request``: delegated to ``self._create_server_push``.

    Any other event type is ignored.
    """
    if isinstance(event, Response):
        status_header = (b":status", b"%d" % event.status_code)
        response_headers = (
            [status_header] + event.headers + self.config.response_headers("h3")
        )
        self.connection.send_headers(event.stream_id, response_headers)
        await self.send()
        return

    if isinstance(event, (Body, Data)):
        self.connection.send_data(event.stream_id, event.data, False)
        await self.send()
        return

    if isinstance(event, (EndBody, EndData)):
        # An empty payload with the final flag set to True.
        self.connection.send_data(event.stream_id, b"", True)
        await self.send()
        return

    if isinstance(event, StreamClosed):
        # Intentionally a no-op; the original author marked this branch
        # as an open question ("??").
        return

    if isinstance(event, Request):
        await self._create_server_push(event.stream_id, event.raw_path, event.headers)
async def _close_stream(self) -> None:
    """Send ``StreamClosed`` to the current stream, if any, then drop it."""
    if self.stream is None:
        # Nothing to close.
        return

    await self.stream.handle(StreamClosed(stream_id=STREAM_ID))
    # Forget the stream only after it has handled the close event.
    self.stream = None