Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing
application = ProtocolTypeRouter(
{
# (http->django views is added by default)
"websocket": AuthMiddlewareStack(URLRouter(chat.routing.websocket_urlpatterns))
}
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
from django.urls import path
from mail.email import EmailConsumer
from make_queue.views.stream.stream import StreamConsumer
websocket_urlpatterns = [
path('ws/stream//', StreamConsumer),
]
channel_routes = {
"email": EmailConsumer
}
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(
websocket_urlpatterns
)
),
'channel': ChannelNameRouter(
channel_routes
)
__ws_router = URLRouter([
url(r"^ws/deployment/$", DeploymentConsumer),
url(r"^ws/update_block_lists/$", UpdateBlockLists),
])
__http_router = URLRouter([
url(r"/static/", StaticFilesHandler),
url(r"", AsgiHandler)
])
if is_database_migrated():
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(__ws_router),
"http": AuthMiddlewareStack(__http_router),
})
else:
application = ProtocolTypeRouter({
"http": __http_router
})
from channels.auth import AuthMiddlewareStack
from channels.routing import URLRouter, ProtocolTypeRouter
from django.urls import path
from .consumers import EchoConsumer
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
path(r"ws/", EchoConsumer),
# path(r"stats/", StatsConsumer),
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.staticfiles import StaticFilesWrapper
from django.conf.urls import url
from .consumers import GraphqlSubcriptionConsumer, MainConsumer
application = StaticFilesWrapper(
ProtocolTypeRouter({
"websocket":
AuthMiddlewareStack(
URLRouter([
url("^ws/$", GraphqlSubcriptionConsumer),
url("^direct/$", MainConsumer),
])),
class WebsocketCloseConsumer(WebsocketConsumer):
def connect(self):
self.accept()
self.close()
def receive(self, text_data: Optional[str] = None, bytes_data: Optional[bytes] = None):
pass
def disconnect(self, code):
pass
application = ProtocolTypeRouter(
{
"websocket": AuthMiddlewareStack(
URLRouter(
[
re_path(r"^bus/$", BusConsumer),
# This MUST match the signage_display entry in intranet/apps/signage/urls.py
re_path(r"^signage/display/(?P[-_\w]+)?$", SignageConsumer),
re_path(r"^.*$", WebsocketCloseConsumer),
]
# -*- coding:utf-8 -*-
# !/usr/bin/env python
# Time 18-1-11
# Author Yo
# Email YoLoveLife@outlook.com
from __future__ import absolute_import,unicode_literals
from ops.urls.socket_urls import ops_routing
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(
ops_routing,
)
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import celery_api.urls.ws_urls
application = ProtocolTypeRouter({
# Empty for now (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
celery_api.urls.ws_urls.urlpatterns
)
"""This module registers the core apps websocket patterns."""
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import core.routing
APPLICATION = ProtocolTypeRouter(
{"websocket": AuthMiddlewareStack(URLRouter(core.routing.WEBSOCKET_URLPATTERNS))}
)
# `channels.auth.UserLazyObject` instances.
# https://github.com/datadvance/DjangoChannelsGraphqlWs/issues/23
self.scope["user"] = await channels.auth.get_user(self.scope)
schema = graphene.Schema(query=Query, mutation=Mutation, subscription=Subscription)
middleware = [demo_middleware]
# ------------------------------------------------------------------------- ASGI ROUTING
# NOTE: Please note `channels.auth.AuthMiddlewareStack` wrapper, for
# more details about Channels authentication read:
# https://channels.readthedocs.io/en/latest/topics/authentication.html
application = channels.routing.ProtocolTypeRouter(
{
"websocket": channels.auth.AuthMiddlewareStack(
channels.routing.URLRouter(
[django.urls.path("graphql/", MyGraphqlWsConsumer)]
)
)
}
)
# -------------------------------------------------------------------- URL CONFIGURATION
def graphiql(request):
"""Trivial view to serve the `graphiql.html` file."""
del request
graphiql_filepath = pathlib.Path(__file__).absolute().parent / "graphiql.html"
with open(graphiql_filepath) as f:
return django.http.response.HttpResponse(f.read())