# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import asyncio
import enum
from dataclasses import dataclass, field

from mangum.connections import ConnectionTable
from mangum.types import ASGIScope, ASGIMessage, ASGIApp
from mangum.exceptions import ASGIWebSocketCycleException
class ASGICycleState(enum.Enum):
    """Phase marker for an ASGI cycle.

    The member values are generated by ``enum.auto()`` and carry no meaning
    of their own; compare members by identity (``is``).
    """

    # Starting phase: ``ASGICycle.state`` defaults to this member.
    REQUEST = enum.auto()

    # Phase entered after the application sends ``websocket.accept`` or
    # ``websocket.close`` (see ``ASGICycle.asgi_send``).
    RESPONSE = enum.auto()
@dataclass
class ASGICycle:
    """Run one ASGI WebSocket application call to completion.

    An instance is callable: calling it with an ASGI application runs that
    application on the event loop captured at construction time, handing it
    ``asgi_receive`` / ``asgi_send`` as its receive and send channels, and
    then returns ``response``.

    ``asgi_send`` delegates outgoing data to ``self.send_data``, which is not
    defined in this part of the class.
    NOTE(review): confirm where ``send_data`` is provided (a later part of
    this class or a subclass).
    """

    # ASGI connection scope, passed unchanged to the application.
    scope: ASGIScope
    # Handshake progress; switches to RESPONSE once the application sends
    # ``websocket.accept`` or ``websocket.close``.
    state: ASGICycleState = ASGICycleState.REQUEST
    # Not read by any method visible in this section.
    binary: bool = False
    # Value returned by ``__call__``; nothing visible in this section
    # populates it.
    response: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Capture an event loop and create the inbound message queue."""
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop is available (e.g. a non-main thread, or an
            # interpreter that no longer creates one implicitly): create one
            # and install it so the queue and the task share the same loop.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        # The ``loop`` argument was removed from ``asyncio.Queue`` in Python
        # 3.10 and passing it raises ``TypeError`` there. Without it the
        # queue uses the current loop, which is ``self.loop``.
        self.app_queue = asyncio.Queue()

    def __call__(self, app: ASGIApp) -> dict:
        """Run ``app`` until it finishes and return ``self.response``.

        Args:
            app: ASGI application, called as ``app(scope, receive, send)``.

        Returns:
            The ``response`` dict held by this cycle.
        """
        asgi_instance = app(self.scope, self.asgi_receive, self.asgi_send)
        asgi_task = self.loop.create_task(asgi_instance)
        # Blocks the calling thread until the application coroutine is done.
        self.loop.run_until_complete(asgi_task)
        return self.response

    async def asgi_receive(self) -> dict:
        """Wait for and return the next message from ``app_queue``."""
        return await self.app_queue.get()

    async def asgi_send(self, message: ASGIMessage) -> None:
        """Handle a message sent by the application.

        While ``state`` is REQUEST, only ``websocket.accept`` and
        ``websocket.close`` are allowed; either one moves the cycle to
        RESPONSE. Afterwards, ``websocket.send`` messages are forwarded to
        ``send_data`` with the message's ``text`` and optional ``group``
        values; every other message type is ignored.

        Args:
            message: ASGI message mapping; must contain a ``"type"`` key.

        Raises:
            RuntimeError: If the first message is neither
                ``websocket.accept`` nor ``websocket.close``.
        """
        message_type = message["type"]

        if self.state is ASGICycleState.REQUEST:
            if message_type not in ("websocket.accept", "websocket.close"):
                raise RuntimeError(
                    f"Expected 'websocket.accept' or 'websocket.close', received: {message_type}"
                )
            self.state = ASGICycleState.RESPONSE
            return

        if message_type == "websocket.send":
            self.send_data(data=message.get("text"), group=message.get("group"))