Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __call__(self, app: ASGIApp):
asgi_instance = app(self.scope, self.asgi_receive, self.asgi_send)
asgi_task = self.loop.create_task(asgi_instance)
self.loop.run_until_complete(asgi_task)
return self.response
async def asgi_receive(self) -> dict:
message = await self.app_queue.get()
return message
def put_message(self, message: ASGIMessage) -> None:
self.app_queue.put_nowait(message)
@dataclass
class ASGIHTTPCycle(ASGICycle):
body: bytes = b""
async def asgi_send(self, message: ASGIMessage) -> None:
if self.state is ASGICycleState.REQUEST:
if message["type"] != "http.response.start":
raise RuntimeError(
f"Expected 'http.response.start', received: {message['type']}"
)
status_code = message["status"]
headers = {k: v for k, v in message.get("headers", [])}
self.response["statusCode"] = status_code
self.response["isBase64Encoded"] = self.binary
self.response["headers"] = {
k.decode(): v.decode() for k, v in headers.items()
body = message.get("body", b"")
more_body = message.get("more_body", False)
# The body must be completely read before returning the response.
self.body += body
if not more_body:
body = self.body
if self.binary:
body = base64.b64encode(body)
self.response["body"] = body.decode()
self.put_message({"type": "http.disconnect"})
@dataclass
class ASGIWebSocketCycle(ASGICycle):
endpoint_url: str = None
connection_id: str = None
connection_table: ConnectionTable = None
async def asgi_send(self, message: ASGIMessage) -> None:
if self.state is ASGICycleState.REQUEST:
if message["type"] in ("websocket.accept", "websocket.close"):
self.state = ASGICycleState.RESPONSE
else:
raise RuntimeError(
f"Expected 'websocket.accept' or 'websocket.close', received: {message['type']}"
)
else:
data = message.get("text")
if message["type"] == "websocket.send":
import base64
from mangum.asgi import ASGICycle
from mangum.utils import encode_query_string
class AWSLambdaCycle(ASGICycle):
def __init__(self, *args, **kwargs) -> None:
self.binary = kwargs.pop("binary", False)
super().__init__(*args, **kwargs)
def on_response_start(self, headers: dict, status_code: int) -> None:
self.response["statusCode"] = status_code
self.response["isBase64Encoded"] = self.binary
self.response["headers"] = headers
def on_response_body(self, body: bytes) -> None:
if self.binary:
body = base64.b64encode(body)
else:
body = body.decode("ascii")
self.response["body"] = body