Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
and `quiet` attributes, corresponding to the parameters to the constructor
of `honcho.process.Process`.
"""
if env is not None and env.get("PORT") is not None:
port = int(env.get("PORT"))
if quiet is None:
quiet = []
con = defaultdict(lambda: 1)
if concurrency is not None:
con.update(concurrency)
out = []
for name, cmd in compat.iteritems(processes):
for i in range(con[name]):
n = "{0}.{1}".format(name, i + 1)
c = cmd
q = name in quiet
e = {'HONCHO_PROCESS_NAME': n}
if env is not None:
e.update(env)
if port is not None:
e['PORT'] = str(port + i)
params = ProcessParams(n, c, q, e)
out.append(params)
if port is not None:
port += 100
return out
def _killall(self, force=False):
"""Kill all remaining processes, forcefully if requested."""
for_termination = []
for n, p in iteritems(self._processes):
if 'returncode' not in p:
for_termination.append(n)
for n in for_termination:
p = self._processes[n]
signame = 'SIGKILL' if force else 'SIGTERM'
self._system_print("sending %s to %s (pid %s)\n" %
(signame, n, p['pid']))
if sys.platform == 'win32':
import ctypes
ctypes.windll.kernel32.SetConsoleCtrlHandler(0, True);
ctypes.windll.kernel32.GenerateConsoleCtrlEvent(0, 0)
time.sleep(0.1)
ctypes.windll.kernel32.SetConsoleCtrlHandler(0, False);
else:
if force:
def _any_stopped(self):
return any(p.get('returncode') is not None for _, p in iteritems(self._processes))
def _killall(self, force=False):
"""Kill all remaining processes, forcefully if requested."""
for_termination = []
for n, p in iteritems(self._processes):
if 'returncode' not in p:
for_termination.append(n)
for n in for_termination:
p = self._processes[n]
signame = 'SIGKILL' if force else 'SIGTERM'
self._system_print("sending %s to %s (pid %s)\n" %
(signame, n, p['pid']))
if force:
self._env.kill(p['pid'])
else:
self._env.terminate(p['pid'])
def _all_started(self):
return all(p.get('pid') is not None for _, p in iteritems(self._processes))
def _all_stopped(self):
return all(p.get('returncode') is not None for _, p in iteritems(self._processes))