Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_invoke_asgi_2() -> None:
    """Check that a two-step (scope -> coroutine) app sees ASGI version "2.0"."""
    # Holds whichever scope object the application factory was handed.
    captured: dict = {"scope": {}}

    def asgi2_callable(scope: dict) -> Callable:
        # First step: a plain function taking only the scope and
        # returning the coroutine function that takes receive and send.
        captured["scope"] = scope

        async def inner(receive: Callable, send: Callable) -> None:
            return None

        return inner

    await hypercorn.utils.invoke_asgi(asgi2_callable, {"asgi": {}}, None, None)  # type: ignore
    seen_scope = captured["scope"]
    assert seen_scope["asgi"]["version"] == "2.0"
async def test_invoke_asgi_3() -> None:
    """Check that a single-coroutine (scope, receive, send) app sees ASGI version "3.0"."""
    # Holds whichever scope object the application was handed.
    captured: dict = {"scope": {}}

    async def asgi3_callable(scope: dict, receive: Callable, send: Callable) -> None:
        captured["scope"] = scope

    await hypercorn.utils.invoke_asgi(asgi3_callable, {"asgi": {}}, None, None)
    seen_scope = captured["scope"]
    assert seen_scope["asgi"]["version"] == "3.0"
async def handle_lifespan(self) -> None:
    """Invoke the application with a lifespan scope and handle any failure.

    A ``LifespanFailure`` propagates to the caller. Any other exception
    sets ``self.supported`` to ``False`` and is logged with a message
    chosen from whether ``self.startup`` and ``self.shutdown`` are set.
    """
    self._started.set()
    lifespan_scope = {"type": "lifespan", "asgi": {"spec_version": "2.0"}}
    try:
        await invoke_asgi(self.app, lifespan_scope, self.asgi_receive, self.asgi_send)
    except LifespanFailure:
        # Lifespan failures should crash the server
        raise
    except Exception:
        self.supported = False
        if self.startup.is_set():
            if self.shutdown.is_set():
                message = "ASGI Framework Lifespan errored after shutdown."
            else:
                message = "ASGI Framework Lifespan error, shutdown without Lifespan support"
        else:
            message = "ASGI Framework Lifespan error, continuing without Lifespan support"
        await self.config.log.exception(message)
async def handle_asgi_app(self, event: Request) -> None:
    """Run the ASGI application for a websocket connection.

    Records the start time, queues a ``websocket.connect`` message for
    the application and then invokes it. Cancellation is ignored; any
    other exception is logged (when an error logger is configured) and,
    if the connection is in the CONNECTED state, an abnormal-closure
    close frame is sent. Afterwards a connection still in the HANDSHAKE
    state is answered with an HTTP 500.

    ``event`` is accepted but not used by this method.
    """
    self.start_time = time()
    await self.asgi_put({"type": "websocket.connect"})
    try:
        await invoke_asgi(self.app, self.scope, self.asgi_receive, self.asgi_send)
    except asyncio.CancelledError:
        pass
    except Exception:
        error_logger = self.config.error_logger
        if error_logger is not None:
            error_logger.exception("Error in ASGI Framework")

        if self.state == ASGIWebsocketState.CONNECTED:
            close_event = CloseConnection(code=CloseReason.ABNORMAL_CLOSURE)
            await self.asend(close_event)
            self.state = ASGIWebsocketState.CLOSED

    # The application finished without accepting the connection (or
    # sending a response), so reply with a 500 on its behalf.
    if self.state == ASGIWebsocketState.HANDSHAKE:
        await self.send_http_error(500)
        self.state = ASGIWebsocketState.HTTPCLOSED
async def handle_asgi_app(self) -> None:
    """Run the ASGI application for an HTTP request and log the access.

    Cancellation is ignored; any other exception is logged when an
    error logger is configured. If the connection is still in the
    REQUEST state afterwards, a 500 response is sent and recorded as
    ``self.response``. The access logger is then called with the scope,
    the response and the elapsed time.
    """
    began_at = time()
    try:
        await invoke_asgi(self.app, self.scope, self.asgi_receive, self.asgi_send)
    except asyncio.CancelledError:
        pass
    except Exception:
        error_logger = self.config.error_logger
        if error_logger is not None:
            error_logger.exception("Error in ASGI Framework")

    # Still waiting on a response from the application means it has
    # errored, so answer with a 500 on its behalf.
    if self.state == ASGIHTTPState.REQUEST:
        await self.asend(self.error_response(500))
        await self.asend(h11.EndOfMessage())
        self.response = {"status": 500, "headers": []}

    self.config.access_logger.access(self.scope, self.response, time() - began_at)
