Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
# A dict of protocol: {"application_instance":, "connected":, "disconnected":} dicts
self.connections = {}
# Make the factory
self.http_factory = HTTPFactory(self)
self.ws_factory = WebSocketFactory(self, server=self.server_name)
self.ws_factory.setProtocolOptions(
autoPingTimeout=self.ping_timeout,
allowNullOrigin=True,
openHandshakeTimeout=self.websocket_handshake_timeout,
)
if self.verbosity <= 1:
# Redirect the Twisted log to nowhere
globalLogBeginner.beginLoggingTo(
[lambda _: None], redirectStandardIO=False, discardBuffer=True
)
else:
globalLogBeginner.beginLoggingTo([STDLibLogObserver(__name__)])
# Detect what Twisted features are enabled
if http.H2_ENABLED:
logger.info("HTTP/2 support enabled")
def buildProtocol(self, addr):
"""
Builds protocol instances. We use this to inject the factory object into the protocol.
"""
try:
protocol = super(WebSocketFactory, self).buildProtocol(addr)
protocol.factory = self
return protocol
except Exception:
logger.error("Cannot build protocol: %s" % traceback.format_exc())
raise