Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def manage_watchers(self):
if self._managing_watchers_future is not None:
logger.debug("manage_watchers is already running...")
return
try:
self._managing_watchers_future = self.arbiter.manage_watchers()
self.loop.add_future(self._managing_watchers_future,
self._manage_watchers_cb)
except ConflictError:
logger.debug("manage_watchers is conflicting with another command")
def reap_processes(self):
"""Reap all the processes for this watcher.
"""
if self.is_stopped():
logger.debug('do not reap processes as the watcher is stopped')
return
# reap_process changes our dict, look through the copy of keys
for pid in list(self.processes.keys()):
self.reap_process(pid)
def stop(self):
if self.destroy_context:
self.ctx.destroy(0)
logger.debug('Publisher stopped')
"manage_watchers, re-executing it at "
"the end")
cb = functools.partial(self.dispatch, job)
self.loop.add_future(self._managing_watchers_future, cb)
return
# conflicts between two commands, sending error...
return self.send_error(mid, cid, msg, str(e), cast=cast,
errno=errors.COMMAND_ERROR)
except OSError as e:
return self.send_error(mid, cid, msg, str(e), cast=cast,
errno=errors.OS_ERROR)
except: # noqa: E722
exctype, value = sys.exc_info()[:2]
tb = traceback.format_exc()
reason = "command %r: %s" % (msg, value)
logger.debug("error: command %r: %s\n\n%s", msg, value, tb)
return self.send_error(mid, cid, msg, reason, tb, cast=cast,
errno=errors.COMMAND_ERROR)
def stop(self):
"""Stop.
"""
logger.debug('stopping the %s watcher' % self.name)
# stop redirectors
if self.stdout_redirector is not None:
self.stdout_redirector.stop()
self.stdout_redirector = None
if self.stderr_redirector is not None:
self.stderr_redirector.stop()
self.stderr_redirector = None
limit = time.time() + self.graceful_timeout
logger.debug('gracefully stopping processes [%s] for %ss' % (
self.name, self.graceful_timeout))
# We ignore the hook result
self.call_hook('before_stop')
while self.get_active_processes() and time.time() < limit:
self.kill_processes(signal.SIGTERM)
self.reap_processes()
self.kill_processes(signal.SIGKILL)
if self.evpub_socket is not None:
self.notify_event("stop", {"time": time.time()})
self.stopped = True
def handle_message(self, raw_msg):
cid, msg = raw_msg
msg = msg.strip()
if not msg:
self.send_response(None, cid, msg, "error: empty command")
else:
logger.debug("got message %s", msg)
self.dispatch((cid, msg))
if cid is None:
return
if isinstance(resp, str):
raise DeprecationWarning('Takes only a mapping')
resp['id'] = mid
resp = json.dumps(resp)
logger.debug("sending response %s", resp)
try:
self.stream.send(cid, zmq.SNDMORE)
self.stream.send(resp)
except (IOError, zmq.ZMQError) as e:
logger.debug("Received %r - Could not send back %r - %s", msg,
resp, str(e))
def kill_process(self, process, stop_signal=None, graceful_timeout=None):
"""Kill process (stop_signal, graceful_timeout then SIGKILL)
"""
if stop_signal is None:
stop_signal = self.stop_signal
if graceful_timeout is None:
graceful_timeout = self.graceful_timeout
if process.stopping:
raise gen.Return(False)
try:
logger.debug("%s: kill process %s", self.name, process.pid)
if self.stop_children:
self.send_signal_process(process, stop_signal)
else:
self.send_signal(process.pid, stop_signal)
self.notify_event("kill", {"process_pid": process.pid,
"time": time.time()})
except NoSuchProcess:
raise gen.Return(False)
process.stopping = True
waited = 0
while waited < graceful_timeout:
if not process.is_alive():
break
yield tornado_sleep(0.1)
waited += 0.1