Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_fail_if_other_executor_running():
    """Check that a second executor with the same config raises AlreadyRunning.

    Two TCPExecutor objects are created with identical arguments (same
    command, host and port). While the first one is running, the second one
    is expected to raise AlreadyRunning both when ``start()`` is called
    directly and when it is entered as a context manager.
    """
    primary = TCPExecutor(HTTP_SERVER, host='localhost', port=PORT)
    duplicate = TCPExecutor(HTTP_SERVER, host='localhost', port=PORT)

    with primary:
        assert primary.running() is True

        # A direct start of the duplicate must be rejected.
        with pytest.raises(AlreadyRunning):
            duplicate.start()

        # Entering the duplicate as a context manager must be rejected too.
        with pytest.raises(AlreadyRunning), duplicate:
            pass
def test_fail_if_other_executor_running():
    """Verify AlreadyRunning is raised for a clashing TCPExecutor.

    Both executors are built from the same command, host and port. The
    first is kept running for the duration of the checks; the second must
    raise AlreadyRunning on ``start()`` and on ``with`` entry.
    """
    shared_kwargs = {'host': 'localhost', 'port': PORT}
    running_one = TCPExecutor(HTTP_SERVER, **shared_kwargs)
    blocked_one = TCPExecutor(HTTP_SERVER, **shared_kwargs)

    with running_one:
        assert running_one.running() is True

        # Starting explicitly while the first executor is up must fail.
        with pytest.raises(AlreadyRunning):
            blocked_one.start()

        # The context-manager entry path must fail the same way.
        with pytest.raises(AlreadyRunning):
            with blocked_one:
                pass
def test_fail_if_other_running():
    """Check that a second HTTPExecutor on the same URL raises AlreadyRunning.

    Two executors are created with the same command and the same URL built
    from HOST and PORT. While the first is running, the second must raise
    AlreadyRunning on ``start()`` and on ``with`` entry; for the latter the
    exception text is also checked.
    """
    url = 'http://{0}:{1}/'.format(HOST, PORT)
    first = HTTPExecutor(HTTP_NORMAL_CMD, url)
    second = HTTPExecutor(HTTP_NORMAL_CMD, url)

    with first:
        assert first.running() is True

        # A direct start of the second executor must be rejected.
        with pytest.raises(AlreadyRunning):
            second.start()

        # Same expectation for the context-manager path; keep the exception
        # info so its message can be inspected afterwards.
        with pytest.raises(AlreadyRunning) as exc, second:
            pass

        message = str(exc.value)
        assert 'seems to be already running' in message
"""Test raising AlreadyRunning exception when port is blocked."""
executor = HTTPExecutor(
HTTP_NORMAL_CMD, 'http://{0}:{1}/'.format(HOST, PORT),
)
executor2 = HTTPExecutor(
HTTP_NORMAL_CMD, 'http://{0}:{1}/'.format(HOST, PORT),
)
with executor:
assert executor.running() is True
with pytest.raises(AlreadyRunning):
executor2.start()
with pytest.raises(AlreadyRunning) as exc:
with executor2:
pass
assert 'seems to be already running' in str(exc.value)
def test_fail_if_other_executor_running():
    """Check that a second PidExecutor with the same config raises AlreadyRunning.

    Both executors share one shell command (which sleeps, touches FILENAME,
    then sleeps again) and the same FILENAME. While the first is running,
    calling ``start()`` on the second must raise AlreadyRunning.
    """
    command = f'bash -c "sleep 2 && touch {FILENAME} && sleep 10"'
    first = PidExecutor(command, FILENAME)
    second = PidExecutor(command, FILENAME)

    with first:
        assert first.running() is True

        # The second executor must refuse to start while the first is up.
        with pytest.raises(AlreadyRunning):
            second.start()
def start(self, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
""" Merged copy paste from the inheritance chain with modified stdout/err behaviour """
if self.pre_start_check():
# Some other executor (or process) is running with same config:
raise AlreadyRunning(self)
if self.process is None:
command = self.command
if not self._shell:
command = self.command_parts
env = os.environ.copy()
env[ENV_UUID] = self._uuid
popen_kwargs = {
"shell": self._shell,
"stdin": subprocess.PIPE,
"stdout": stdout,
"stderr": stderr,
"universal_newlines": True,
"env": env,
}