Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
    """Serve the Django WSGI application and block until the server exits.

    Steps, in order:

    1. Wrap a fresh ``djwsgi.WSGIHandler`` in ``AdminMediaHandler``.
    2. Open a listening socket on ``self.addrport``.
    3. Spawn ``self.server`` with that socket and handler.
    4. Log ``"ready"`` and send ``httpd_ready`` so listeners learn the
       address, handler and socket in use.

    Returns whatever ``wait()`` on the spawned task returns.
    """
    wsgi_app = AdminMediaHandler(djwsgi.WSGIHandler())
    listener = listen(self.addrport)
    # NOTE(review): ``spawn`` presumably returns a green thread -- confirm.
    server_task = self.spawn(self.server, listener, wsgi_app)
    self.info("ready")
    # Announce readiness only after the server task has been spawned.
    httpd_ready.send(
        sender=self,
        addrport=self.addrport,
        handler=wsgi_app,
        sock=listener,
    )
    return server_task.wait()
def prepare_signals(self):
    """Connect this instance's callbacks to the readiness signals.

    The controller, httpd and supervisor readiness signals are all routed
    to ``self._component_ready``; the agent readiness signal is routed to
    ``self.on_ready``.
    """
    component_signals = (
        signals.controller_ready,
        signals.httpd_ready,
        signals.supervisor_ready,
    )
    for ready_signal in component_signals:
        ready_signal.connect(self._component_ready)
    signals.agent_ready.connect(self.on_ready)
ready_event=None, **kwargs):
self.id = id or gen_unique_id()
if isinstance(addrport, basestring):
addr, _, port = addrport.partition(":")
addrport = (addr, int(port) if port else 8000)
self.addrport = addrport
self.connection = celery.broker_connection()
self.without_httpd = without_httpd
self.logfile = logfile
self.loglevel = loglevel
self.numc = numc
self.ready_event = ready_event
self.exit_request = Event()
if not self.without_httpd:
self.httpd = gSup(instantiate(self.httpd_cls, addrport),
signals.httpd_ready)
self.supervisor = gSup(instantiate(self.supervisor_cls, sup_interval),
signals.supervisor_ready)
self.controllers = [gSup(instantiate(self.controller_cls,
id="%s.%s" % (self.id, i),
connection=self.connection),
signals.controller_ready)
for i in xrange(1, numc + 1)]
c = [self.supervisor] + self.controllers + [self.httpd]
c = self.components = list(filter(None, c))
self._components_ready = dict(zip([z.thread for z in c],
[False] * len(c)))
super(Agent, self).__init__()