How to use the mangum.asgi.adapter.ServerlessAdapter function in mangum

To help you get started, we’ve selected a few mangum examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github erm / mangum / mangum / platforms / aws / adapter.py View on Github external
super().__init__(*args, **kwargs)

    def on_response_start(self, headers: dict, status_code: int) -> None:
        self.response["statusCode"] = status_code
        self.response["isBase64Encoded"] = self.binary
        self.response["headers"] = {k.decode(): v.decode() for k, v in headers.items()}

    def on_response_close(self) -> None:
        if self.binary:
            body = base64.b64encode(self.body)
        else:
            body = self.body.decode()
        self.response["body"] = body


class AWSLambdaAdapter(ServerlessAdapter):
    """
    A adapter that wraps an ASGI application and handles transforming an incoming
    AWS Lambda & API Gateway request into the ASGI connection scope.

    After building the connection scope, it runs the ASGI application cycle and returns
    the response.
    """

    def asgi(self, event: dict, context: dict) -> dict:

        method = event["httpMethod"]
        headers = event["headers"] or {}
        path = event["path"]
        scheme = headers.get("X-Forwarded-Proto", "http")
        query_string_params = event["queryStringParameters"]
        query_string = (
github erm / mangum / mangum / platforms / azure / adapter.py View on Github external
"mimetype": str,
            "status_code": itnt
        }
    """

    def on_response_start(self, headers: dict, status_code: int) -> None:
        self.response["status_code"] = status_code
        self.response["headers"] = {k.decode(): v.decode() for k, v in headers.items()}
        self.response["mimetype"] = self.mimetype
        self.response["charset"] = self.charset

    def on_response_close(self) -> None:
        self.response["body"] = self.body


class AzureFunctionAdapter(ServerlessAdapter):
    """
    A adapter that wraps an ASGI application and handles transforming an incoming
    Azure Function request into the ASGI connection scope.

    After building the connection scope, it runs the ASGI application cycle and then
    serializes the response into an `HttpResponse`.
    """

    def asgi(self, event: HttpRequest) -> dict:
        server = None
        client = None
        scheme = "https"
        method = event.method
        headers = event.headers.items()
        parsed = urllib.parse.urlparse(event.url)
        scheme = parsed.scheme