async def handle_asgi_app(self) -> None:
    """Run the ASGI application for a websocket connection.

    Records the start time, queues a ``websocket.connect`` message for
    the application and then invokes it. Cancellation is ignored; any
    other exception is logged (when an error logger is configured) and,
    if the connection is in the CONNECTED state, an abnormal-closure
    close event is passed through ``self.connection.send`` and sent on.
    """
    self.start_time = time()
    await self.asgi_put({"type": "websocket.connect"})
    try:
        await invoke_asgi(self.app, self.scope, self.asgi_receive, self.asgi_send)
    except asyncio.CancelledError:
        pass
    except Exception:
        error_logger = self.config.error_logger
        if error_logger is not None:
            error_logger.exception("Error in ASGI Framework")

        if self.state == ASGIWebsocketState.CONNECTED:
            close_event = wsproto.events.CloseConnection(
                code=wsproto.frame_protocol.CloseReason.ABNORMAL_CLOSURE
            )
            payload = self.connection.send(close_event)
            await self.asend(Data(payload))
async def handle_lifespan(
    self, *, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED
) -> None:
    """Invoke the application with a lifespan scope and handle any failure.

    ``task_status.started()`` is called before the application is
    invoked. A ``LifespanFailure`` propagates to the caller. Any other
    exception sets ``self.supported`` to ``False`` and is logged. In
    every case both application channels are closed on the way out.
    """
    task_status.started()
    lifespan_scope = {"type": "lifespan", "asgi": {"spec_version": "2.0"}}
    try:
        await invoke_asgi(self.app, lifespan_scope, self.asgi_receive, self.asgi_send)
    except LifespanFailure:
        # Lifespan failures should crash the server
        raise
    except Exception:
        self.supported = False
        message = "ASGI Framework Lifespan error, continuing without Lifespan support"
        await self.config.log.exception(message)
    finally:
        await self.app_send_channel.aclose()
        await self.app_receive_channel.aclose()
async def __call__(self, scope: dict, receive: Callable, send: Callable) -> None:
    """Redirect plain ``http``/``ws`` connections; pass everything else to the app.

    An ``http`` scope with scheme ``http`` gets an HTTP redirect. A
    ``websocket`` scope with scheme ``ws`` gets a websocket redirect
    when the scope advertises the ``websocket.http.response`` extension,
    and a ``websocket.close`` otherwise. Any other scope is handed to
    the wrapped application unchanged.
    """
    scope_type = scope["type"]

    if scope_type == "http" and scope["scheme"] == "http":
        await self._send_http_redirect(scope, send)
        return None

    if scope_type != "websocket" or scope["scheme"] != "ws":
        return await invoke_asgi(self.app, scope, receive, send)

    # A redirect response is only possible when the server supports the
    # WebSocket Denial Response extension; without it the connection
    # can only be denied.
    extensions = scope.get("extensions", {})
    if "websocket.http.response" in extensions:
        await self._send_websocket_redirect(scope, send)
    else:
        await send({"type": "websocket.close"})
async def __call__(self, scope: dict, receive: Callable, send: Callable) -> None:
    """Dispatch to the first mounted app whose prefix matches the request path.

    ``self.mounts`` is iterated in order; for the first prefix that
    ``scope["path"]`` starts with, the prefix is stripped from the path
    (in place, on the scope that was passed in) and the matching app is
    invoked. If no prefix matches, an empty 404 response is sent.
    """
    for path, app in self.mounts.items():
        if scope["path"].startswith(path):
            # A request for exactly the mount point would leave an empty
            # path after stripping the prefix; an empty string is not a
            # valid request path, so hand the app "/" instead.
            scope["path"] = scope["path"][len(path) :] or "/"
            return await invoke_asgi(app, scope, receive, send)
    await send(
        {"type": "http.response.start", "status": 404, "headers": [(b"content-length", b"0")]}
    )
    await send({"type": "http.response.body"})