Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
item = connection_table.get_item(connection_id)
if not item: # pragma: no cover
return make_response("Error", status_code=500)
# Retrieve and deserialize the scope entry created in the connect event for
# the current connection.
scope = json.loads(item["scope"])
# Ensure the scope definition complies with the ASGI spec.
query_string = scope["query_string"]
headers = scope["headers"]
headers = [[k.encode(), v.encode()] for k, v in headers.items()]
query_string = query_string.encode()
scope.update({"headers": headers, "query_string": query_string})
asgi_cycle = ASGIWebSocketCycle(
scope,
endpoint_url=endpoint_url,
connection_id=connection_id,
connection_table=connection_table,
)
asgi_cycle.app_queue.put_nowait({"type": "websocket.connect"})
asgi_cycle.app_queue.put_nowait(
{
"type": "websocket.receive",
"path": "/",
"bytes": None,
"text": event["body"],
}
)
try: