Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _protocol(monkeypatch: MonkeyPatch) -> H2Protocol:
    """Return an ``H2Protocol`` built entirely from mock collaborators.

    ``HTTPStream`` on ``hypercorn.protocol.h11`` is replaced through
    *monkeypatch* with a mock factory whose return value is specced against
    the real ``HTTPStream``. The send and spawn-app callables are fresh
    ``CoroutineMock`` objects and ``EventWrapper`` is used as the event class.

    NOTE(review): the patch targets the ``h11`` module although an
    ``H2Protocol`` is constructed -- confirm this is intentional.
    """
    stream_factory = AsyncMock(return_value=AsyncMock(spec=HTTPStream))
    monkeypatch.setattr(hypercorn.protocol.h11, "HTTPStream", stream_factory)

    config = Config()
    send = CoroutineMock()
    spawn_app = CoroutineMock()
    return H2Protocol(config, False, None, None, send, spawn_app, EventWrapper)
async def _stream() -> HTTPStream:
    """Return an ``HTTPStream`` with stream id 1 and mocked hooks.

    The stream is created without SSL and without client/server addresses.
    Its send and spawn-app callables, and its ``app_put`` attribute, are
    ``CoroutineMock`` objects; the config's ``_log`` is replaced by a mock
    specced against ``Logger``.
    """
    config = Config()
    send = CoroutineMock()
    spawn_app = CoroutineMock()
    http_stream = HTTPStream(config, False, None, None, send, spawn_app, 1)
    # Attributes are overridden only after construction, as in the original.
    http_stream.app_put = CoroutineMock()
    http_stream.config._log = AsyncMock(spec=Logger)
    return http_stream
async def test_protocol_handle_max_incomplete(monkeypatch: MonkeyPatch) -> None:
    """An unterminated request above ``h11_max_incomplete_size`` yields a 400.

    The limit is set to 5 bytes and a header block without its terminating
    blank line is fed to the protocol. The protocol is expected to send a
    400 response, an empty data chunk, and finally ``Closed``.
    """
    # Replace the real stream class and build an event factory from mocks.
    stream_factory = AsyncMock(return_value=AsyncMock(spec=HTTPStream))
    monkeypatch.setattr(hypercorn.protocol.h11, "HTTPStream", stream_factory)
    event_factory = AsyncMock(return_value=AsyncMock(spec=IOEvent))

    limited_config = Config()
    limited_config.h11_max_incomplete_size = 5
    protocol = H11Protocol(
        limited_config, False, None, None, CoroutineMock(), CoroutineMock(), event_factory
    )

    await protocol.handle(RawData(data=b"GET / HTTP/1.1\r\nHost: hypercorn\r\n"))

    protocol.send.assert_called()
    bad_request = RawData(
        data=b"HTTP/1.1 400 \r\ncontent-length: 0\r\nconnection: close\r\n"
        b"date: Thu, 01 Jan 1970 01:23:20 GMT\r\nserver: hypercorn-h11\r\n\r\n"
    )
    expected_calls = [
        call(bad_request),
        call(RawData(data=b"")),
        call(Closed()),
    ]
    assert protocol.send.call_args_list == expected_calls
async def _protocol(monkeypatch: MonkeyPatch) -> H11Protocol:
    """Return an ``H11Protocol`` built entirely from mock collaborators.

    ``HTTPStream`` on ``hypercorn.protocol.h11`` is replaced through
    *monkeypatch* with a mock factory whose return value is specced against
    the real ``HTTPStream``. The event class is a mock factory returning an
    ``IOEvent``-specced mock, and the send and spawn-app callables are
    ``CoroutineMock`` objects.
    """
    stream_factory = AsyncMock(return_value=AsyncMock(spec=HTTPStream))
    monkeypatch.setattr(hypercorn.protocol.h11, "HTTPStream", stream_factory)
    event_factory = AsyncMock(return_value=AsyncMock(spec=IOEvent))

    config = Config()
    send = CoroutineMock()
    spawn_app = CoroutineMock()
    return H11Protocol(config, False, None, None, send, spawn_app, event_factory)
