Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_helpers_08():
    """test get_processes()

    Covers three lookups: a non-recursive lookup of the current PID must
    yield exactly one entry, a lookup of PID 0xFFFFFF must yield nothing and
    a default lookup of the current PID must yield more than one entry while
    a child process is running.
    """
    assert len(get_processes(os.getpid(), recursive=False)) == 1
    # 0xFFFFFF is used as a PID that is not expected to exist
    assert not get_processes(0xFFFFFF)
    is_alive = multiprocessing.Event()
    is_done = multiprocessing.Event()
    proc = multiprocessing.Process(target=_dummy_process, args=(is_alive, is_done))
    proc.start()
    try:
        # Event.wait() returns False when the timeout expires, fail right here
        # with a clear message instead of continuing with an unconfirmed child
        # NOTE(review): assumes _dummy_process sets is_alive once it is running
        # and returns after is_done is set -- confirm against its definition
        assert is_alive.wait(30), "child process did not signal is_alive"
        assert len(get_processes(os.getpid())) > 1
    finally:
        # always release and reap the child, even when an assertion failed
        is_done.set()
        proc.join()
def test_helpers_08():
    """test get_processes()

    Covers three lookups: a non-recursive lookup of the current PID must
    yield exactly one entry, a lookup of PID 0xFFFFFF must yield nothing and
    a default lookup of the current PID must yield more than one entry while
    a child process is running.
    """
    assert len(get_processes(os.getpid(), recursive=False)) == 1
    # 0xFFFFFF is used as a PID that is not expected to exist
    assert not get_processes(0xFFFFFF)
    is_alive = multiprocessing.Event()
    is_done = multiprocessing.Event()
    proc = multiprocessing.Process(target=_dummy_process, args=(is_alive, is_done))
    proc.start()
    try:
        # Event.wait() returns False when the timeout expires, fail right here
        # with a clear message instead of continuing with an unconfirmed child
        # NOTE(review): assumes _dummy_process sets is_alive once it is running
        # and returns after is_done is set -- confirm against its definition
        assert is_alive.wait(30), "child process did not signal is_alive"
        assert len(get_processes(os.getpid())) > 1
    finally:
        # always release and reap the child, even when an assertion failed
        is_done.set()
        proc.join()
def test_helpers_08():
    """test get_processes()

    Covers three lookups: a non-recursive lookup of the current PID must
    yield exactly one entry, a lookup of PID 0xFFFFFF must yield nothing and
    a default lookup of the current PID must yield more than one entry while
    a child process is running.
    """
    assert len(get_processes(os.getpid(), recursive=False)) == 1
    # 0xFFFFFF is used as a PID that is not expected to exist
    assert not get_processes(0xFFFFFF)
    is_alive = multiprocessing.Event()
    is_done = multiprocessing.Event()
    proc = multiprocessing.Process(target=_dummy_process, args=(is_alive, is_done))
    proc.start()
    try:
        # Event.wait() returns False when the timeout expires, fail right here
        # with a clear message instead of continuing with an unconfirmed child
        # NOTE(review): assumes _dummy_process sets is_alive once it is running
        # and returns after is_done is set -- confirm against its definition
        assert is_alive.wait(30), "child process did not signal is_alive"
        assert len(get_processes(os.getpid())) > 1
    finally:
        # always release and reap the child, even when an assertion failed
        is_done.set()
        proc.join()
def test_ffpuppet_27(tmp_path):
    """test multiprocess (target terminated)

    Launches the test binary with a prefs file holding the fftest_multi_proc
    marker, terminates every process reported for the launched PID and then
    checks that FFPuppet no longer considers the target to be running.
    """
    prefs_file = tmp_path / "prefs.js"
    prefs_file.write_bytes(b"//fftest_multi_proc\n")
    with FFPuppet() as ffp:
        with HTTPTestServer() as srv:
            ffp.launch(TESTFF_BIN, prefs_js=str(prefs_file), location=srv.get_addr())
            assert ffp.is_running()
            targets = get_processes(ffp.get_pid())
            # When running the browser the children exit if the parent
            # disappears. The first item in the list is the parent, so walk
            # the whole list calling terminate() on each entry.
            for target in targets:
                try:
                    target.terminate()
                except (AccessDenied, NoSuchProcess):
                    # best effort, entries that cannot be signalled are skipped
                    pass
            # nothing may still be alive once the timeout is over
            _gone, alive = wait_procs(targets, timeout=10)
            assert not alive
            ffp.close()
            assert not ffp.is_running()
            assert ffp.wait(timeout=0)
