Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self) -> None:
    """Initialise instance state with stub collaborators.

    The instance starts in ``ASGIHTTPState.REQUEST`` with an empty
    ``sent_events`` list, a freshly constructed ``Config`` and
    ``empty_framework`` as its application.
    """
    self.state = ASGIHTTPState.REQUEST
    # Starts out empty.
    self.sent_events: list = []
    self.config = Config()
    self.app = empty_framework
def __init__(self) -> None:
    """Initialise instance state with stub collaborators and fixed addresses.

    The instance starts in ``ASGIHTTPState.REQUEST`` with an empty
    ``sent_events`` list, a freshly constructed ``Config``,
    ``empty_framework`` as its application, and hard-coded ``client`` /
    ``server`` address tuples.
    """
    self.state = ASGIHTTPState.REQUEST
    # Starts out empty.
    self.sent_events: List[H11SendableEvent] = []
    self.config = Config()
    self.app = empty_framework
    # Fixed (host, port) pairs.
    self.client = ("127.0.0.1", 5000)
    self.server = ("remote", 5000)
(ASGIHTTPState.REQUEST, "not_a_real_type"),
(ASGIHTTPState.RESPONSE, "http.response.start"),
(ASGIHTTPState.CLOSED, "http.response.start"),
(ASGIHTTPState.CLOSED, "http.response.body"),
],
)
async def test_asgi_send_invalid_message_given_state(
    state: ASGIHTTPState, message_type: str
) -> None:
    """Check that ``asgi_send`` rejects a message that is invalid for ``state``.

    A ``MockH11`` is forced into ``state`` and handed a message whose
    ``"type"`` is ``message_type``; the call must raise ``UnexpectedMessage``.
    """
    protocol = MockH11()
    protocol.state = state
    invalid_message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await protocol.asgi_send(invalid_message)
(ASGIHTTPState.REQUEST, "not_a_real_type"),
(ASGIHTTPState.RESPONSE, "http.response.start"),
(ASGIHTTPState.CLOSED, "http.response.start"),
(ASGIHTTPState.CLOSED, "http.response.body"),
],
)
async def test_http_asgi_send_invalid_message_given_state(
    state: ASGIHTTPState, message_type: str
) -> None:
    """Check that ``asgi_send`` rejects a message that is invalid for ``state``.

    A ``MockH2HTTPStream`` is forced into ``state`` and handed a message whose
    ``"type"`` is ``message_type``; the call must raise ``UnexpectedMessage``.
    """
    http_stream = MockH2HTTPStream()
    http_stream.state = state
    invalid_message = {"type": message_type}
    with pytest.raises(UnexpectedMessage):
        await http_stream.asgi_send(invalid_message)
def recycle_or_close(self, future: asyncio.Future) -> None:
    """Reset the connection for another request, or close it.

    If the h11 connection's ``our_state`` is ``h11.DONE`` the connection is
    asked to start its next cycle.  When that succeeds, reading is resumed on
    the transport, the per-request state (``app_queue``, ``response``,
    ``scope``, ``state``) is reset, the keep-alive timeout is started and
    pending events are handled.  In every other case the connection is
    closed.

    Args:
        future: Not used by this method.  NOTE(review): presumably present so
            the method can be registered as a future done-callback - confirm
            against the caller.
    """
    if self.connection.our_state is not h11.DONE:
        # Either reached a good close state, or has errored
        self.close()
        return

    try:
        self.connection.start_next_cycle()
    except h11.LocalProtocolError:
        # h11 refused to start a new cycle, so the connection cannot be reused.
        self.close()
    else:
        self.transport.resume_reading()  # type: ignore
        # ``asyncio.Queue`` lost its ``loop`` parameter in Python 3.10
        # (deprecated since 3.8); passing it raises ``TypeError`` there, so
        # the queue is created without it.
        self.app_queue = asyncio.Queue()
        self.response = None
        self.scope = None
        self.state = ASGIHTTPState.REQUEST
        self.start_keep_alive_timeout()
        self.handle_events()
