Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
super().__init__(*args, **kwargs)
def on_response_start(self, headers: dict, status_code: int) -> None:
self.response["statusCode"] = status_code
self.response["isBase64Encoded"] = self.binary
self.response["headers"] = {k.decode(): v.decode() for k, v in headers.items()}
def on_response_close(self) -> None:
if self.binary:
body = base64.b64encode(self.body)
else:
body = self.body.decode()
self.response["body"] = body
class AWSLambdaAdapter(ServerlessAdapter):
"""
A adapter that wraps an ASGI application and handles transforming an incoming
AWS Lambda & API Gateway request into the ASGI connection scope.
After building the connection scope, it runs the ASGI application cycle and returns
the response.
"""
def asgi(self, event: dict, context: dict) -> dict:
method = event["httpMethod"]
headers = event["headers"] or {}
path = event["path"]
scheme = headers.get("X-Forwarded-Proto", "http")
query_string_params = event["queryStringParameters"]
query_string = (
"mimetype": str,
"status_code": itnt
}
"""
def on_response_start(self, headers: dict, status_code: int) -> None:
self.response["status_code"] = status_code
self.response["headers"] = {k.decode(): v.decode() for k, v in headers.items()}
self.response["mimetype"] = self.mimetype
self.response["charset"] = self.charset
def on_response_close(self) -> None:
self.response["body"] = self.body
class AzureFunctionAdapter(ServerlessAdapter):
"""
A adapter that wraps an ASGI application and handles transforming an incoming
Azure Function request into the ASGI connection scope.
After building the connection scope, it runs the ASGI application cycle and then
serializes the response into an `HttpResponse`.
"""
def asgi(self, event: HttpRequest) -> dict:
server = None
client = None
scheme = "https"
method = event.method
headers = event.headers.items()
parsed = urllib.parse.urlparse(event.url)
scheme = parsed.scheme