def wait(self, timeout=None):
    """
    Block until the process and its children have exited. Without a timeout
    the call waits for as long as it takes. With a timeout of zero or
    greater the call gives up once that amount of time has passed.

    @type timeout: float, int or None
    @param timeout: maximum amount of time to wait for process to terminate
                    or None (wait indefinitely)

    @rtype: bool
    @return: True if processes exit before timeout expires otherwise False
    """
    assert timeout is None or timeout >= 0
    target_pid = self.get_pid()
    if target_pid is None:
        # no PID available so there is nothing to wait on
        return True
    # the second item returned by wait_procs() holds what is still alive
    still_alive = psutil.wait_procs(get_processes(target_pid), timeout=timeout)[1]
    if still_alive:
        log.debug("wait(timeout=%0.2f) timed out", timeout)
        return False
    return True
def check(self):
"""
Use psutil to collect memory usage info and compare with limit.
@rtype: bool
@return: True if the total memory usage is greater than or equal to
self.limit otherwise False.
"""
# NOTE(review): this excerpt has lost its indentation and appears to end early,
# the return values described above are not visible in the lines shown here.
procs = get_processes(self.pid)
# (pid, rss) pairs kept for the per-process lines of the report below
proc_info = list()
# running sum of the rss value of every process that could be read
total_usage = 0
for proc in procs:
try:
cur_rss = proc.memory_info().rss
total_usage += cur_rss
proc_info.append((proc.pid, cur_rss))
except (AccessDenied, NoSuchProcess): # pragma: no cover
# the process could not be inspected, leave it out of the total
pass
if total_usage >= self.limit:
# limit reached: assemble the report lines (total, limit, parent PID),
# self.limit is divided by 1048576 (1024 * 1024) for the "MB" figure
msg = [
"MEMORY_LIMIT_EXCEEDED: %s\n" % format(total_usage, ","),
"Limit: %s (%dMB)\n" % (format(self.limit, ","), self.limit/1048576),
"Parent PID: %d\n" % self.pid]
# followed by one line per process that was measured
for pid, usage in proc_info:
msg.append("-> PID %6d: %s\n" % (pid, format(usage, "14,")))
def _terminate(pid, kill_delay=30):
# Stop every process returned by get_processes(pid). The first pass (mode 0)
# calls terminate() on each process and the second pass (mode 1) calls kill().
# After each pass psutil.wait_procs() is given up to kill_delay seconds.
# NOTE(review): this excerpt has lost its indentation and the body of the
# final `else:` is not visible here.
log.debug("_terminate(%d, kill_delay=%0.2f)", pid, kill_delay)
procs = get_processes(pid)
# 0 -> terminate(), 1 -> kill(), the loop ends once mode reaches 2
mode = 0
while mode < 2:
log.debug("%d running process(es)", len(procs))
# iterate over and terminate/kill processes
for proc in procs:
try:
proc.kill() if mode > 0 else proc.terminate()
except (psutil.AccessDenied, psutil.NoSuchProcess):
# best effort, processes that cannot be signalled are skipped
pass
# keep only what wait_procs() reports as still alive for the next pass
procs = psutil.wait_procs(procs, timeout=kill_delay)[1]
if not procs:
log.debug("_terminate() was successful")
break
log.debug("timed out (%0.2f), mode %d", kill_delay, mode)
mode += 1
# NOTE(review): presumably pairs with the while loop (runs when both passes
# finish without hitting the break) -- confirm against the complete source
else:
def cpu_usage(self):
    """
    Report the CPU usage of each process as a percentage.

    @rtype: Yields a tuple for each process
    @return: PID of the process and the CPU usage as a percentage.
    """
    target_pid = self.get_pid()
    if target_pid is None:
        # no PID available so there is nothing to report
        return
    for entry in get_processes(target_pid):
        try:
            # each sample is taken over a 0.1 second interval
            yield entry.pid, entry.cpu_percent(interval=0.1)
        except (psutil.AccessDenied, psutil.NoSuchProcess):  # pragma: no cover
            continue