Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from knocker.routing import channel_routing as knocker_routing
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter([
path('knocker/', knocker_routing),
])
from django.conf import settings
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
urls_tab = []
if hasattr(settings, 'CHANNELS_URL_TAB'):
for row in settings.CHANNELS_URL_TAB:
u = row[0]
tmp = row[1].split('.')
m = importlib.import_module('.'.join(tmp[:-1]))
o = getattr(m,tmp[-1])
urls_tab.append(url(u,o))
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter(
urls_tab
)
from django.conf import settings
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
urls_tab = []
if hasattr(settings, "CHANNELS_URL_TAB"):
for row in settings.CHANNELS_URL_TAB:
u = row[0]
tmp = row[1].split(".")
m = importlib.import_module(".".join(tmp[:-1]))
o = getattr(m, tmp[-1])
urls_tab.append(url(u, o))
application = ProtocolTypeRouter(
{"websocket": AuthMiddlewareStack(URLRouter(urls_tab))}
)
"""This module registers the core apps websocket patterns."""
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import core.routing
APPLICATION = ProtocolTypeRouter(
{"websocket": AuthMiddlewareStack(URLRouter(core.routing.WEBSOCKET_URLPATTERNS))}
)
from channels.routing import ProtocolTypeRouter, URLRouter
from django.conf.urls import url
from . import consumers
websocket_urlpatterns = [
url(r'^ws/logs/(?P[^/]+)/$', consumers.LogsConsumer),
]
application = ProtocolTypeRouter({
"websocket" : URLRouter(websocket_urlpatterns)
})
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.apps import apps
bokeh_app_config = apps.get_app_config('bokeh.server.django')
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(URLRouter(bokeh_app_config.routes.get_websocket_urlpatterns())),
'http': AuthMiddlewareStack(URLRouter(bokeh_app_config.routes.get_http_urlpatterns())),
})
@subscription.subscription("messages")
def subscribe_messages(root, info):
return EventEmitterAsyncIterator(pubsub, "message")
@subscription.field("messages")
def push_message(message, info):
return message
schema = make_executable_schema(SCHEMA, [mutation, query, subscription])
application = ProtocolTypeRouter(
{
"http": URLRouter([url(r"^graphql/$", GraphQLHTTPConsumer.for_schema(schema))]),
"websocket": URLRouter(
[url(r"^graphql/$", GraphQLWebsocketConsumer.for_schema(schema))]
),
from channels.routing import ProtocolTypeRouter, URLRouter
from django.conf.urls import url
from . import consumers
urls = [url(r"", consumers.NotifySubscribeConsumer)]
application = ProtocolTypeRouter({"websocket": URLRouter(urls)})
2018/6/6:
-------------------------------------------------
"""
from django.urls import path, re_path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from projs.utils.log_websocket import LogConsumer
from fort.utils.webssh import FortConsumer
from assets.utils.webssh import SSHConsumer
from fort.utils.webguacamole import GuacamoleConsumer
from assets.utils.webguacamole import AdminGuacamole
from task.utils.ans_module_websocket import AnsModuleConsumer
from task.utils.ans_playbook_websocket import AnsPlaybookConsumer
from projs.utils.deploy_websocket import DeployConsumer
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
# URLRouter just takes standard Django path() or url() entries.
path(r'ws/deploy/', DeployConsumer),
path(r'ws/ans_module_log/', AnsModuleConsumer),
path(r'ws/ans_playbook_log/', AnsPlaybookConsumer),
path(r'ws/deploy_log/', LogConsumer),
re_path(r'ws/fortssh/([0-9]+)/([0-9]+)/', FortConsumer),
re_path(r'ws/webssh/([0-9]+)/', SSHConsumer),
re_path(r'ws/fort_guacamole/([0-9]+)/([0-9]+)/(?P.*)/', GuacamoleConsumer),
re_path(r'ws/admin_guacamole/([0-9]+)/(?P.*)/', AdminGuacamole),
]),
from channels.routing import ProtocolTypeRouter
application = ProtocolTypeRouter({
# Empty for now (http->django views is added by default)