Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def send(self, message: Message) -> None:
    """Handle one message sent by the ASGI application.

    While the cycle is in the ``REQUEST`` state the application must answer
    with ``websocket.accept`` or ``websocket.close``; either one moves the
    cycle to the ``RESPONSE`` state. In any other state a ``websocket.send``
    message is forwarded to ``self.send_data`` and every other message type
    is ignored.

    Args:
        message: Message mapping from the application. ``"type"`` is
            required; ``"text"`` and ``"group"`` are optional.

    Raises:
        RuntimeError: If the first message is neither ``websocket.accept``
            nor ``websocket.close``.
    """
    if self.state is not ASGIState.REQUEST:
        # Handshake already answered: only outgoing sends are acted upon.
        payload = message.get("text", "")
        if message["type"] != "websocket.send":
            return
        self.send_data(data=payload, group=message.get("group", None))
        return

    # Still waiting for the application to accept or close the connection.
    handshake_replies = ("websocket.accept", "websocket.close")
    if message["type"] not in handshake_replies:
        raise RuntimeError(
            f"Expected 'websocket.accept' or 'websocket.close', received: {message['type']}"
        )
    self.state = ASGIState.RESPONSE
from mangum.exceptions import ASGIWebSocketCycleException
@enum.unique
class ASGIState(enum.Enum):
    """Phases an ASGI WebSocket cycle moves through."""

    # Initial phase: the application has not yet replied with
    # 'websocket.accept' or 'websocket.close'.
    REQUEST = enum.auto()

    # Entered once the application has sent one of those two replies.
    RESPONSE = enum.auto()
@dataclass
class ASGIWebSocketCycle:
    """Run a single ASGI application call for a WebSocket connection.

    An instance is callable: calling it with an ASGI application runs that
    application to completion on ``self.loop`` and returns ``self.response``.
    """

    # ASGI connection scope; handed to the application as its first argument.
    scope: Scope
    # NOTE(review): the three fields below are not read in this part of the
    # class. Presumably they identify the connection and the store that
    # tracks connections -- confirm against the code that uses them.
    endpoint_url: str
    connection_id: str
    connection_table: ConnectionTable
    # Every cycle starts in the REQUEST state.
    state: ASGIState = ASGIState.REQUEST
    # Value returned by ``__call__``; each instance gets its own empty dict.
    response: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Bind an event loop and create the queue ``receive`` reads from."""
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when the current thread has no event
            # loop (non-main thread, after an earlier asyncio.run(), or on
            # Python versions that no longer create one implicitly). Create
            # and install a fresh loop instead of failing at construction.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        # Created after the loop is settled so that, on Python versions where
        # a Queue binds to the current loop at construction, it binds to the
        # same loop that ``__call__`` runs.
        self.app_queue: asyncio.Queue = asyncio.Queue()

    def __call__(self, app: ASGIApp) -> dict:
        """Run ``app`` to completion and return the response dict.

        Args:
            app: ASGI application; called with ``self.scope``,
                ``self.receive`` and ``self.send``.

        Returns:
            ``self.response`` as it stands once the application has finished.

        Raises:
            Exception: Whatever the application raises propagates out of
                ``run_until_complete``.
        """
        asgi_instance = app(self.scope, self.receive, self.send)
        asgi_task = self.loop.create_task(asgi_instance)
        # Blocks the calling thread until the application coroutine returns.
        self.loop.run_until_complete(asgi_task)
        return self.response

    async def receive(self) -> Message:  # pragma: no cover
        """Wait for and return the next message from ``self.app_queue``."""
        # NOTE(review): nothing in this part of the class puts items on the
        # queue; confirm where incoming events are enqueued.
        return await self.app_queue.get()