# Excerpt: tail of an initialiser whose signature lies outside this view.
# It applies the h2-related limits from `config` to the connection and then
# stores the collaborators it was given on the instance.
# Use the configured maximum inbound frame size on the connection.
self.connection.DEFAULT_MAX_INBOUND_FRAME_SIZE = config.h2_max_inbound_frame_size
# Replace the connection's local settings with server-side settings
# (client=False) seeded from the configured stream and header-list limits.
self.connection.local_settings = h2.settings.Settings(
client=False,
initial_values={
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: config.h2_max_concurrent_streams,
h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: config.h2_max_header_list_size,
# Always 1, independent of `config`.
# NOTE(review): presumably advertises extended CONNECT so WebSocket
# streams can be opened over this connection -- confirm.
h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL: 1,
},
)
self.event_class = event_class
self.send = send
self.server = server
self.spawn_app = spawn_app
self.ssl = ssl
# Active streams, keyed by integer stream id; starts empty.
self.streams: Dict[int, Union[HTTPStream, WSStream]] = {}
# The below are used by the sending task
# `has_data` is a fresh instance of the supplied event class.
self.has_data = event_class()
self.priority = priority.PriorityTree()
# Per-stream buffers, keyed by integer stream id; starts empty.
self.stream_buffers: Dict[int, StreamBuffer] = {}
# Excerpt from the middle of a request handler: the code that binds `name`,
# `value` and `request` starts before this view, and the `Request(...)` call
# at the end continues past it.
# The method pseudo-header value is decoded as ASCII and upper-cased.
method = value.decode("ascii").upper()
elif name == b":path":
# The path is kept as raw bytes.
raw_path = value
# CONNECT requests get a WSStream; every other method gets an HTTPStream.
# Either way the stream is registered under the request's stream id.
if method == "CONNECT":
self.streams[request.stream_id] = WSStream(
self.config,
# NOTE(review): hard-coded True here, where other call sites pass
# `self.ssl` -- presumably because this transport is always encrypted;
# confirm.
True,
self.client,
self.server,
self.stream_send,
self.spawn_app,
request.stream_id,
)
else:
self.streams[request.stream_id] = HTTPStream(
self.config,
# Same hard-coded True as in the WSStream branch above.
True,
self.client,
self.server,
self.stream_send,
self.spawn_app,
request.stream_id,
)
# Pass the request to the new stream, with pseudo-headers filtered out of
# the header list and the HTTP version reported as "3".
await self.streams[request.stream_id].handle(
Request(
stream_id=request.stream_id,
headers=filter_pseudo_headers(request.headers),
http_version="3",
method=method,
raw_path=raw_path,
# Excerpt from the middle of a request handler: the code that binds `name`,
# `value` and `request` starts before this view.
# The method pseudo-header value is decoded as ASCII and upper-cased.
method = value.decode("ascii").upper()
elif name == b":path":
# The path is kept as raw bytes.
raw_path = value
# CONNECT requests get a WSStream; every other method gets an HTTPStream.
# Either way the stream is registered under the request's stream id and is
# given the protocol's own `ssl` flag.
if method == "CONNECT":
self.streams[request.stream_id] = WSStream(
self.config,
self.ssl,
self.client,
self.server,
self.stream_send,
self.spawn_app,
request.stream_id,
)
else:
self.streams[request.stream_id] = HTTPStream(
self.config,
self.ssl,
self.client,
self.server,
self.stream_send,
self.spawn_app,
request.stream_id,
)
# Each new stream gets its own StreamBuffer built with the protocol's event
# class.
self.stream_buffers[request.stream_id] = StreamBuffer(self.event_class)
# Add the stream to the priority tree. If it is already present the insert
# raises DuplicateStreamError, which is deliberately ignored.
try:
self.priority.insert_stream(request.stream_id)
except priority.DuplicateStreamError:
# Received PRIORITY frame before HEADERS frame
pass
else:
# Only a stream that was inserted just now is marked as blocked here.
self.priority.block(request.stream_id)
def __init__(
    self,
    config: Config,
    client: Optional[Tuple[str, int]],
    server: Optional[Tuple[str, int]],
    spawn_app: Callable[[dict, Callable], Awaitable[Callable]],
    quic: QuicConnection,
    send: Callable[[], Awaitable[None]],
) -> None:
    """Store the collaborators and wrap *quic* in an ``H3Connection``.

    Args:
        config: Server configuration, kept on ``self.config``.
        client: Optional ``(host, port)`` pair for the client, stored as given.
        server: Optional ``(host, port)`` pair for the server, stored as given.
        spawn_app: Awaitable factory kept on ``self.spawn_app``.
        quic: QUIC connection handed to ``H3Connection``.
        send: Argument-less awaitable callable kept on ``self.send``.
    """
    # Plain references supplied by the caller.
    self.config = config
    self.client = client
    self.server = server
    self.spawn_app = spawn_app
    self.send = send

    # State created by this instance.
    self.connection = H3Connection(quic)
    # Active streams keyed by integer stream id; starts empty.
    self.streams: Dict[int, Union[HTTPStream, WSStream]] = {}
# Excerpt: this begins inside a parenthesised `if (` condition whose opening
# lies outside this view, and the `handle(` call at the end continues past it.
# The visible part of the condition requires an "upgrade" token among the
# connection tokens, an upgrade value of "websocket" (case-insensitive), and
# a GET method.
any(token.strip() == "upgrade" for token in connection_tokens)
and upgrade_value.lower() == "websocket"
and request.method.decode("ascii").upper() == "GET"
):
# Upgrade case: serve the request with a WSStream and wrap the existing
# connection in an H11WSConnection.
self.stream = WSStream(
self.config,
self.ssl,
self.client,
self.server,
self.stream_send,
self.spawn_app,
STREAM_ID,
)
self.connection = H11WSConnection(self.connection)
else:
# Every other request is served by an HTTPStream; the connection is left
# as it is.
self.stream = HTTPStream(
self.config,
self.ssl,
self.client,
self.server,
self.stream_send,
self.spawn_app,
STREAM_ID,
)
# Hand the request to whichever stream was created. The HTTP version and
# method are decoded from bytes (method upper-cased); headers and target are
# passed through unchanged.
await self.stream.handle(
Request(
stream_id=STREAM_ID,
headers=request.headers,
http_version=request.http_version.decode(),
method=request.method.decode("ascii").upper(),
raw_path=request.target,
)
# Excerpt: the end of an initialiser's parameter list followed by its body.
# The `def` line and the earlier parameters lie outside this view.
server: Optional[Tuple[str, int]],
send: Callable[[Event], Awaitable[None]],
spawn_app: Callable[[dict, Callable], Awaitable[Callable]],
event_class: Type[IOEvent],
) -> None:
# `can_read` is a fresh instance of the supplied event class.
self.can_read = event_class()
self.client = client
self.config = config
# Server-side h11 connection; its incomplete-event size limit comes from
# the configuration.
self.connection = h11.Connection(
h11.SERVER, max_incomplete_event_size=self.config.h11_max_incomplete_size
)
self.send = send
self.server = server
self.spawn_app = spawn_app
self.ssl = ssl
# No stream is attached at construction time.
self.stream: Optional[Union[HTTPStream, WSStream]] = None