Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
# OK, now we are in a forked child process, and want to use the reactor.
# However, FreeBSD systems like MacOS do not fork the underlying Kqueue,
# which asyncio (hence asyncioreactor) is built on.
# Therefore, we should uninstall the broken reactor and install a new one.
_reinstall_reactor()
from twisted.internet import reactor
from .server import Server
from .endpoints import build_endpoint_description_strings
try:
# Create the server class
endpoints = build_endpoint_description_strings(host=self.host, port=0)
self.server = Server(
application=self.application,
endpoints=endpoints,
signal_handlers=False,
**self.kwargs
)
# Set up a poller to look for the port
reactor.callLater(0.1, self.resolve_port)
# Run with setup/teardown
self.setup()
try:
self.server.run()
finally:
self.teardown()
except Exception as e:
# Put the error on our queue so the parent gets it
self.errors.put((e, traceback.format_exc()))
# worker = WorkerThread(channel_layer)
# worker.daemon = True
# worker.start()
try:
from daphne.server import Server
from daphne.endpoints import build_endpoint_description_strings
from channels.routing import get_default_application
django.setup()
#application = django.core.handlers.wsgi.WSGIHandler()
print(addr, port)
endpoints = build_endpoint_description_strings(host=addr, port=int(port))
server = Server(
#channel_layer=channel_layer,
get_default_application(),
endpoints=endpoints,
#host=addr,
#port=int(port),
signal_handlers=False,
action_logger=log_action,
http_timeout=60,
)
server.run()
except KeyboardInterrupt:
return