Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ffpuppet_26(tmp_path):
"""test multiprocess target"""
prefs = (tmp_path / "prefs.js")
prefs.write_bytes(b"//fftest_multi_proc\n")
with FFPuppet() as ffp:
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, prefs_js=str(prefs), location=srv.get_addr())
assert ffp.is_running()
assert not ffp.wait(timeout=0)
c_procs = Process(ffp.get_pid()).children()
assert c_procs
# terminate one of the child processes
c_procs[-1].terminate()
assert ffp.is_running()
ffp.close()
assert not ffp.is_running()
assert ffp.wait(timeout=0)
def test_ffpuppet_01():
"""test basic launch and close"""
with FFPuppet() as ffp:
assert ffp.launches == 0
assert ffp.reason == ffp.RC_CLOSED
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, location=srv.get_addr())
assert not ffp._checks
assert ffp.launches == 1
assert not ffp.wait(timeout=0)
assert ffp.is_running()
assert ffp.is_healthy()
assert ffp.reason is None
ffp.close()
assert ffp.reason == ffp.RC_CLOSED
assert ffp._proc is None
assert not ffp.is_running()
assert not ffp.is_healthy()
assert ffp.wait(timeout=10)
def test_ffpuppet_33():
"""test cpu_usage()"""
with FFPuppet() as ffp:
assert not any(ffp.cpu_usage())
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, location=srv.get_addr())
usage = tuple(ffp.cpu_usage())
assert len(usage) == 1
assert len(usage[0]) == 2
assert usage[0][0] == ffp.get_pid()
assert usage[0][1] <= 100
assert usage[0][1] >= 0
ffp.close()
assert ffp.wait(timeout=10)
def test_ffpuppet_08(tmp_path):
"""test clone_log()"""
logs = (tmp_path / "logs.txt")
with FFPuppet() as ffp:
assert ffp.clone_log("stdout", target_file=str(logs)) is None
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, location=srv.get_addr())
ffp.wait(timeout=0.25) # wait for log prints
# make sure logs are available
assert ffp.clone_log("stdout", target_file=str(logs)) == str(logs)
orig = logs.read_text()
assert len(orig) > 5
assert ffp.clone_log("stdout", target_file=str(logs), offset=5) == str(logs)
assert logs.read_text() == orig[5:]
# grab log without giving a target file name
rnd_log = ffp.clone_log("stdout")
assert rnd_log is not None
try:
ffp.close()
# make sure logs are available
assert ffp.clone_log("stdout", target_file=str(logs)) == str(logs)
assert logs.read_text().startswith(orig)
def test_ffpuppet_13():
"""test calling close() and clean_up() in multiple states"""
with FFPuppet() as ffp:
ffp.close()
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, location=srv.get_addr())
assert ffp.reason is None
ffp.close()
ffp.clean_up()
with pytest.raises(AssertionError):
ffp.launch(TESTFF_BIN, location=srv.get_addr())
with pytest.raises(AssertionError):
ffp.close()
def __init__(self, handler=None):
self._handler = handler if handler is not None else ReqHandler
while True:
try:
self._httpd = HTTPServer(("127.0.0.1", 0), self._handler)
except socket.error as soc_e:
if soc_e.errno in (errno.EADDRINUSE, 10013): # Address already in use
continue
raise
break
self._thread = threading.Thread(target=HTTPTestServer._srv_thread, args=(self._httpd,))
self._thread.start()
def test_ffpuppet_22():
"""test running multiple instances in parallel"""
ffps = list()
try:
with HTTPTestServer() as srv:
# use test pool size of 10
for _ in range(10):
ffps.append(FFPuppet())
# NOTE: launching truly in parallel can DoS the test webserver
ffps[-1].launch(TESTFF_BIN, location=srv.get_addr())
# list of ffps needs to be reversed to deal with inheriting open file handles in Popen
# this is not a problem in production only in the test environment
for ffp in reversed(ffps):
assert ffp.launches == 1
ffp.close()
finally:
for ffp in ffps:
ffp.clean_up()
def test_ffpuppet_05():
"""test get_pid()"""
with FFPuppet() as ffp:
assert ffp.get_pid() is None
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, location=srv.get_addr())
assert ffp.get_pid() > 0
ffp.close()
assert ffp.get_pid() is None
def test_ffpuppet_09(tmp_path):
"""test hitting memory limit"""
with FFPuppet() as ffp:
prefs = (tmp_path / "prefs.js")
prefs.write_bytes(b"//fftest_memory\n")
with HTTPTestServer() as srv:
# launch with 1MB memory limit
ffp.launch(TESTFF_BIN, location=srv.get_addr(), prefs_js=str(prefs), memory_limit=0x100000)
for _ in range(100):
if not ffp.is_healthy():
break
time.sleep(0.1)
ffp.close()
assert ffp.reason == ffp.RC_WORKER
assert len(ffp.available_logs()) == 3
logs = (tmp_path / "logs")
ffp.save_logs(str(logs))
worker_log = (logs / "log_ffp_worker_memory_usage.txt")
assert worker_log.is_file()
assert "MEMORY_LIMIT_EXCEEDED" in worker_log.read_text()
def test_ffpuppet_11(tmp_path):
"""test abort tokens"""
prefs = (tmp_path / "prefs.js")
prefs.write_bytes(b"//fftest_soft_assert\n")
with FFPuppet() as ffp:
ffp.add_abort_token(r"TEST\dREGEX\.+")
ffp.add_abort_token("simple_string")
with pytest.raises(AssertionError):
ffp.add_abort_token(None)
ffp.add_abort_token(r"ASSERTION:\s\w+")
with HTTPTestServer() as srv:
ffp.launch(TESTFF_BIN, location=srv.get_addr(), prefs_js=str(prefs))
for _ in range(200):
if not ffp.is_healthy():
break
time.sleep(0.05)
ffp.close()
assert ffp.reason == ffp.RC_WORKER
assert len(ffp.available_logs()) == 3
logs = (tmp_path / "logs")
ffp.save_logs(str(logs))
worker_log = (logs / "log_ffp_worker_log_contents.txt")
assert worker_log.is_file()
assert b"TOKEN_LOCATED: ASSERTION: test" in worker_log.read_bytes()