Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
connection_id=connection_id,
connection_table=connection_table,
)
asgi_cycle.app_queue.put_nowait({"type": "websocket.connect"})
asgi_cycle.app_queue.put_nowait(
{
"type": "websocket.receive",
"path": "/",
"bytes": None,
"text": event["body"],
}
)
try:
asgi_cycle(self.app)
except ASGIWebSocketCycleException: # pragma: no cover
return make_response("Error", status_code=500)
return make_response("OK", status_code=200)
elif event_type == "DISCONNECT":
connection_table = ConnectionTable()
status_code = connection_table.delete_item(connection_id)
if status_code != 200: # pragma: no cover
return make_response("WebSocket disconnect error.", status_code=500)
return make_response("OK", status_code=200)
return make_response("Error", status_code=500) # pragma: no cover
Send a data message to a client or group of clients using the connection table.
"""
item = self.connection_table.get_item(self.connection_id)
if group:
# Retrieve the existing groups for the current connection, or create a new
# groups entry if one does not exist.
groups = item.get("groups", [])
if group not in groups:
# Ensure the group specified in the message is included.
groups.append(group)
result = self.connection_table.update_item(
self.connection_id, groups=groups
)
status_code = result.get("ResponseMetadata", {}).get("HTTPStatusCode")
if status_code != 200:
raise ASGIWebSocketCycleException("Error updating groups")
# Retrieve all items associated with the current group.
items = self.connection_table.get_group_items(group)
if items is None:
raise ASGIWebSocketCycleException("No connections found")
else:
# Single send, add the current item to a list to be iterated by the
# connection table.
items = [item]
self.connection_table.send_data(items, data=data)
# groups entry if one does not exist.
groups = item.get("groups", [])
if group not in groups:
# Ensure the group specified in the message is included.
groups.append(group)
result = self.connection_table.update_item(
self.connection_id, groups=groups
)
status_code = result.get("ResponseMetadata", {}).get("HTTPStatusCode")
if status_code != 200:
raise ASGIWebSocketCycleException("Error updating groups")
# Retrieve all items associated with the current group.
items = self.connection_table.get_group_items(group)
if items is None:
raise ASGIWebSocketCycleException("No connections found")
else:
# Single send, add the current item to a list to be iterated by the
# connection table.
items = [item]
self.connection_table.send_data(items, data=data)
# Retrieve the existing groups for the current connection, or create a new
# groups entry if one does not exist.
groups = item.get("groups", [])
if group not in groups:
# Ensure the group specified in the message is included.
groups.append(group)
status_code = self.connection_table.update_item(
self.connection_id, groups=groups
)
if status_code != 200:
raise ASGIWebSocketCycleException("Error updating groups")
# Retrieve all items associated with the current group.
items = self.connection_table.get_group_items(group)
if items is None:
raise ASGIWebSocketCycleException("No connections found")
else:
# Single send, add the current item to a list to be iterated by the
# connection table.
items = [item]
self.connection_table.send_data(
items, endpoint_url=self.endpoint_url, data=data
)
def send_data(self, *, data: str, group: str = None) -> None: # pragma: no cover
"""
Send a data message to a client or group of clients using the connection table.
"""
item = self.connection_table.get_item(self.connection_id)
if not item:
raise ASGIWebSocketCycleException("Connection not found")
if group:
# Retrieve the existing groups for the current connection, or create a new
# groups entry if one does not exist.
groups = item.get("groups", [])
if group not in groups:
# Ensure the group specified in the message is included.
groups.append(group)
status_code = self.connection_table.update_item(
self.connection_id, groups=groups
)
if status_code != 200:
raise ASGIWebSocketCycleException("Error updating groups")
# Retrieve all items associated with the current group.
items = self.connection_table.get_group_items(group)
item = self.connection_table.get_item(self.connection_id)
if not item:
raise ASGIWebSocketCycleException("Connection not found")
if group:
# Retrieve the existing groups for the current connection, or create a new
# groups entry if one does not exist.
groups = item.get("groups", [])
if group not in groups:
# Ensure the group specified in the message is included.
groups.append(group)
status_code = self.connection_table.update_item(
self.connection_id, groups=groups
)
if status_code != 200:
raise ASGIWebSocketCycleException("Error updating groups")
# Retrieve all items associated with the current group.
items = self.connection_table.get_group_items(group)
if items is None:
raise ASGIWebSocketCycleException("No connections found")
else:
# Single send, add the current item to a list to be iterated by the
# connection table.
items = [item]
self.connection_table.send_data(
items, endpoint_url=self.endpoint_url, data=data
)