def __init__(self, app: ASGIFramework, config: Config, stream: trio.abc.Stream) -> None:
    """Set up the per-connection h11 state.

    Args:
        app: Stored as ``self.app``.
        config: Stored as ``self.config``; its ``h11_max_incomplete_size``
            is passed to the h11 connection as ``max_incomplete_event_size``.
        stream: Forwarded to the base class initialiser together with the
            protocol label ``"h11"``.
    """
    super().__init__(stream, "h11")
    self.app = app
    self.config = config

    incomplete_limit = self.config.h11_max_incomplete_size
    self.connection = h11.Connection(h11.SERVER, max_incomplete_event_size=incomplete_limit)

    # Per-request state; nothing has been received or sent yet.
    self.response: Optional[dict] = None
    self.scope: Optional[dict] = None
    self.state = ASGIHTTPState.REQUEST

    send_channel, receive_channel = trio.open_memory_channel(10)
    self.app_send_channel = send_channel
    self.app_receive_channel = receive_channel
self,
app: ASGIFramework,
loop: asyncio.AbstractEventLoop,
config: Config,
transport: asyncio.BaseTransport,
) -> None:
super().__init__(loop, config, transport, "h11")
self.app = app
self.connection = h11.Connection(
h11.SERVER, max_incomplete_event_size=self.config.h11_max_incomplete_size
)
self.app_queue: asyncio.Queue = asyncio.Queue(loop=loop)
self.response: Optional[dict] = None
self.scope: Optional[dict] = None
self.state = ASGIHTTPState.REQUEST
self.task: Optional[asyncio.Future] = None
async def recycle_or_close(self) -> None:
    """Reset the connection for another request, or signal that it must close.

    When the h11 connection's ``our_state`` is ``h11.DONE``, both ends of the
    current memory channel are closed, h11 is asked to start its next cycle,
    a new channel pair is opened and the per-request state (``response``,
    ``scope``, ``state``) is reset.

    Raises:
        MustCloseError: If ``our_state`` is anything other than ``h11.DONE``.
    """
    if self.connection.our_state is not h11.DONE:
        raise MustCloseError()

    # Retire the finished request's channel before starting the next cycle.
    await self.app_send_channel.aclose()
    await self.app_receive_channel.aclose()
    self.connection.start_next_cycle()

    send_channel, receive_channel = trio.open_memory_channel(10)
    self.app_send_channel = send_channel
    self.app_receive_channel = receive_channel

    self.response = None
    self.scope = None
    self.state = ASGIHTTPState.REQUEST
def __init__(self, app: ASGIFramework, config: Config, asend: Callable) -> None:
    """Store the collaborators and set up the initial per-request state.

    Args:
        app: Stored as ``self.app``.
        config: Stored as ``self.config``.
        asend: Callable stored as ``self.asend``.
    """
    self.app = app
    self.config = config
    self.asend = asend  # type: ignore

    # Per-request state; nothing has been received or sent yet.
    self.response: Optional[dict] = None
    self.scope: Optional[dict] = None
    self.state = ASGIHTTPState.REQUEST

    send_channel, receive_channel = trio.open_memory_channel(10)
    self.app_send_channel = send_channel
    self.app_receive_channel = receive_channel
async def asgi_send(self, message: dict) -> None:
"""Called by the ASGI instance to send a message."""
if message["type"] == "http.response.start" and self.state == ASGIHTTPState.REQUEST:
self.response = message
elif message["type"] == "http.response.body" and self.state in {
ASGIHTTPState.REQUEST,
ASGIHTTPState.RESPONSE,
}:
if self.state == ASGIHTTPState.REQUEST:
headers = build_and_validate_headers(self.response["headers"])
headers.extend(self.response_headers())
await self.asend(
h11.Response(status_code=int(self.response["status"]), headers=headers)
)
self.state = ASGIHTTPState.RESPONSE
if (
not suppress_body(self.scope["method"], int(self.response["status"]))
and message.get("body", b"") != b""
):
await self.asend(h11.Data(data=bytes(message["body"])))