How to use the mangum.utils.encode_query_string function in mangum

To help you get started, we’ve selected a few mangum examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github erm / mangum / mangum / platforms / aws / adapter.py View on Github external
def asgi(self, event: dict, context: dict) -> dict:

        method = event["httpMethod"]
        headers = event["headers"] or {}
        path = event["path"]
        scheme = headers.get("X-Forwarded-Proto", "http")
        query_string_params = event["queryStringParameters"]
        query_string = (
            encode_query_string(query_string_params) if query_string_params else b""
        )

        client_addr = event["requestContext"].get("identity", {}).get("sourceIp", None)
        client = (client_addr, 0)  # TODO: Client port

        server_addr = headers["Host"]
        if ":" not in server_addr:
            server_port = 80
        else:
            server_port = int(server_addr.split(":")[1])

        server = (server_addr, server_port)

        scope = {
            "server": server,
            "client": client,
github erm / mangum / mangum / platforms / azure / adapter.py View on Github external
def asgi(self, event: HttpRequest) -> dict:
        server = None
        client = None
        scheme = "https"
        method = event.method
        headers = event.headers.items()
        parsed = urllib.parse.urlparse(event.url)
        scheme = parsed.scheme
        path = parsed.path
        query_string = encode_query_string(event.params) if event.params else b""

        scope = {
            "type": "http",
            "server": server,
            "client": client,
            "method": method,
            "path": path,
            "scheme": scheme,
            "http_version": "1.1",
            "root_path": "",
            "query_string": query_string,
            "headers": [[maybe_encode(k), maybe_encode(v)] for k, v in headers],
        }

        body = event.get_body() or b""
        response = AzureFunctionCycle(scope)(self.app, body=body)
github erm / mangum / mangum / platforms / aws / adapter.py View on Github external
headers = event["headers"] or {}
    path = event["path"]
    host = headers.get("Host")
    scheme = headers.get("X-Forwarded-Proto", "http")
    x_forwarded_for = headers.get("X-Forwarded-For")
    x_forwarded_port = headers.get("X-Forwarded-Port")

    if x_forwarded_port and x_forwarded_for:
        port = int(x_forwarded_port)
        client = (x_forwarded_for, port)
        if host:
            server = (host, port)

    query_string_params = event["queryStringParameters"]
    query_string = (
        encode_query_string(query_string_params) if query_string_params else ""
    )

    scope = {
        "server": server,
        "client": client,
        "scheme": scheme,
        "root_path": "",
        "query_string": query_string,
        "headers": headers.items(),
        "type": "http",
        "http_version": "1.1",
        "method": method,
        "path": path,
    }

    binary = event.get("isBase64Encoded", False)