Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_executor_dont_start():
    """Check that a start which times out leaves no running process."""
    # The command sleeps 2s, echoes "foo" and "bar" separately, then sleeps on.
    slow_command = 'bash -c "sleep 2 && echo foo && echo bar && sleep 100"'
    executor = OutputExecutor(slow_command, 'foobar', timeout=3)

    with pytest.raises(TimeoutExpired):
        executor.start()

    still_running = executor.running()
    assert still_running is False
def test_executor_ignores_processes_exiting_with_0():
    """
    Verify that a subprocess exiting with status 0 is tolerated.

    A zero exit code must not be treated as a failure, so that
    double-forking applications are supported.
    """
    # The child returns zero right away; the one-second timeout keeps the
    # polling loop spinning long enough for that exit to happen.
    pre_check = mock.Mock(return_value=False)
    post_check = mock.Mock(return_value=False)
    executor = Executor(['bash', '-c', 'exit 0'], timeout=1.0)
    executor.pre_start_check = pre_check  # type: ignore
    executor.after_start_check = post_check  # type: ignore

    # Both checks always return False, so start() can only time out.
    with pytest.raises(TimeoutExpired):
        executor.start()

    # Both checks should have been called.
    assert pre_check.called is True
    assert post_check.called is True
.. note::
When gathering coverage for the subprocess in tests,
you have to allow subprocesses to end gracefully.
"""
if self.process is not None and self.process.poll() is None:
os.killpg(self.process.pid, signal.SIGTERM)
def process_stopped():
return self.running() is False
# set 10 seconds wait no matter what to kill the process
self._set_timeout(10)
try:
self.wait_for(process_stopped)
except TimeoutExpired:
# at this moment, process got killed,
pass
self._clear_process()
"""
Wait for callback to return True.
Simply returns if wait_for condition has been met,
raises TimeoutExpired otherwise and kills the process.
:param callback wait_for: callback to call
:raises: mirakuru.exceptions.TimeoutExpired
"""
while self.check_timeout():
if wait_for():
return
time.sleep(self._sleep)
self.kill()
raise TimeoutExpired(
self, timeout=self._timeout
)
Simply returns if wait_for condition has been met,
raises TimeoutExpired otherwise and kills the process.
:param callback wait_for: callback to call
:raises: mirakuru.exceptions.TimeoutExpired
:returns: itself
:rtype: SimpleExecutor
"""
while self.check_timeout():
if wait_for():
return self
time.sleep(self._sleep)
self.kill()
raise TimeoutExpired(self, timeout=self._timeout)
command=self.command,
until=self._endtime,
now=time.time(),
)
if wait_for():
log.debug(
"Executor process: waiting ended",
command=self.command,
until=self._endtime,
now=time.time(),
)
return self
time.sleep(self._sleep)
self.kill()
raise TimeoutExpired(self, timeout=self._timeout)