Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
client = h11.Connection(h11.CLIENT)
await protocol.handle(
RawData(data=client.send(h11.Request(method="GET", target="/?a=b", headers=BASIC_HEADERS)))
)
protocol.stream.handle.assert_called()
assert protocol.stream.handle.call_args_list == [
call(
Request(
stream_id=1,
headers=[(b"host", b"hypercorn"), (b"connection", b"close")],
http_version="1.1",
method="GET",
raw_path=b"/?a=b",
)
),
call(EndBody(stream_id=1)),
]
async def test_handle_end_body(stream: HTTPStream) -> None:
    """Check that handling ``EndBody`` calls ``app_put`` with a final, empty body."""
    stream.app_put = CoroutineMock()

    await stream.handle(EndBody(stream_id=1))

    # Exactly one message is expected: an empty body with nothing more to follow.
    final_message = {"type": "http.request", "body": b"", "more_body": False}
    stream.app_put.assert_called()
    assert stream.app_put.call_args_list == [call(final_message)]
async def test_protocol_handle_data_post_close(protocol: H11Protocol) -> None:
    """Check that data handled after the response has been ended does not raise."""
    # NOTE(review): this header block has no terminating blank line
    # ("\r\n\r\n") -- confirm that is intentional for this scenario.
    request_head = b"POST / HTTP/1.1\r\nHost: hypercorn\r\nContent-Length: 10\r\n"
    late_body = b"abcdefghij"

    await protocol.handle(RawData(data=request_head))

    # Send a complete response before the 10 announced body bytes are handled.
    await protocol.stream_send(Response(stream_id=1, headers=[], status_code=201))
    await protocol.stream_send(EndBody(stream_id=1))

    # The point of the test: this call must complete without raising.
    await protocol.handle(RawData(data=late_body))
headers=[(b"sec-websocket-version", b"13")],
raw_path=b"/",
method="GET",
)
)
await stream.app_send(None)
stream.send.assert_called()
assert stream.send.call_args_list == [
call(
Response(
stream_id=1,
headers=[(b"content-length", b"0"), (b"connection", b"close")],
status_code=500,
)
),
call(EndBody(stream_id=1)),
call(StreamClosed(stream_id=1)),
]
stream.config._log.access.assert_called()
try:
event = self.connection.next_event()
except h11.RemoteProtocolError:
if self.connection.our_state in {h11.IDLE, h11.SEND_RESPONSE}:
await self._send_error_response(400)
await self.send(Closed())
break
else:
if isinstance(event, h11.Request):
await self._check_protocol(event)
await self._create_stream(event)
elif isinstance(event, h11.Data):
await self.stream.handle(Body(stream_id=STREAM_ID, data=event.data))
elif isinstance(event, h11.EndOfMessage):
await self.stream.handle(EndBody(stream_id=STREAM_ID))
elif isinstance(event, Data):
# WebSocket pass through
await self.stream.handle(event)
elif event is h11.PAUSED:
await self.send(Updated())
await self.can_read.clear()
await self.can_read.wait()
elif isinstance(event, h11.ConnectionClosed) or event is h11.NEED_DATA:
break
async def _handle_events(self, events: List[h2.events.Event]) -> None:
    """Dispatch each h2 event to the matching handler, then flush.

    Args:
        events: The h2 events to process, in order.
    """
    for h2_event in events:
        if isinstance(h2_event, h2.events.RequestReceived):
            await self._create_stream(h2_event)
        elif isinstance(h2_event, h2.events.DataReceived):
            stream_id = h2_event.stream_id
            await self.streams[stream_id].handle(
                Body(stream_id=stream_id, data=h2_event.data)
            )
            # Acknowledge only after the stream has handled the body.
            self.connection.acknowledge_received_data(
                h2_event.flow_controlled_length, stream_id
            )
        elif isinstance(h2_event, h2.events.StreamEnded):
            stream_id = h2_event.stream_id
            await self.streams[stream_id].handle(EndBody(stream_id=stream_id))
        elif isinstance(h2_event, h2.events.StreamReset):
            stream_id = h2_event.stream_id
            await self._close_stream(stream_id)
            await self._window_updated(stream_id)
        elif isinstance(h2_event, h2.events.WindowUpdated):
            await self._window_updated(h2_event.stream_id)
        elif isinstance(h2_event, h2.events.PriorityUpdated):
            await self._priority_updated(h2_event)
        elif isinstance(h2_event, h2.events.RemoteSettingsChanged):
            # Only a change to the initial window size triggers an update here.
            changed = h2_event.changed_settings
            if h2.settings.SettingCodes.INITIAL_WINDOW_SIZE in changed:
                await self._window_updated(None)
        elif isinstance(h2_event, h2.events.ConnectionTerminated):
            await self.send(Closed())

    # NOTE(review): the original indentation was lost; the flush is placed
    # after the loop (once per batch of events) -- confirm against the caller.
    await self._flush()
request_headers.extend(headers)
request_headers.extend(self.config.response_headers("h3"))
try:
push_stream_id = self.connection.send_push_promise(
stream_id=stream_id, headers=request_headers
)
except NoAvailablePushIDError:
# Client does not accept push promises or we are trying to
# push on a push promises request.
pass
else:
event = HeadersReceived(
stream_id=push_stream_id, stream_ended=True, headers=request_headers
)
await self._create_stream(event)
await self.streams[event.stream_id].handle(EndBody(stream_id=event.stream_id))
"raw_path": path,
"query_string": query_string,
"root_path": self.config.root_path,
"headers": event.headers,
"client": self.client,
"server": self.server,
}
if event.http_version in PUSH_VERSIONS:
self.scope["extensions"] = {"http.response.push": {}}
self.start_time = time()
self.app_put = await self.spawn_app(self.scope, self.app_send)
elif isinstance(event, Body):
await self.app_put(
{"type": "http.request", "body": bytes(event.data), "more_body": True}
)
elif isinstance(event, EndBody):
await self.app_put({"type": "http.request", "body": b"", "more_body": False})
elif isinstance(event, StreamClosed) and not self.closed:
self.closed = True
if self.app_put is not None:
await self.app_put({"type": "http.disconnect"})
await self._send_h11_event(
h11.Response(
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
else:
await self._send_h11_event(
h11.InformationalResponse(
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
elif isinstance(event, Body):
await self._send_h11_event(h11.Data(data=event.data))
elif isinstance(event, EndBody):
await self._send_h11_event(h11.EndOfMessage())
elif isinstance(event, Data):
await self.send(RawData(data=event.data))
elif isinstance(event, EndData):
pass
elif isinstance(event, StreamClosed):
await self._maybe_recycle()
async def handle(self, quic_event: QuicEvent) -> None:
    """Handle the events that ``self.connection.handle_event`` yields for *quic_event*.

    ``HeadersReceived`` creates a stream and ``DataReceived`` passes a ``Body``
    to the existing stream. In both cases an ``EndBody`` follows when the
    event reports ``stream_ended``. Any other event type is ignored.
    """
    for received in self.connection.handle_event(quic_event):
        if isinstance(received, HeadersReceived):
            await self._create_stream(received)
        elif isinstance(received, DataReceived):
            await self.streams[received.stream_id].handle(
                Body(stream_id=received.stream_id, data=received.data)
            )
        else:
            continue

        # Shared tail for both handled event types.
        if received.stream_ended:
            stream_id = received.stream_id
            await self.streams[stream_id].handle(EndBody(stream_id=stream_id))