Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_start_stop():
    """A Worker reports started right after construction and stopped after stop()."""
    instance = Worker(1, [])
    assert instance._started is True

    # Let the worker run for a while before asking it to shut down.
    await asyncio.sleep(10)

    instance.stop()
    assert instance._started is False
def test_child_start_setgid_fails_permissions():
    """Constructing a Worker exits with code 64 when os.setgid raises PermissionError.

    os.pipe, os.fork and os.close are stubbed out; os.fork returns False,
    presumably so that the child-side start-up path runs (confirm against
    Worker).
    """
    config_file = create_config(
        ("user=fgqewgreghrehgerhehw", "group=fgqewgreghrehgerhehw")
    )
    Config(config_file).load()

    # Patchers are only built here; they take effect when entered below,
    # in the same left-to-right order.
    pipe_patch = mock.patch("os.pipe", return_value=("", ""))
    fork_patch = mock.patch("os.fork", return_value=False)
    close_patch = mock.patch("os.close")
    setgid_patch = mock.patch("os.setgid", side_effect=PermissionError)
    expect_exit = pytest.raises(SystemExit)

    with pipe_patch, fork_patch, close_patch, setgid_patch, expect_exit as exc:
        Worker([], [])

    assert exc.value.code == 64
def test_child_start_setuid_fails_permissions():
    """Constructing a Worker exits with code 64 when os.setuid raises PermissionError.

    os.pipe, os.fork and os.close are stubbed out; os.fork returns False,
    presumably so that the child-side start-up path runs (confirm against
    Worker).
    """
    config_file = create_config(
        ("user=fgqewgreghrehgerhehw", "group=fgqewgreghrehgerhehw")
    )
    Config(config_file).load()

    # Patchers are only built here; they take effect when entered below,
    # in the same left-to-right order.
    pipe_patch = mock.patch("os.pipe", return_value=("", ""))
    fork_patch = mock.patch("os.fork", return_value=False)
    close_patch = mock.patch("os.close")
    setuid_patch = mock.patch("os.setuid", side_effect=PermissionError)
    expect_exit = pytest.raises(SystemExit)

    with pipe_patch, fork_patch, close_patch, setuid_patch, expect_exit as exc:
        Worker([], [])

    assert exc.value.code == 64
async def test_restart(unused_tcp_port):
    """Backdate a running worker's last ping and check that its pid changes.

    After the pid change the worker is stopped and its bookkeeping is
    checked: ``ping`` must be later than the test start and ``ping_count``
    must be 0.

    Args:
        unused_tcp_port: free TCP port supplied by a pytest fixture; the
            test server socket is bound to it on 127.0.0.1.
    """
    aserver = server("127.0.0.1", unused_tcp_port, socket.AF_INET)
    try:
        started = time.monotonic()
        worker = Worker("1", [aserver])
        assert worker._started is True
        await asyncio.sleep(25)

        # Make the last ping look two minutes old; presumably this is what
        # makes the worker get restarted (confirm against Worker).
        worker.ping = time.monotonic() - 120
        old_pid = worker.pid
        await asyncio.sleep(15)

        # Compare pids by value. ``is not`` tests object identity, which for
        # integers is an implementation detail and could pass even when the
        # two pids are numerically equal.
        assert worker.pid != old_pid

        worker.stop()
        assert worker._started is False
        assert worker.ping > started
        assert worker.ping_count == 0
    finally:
        # Release the listening socket even when an assertion above fails.
        aserver["sock"].close()
def test_child_start_setuid_fails_invalid_user():
    """Constructing a Worker exits with code 64 when os.setuid raises KeyError.

    os.pipe, os.fork and os.close are stubbed out; os.fork returns False,
    presumably so that the child-side start-up path runs (confirm against
    Worker).
    """
    config_file = create_config(
        ("user=fgqewgreghrehgerhehw", "group=fgqewgreghrehgerhehw")
    )
    Config(config_file).load()

    # Patchers are only built here; they take effect when entered below,
    # in the same left-to-right order.
    pipe_patch = mock.patch("os.pipe", return_value=("", ""))
    fork_patch = mock.patch("os.fork", return_value=False)
    close_patch = mock.patch("os.close")
    setuid_patch = mock.patch("os.setuid", side_effect=KeyError)
    expect_exit = pytest.raises(SystemExit)

    with pipe_patch, fork_patch, close_patch, setuid_patch, expect_exit as exc:
        Worker([], [])

    assert exc.value.code == 64
def test_child_start_setgid_fails_invalid_group():
    """Constructing a Worker exits with code 64 when os.setgid raises KeyError.

    os.pipe, os.fork and os.close are stubbed out; os.fork returns False,
    presumably so that the child-side start-up path runs (confirm against
    Worker).
    """
    config_file = create_config(
        ("user=fgqewgreghrehgerhehw", "group=fgqewgreghrehgerhehw")
    )
    Config(config_file).load()

    # Patchers are only built here; they take effect when entered below,
    # in the same left-to-right order.
    pipe_patch = mock.patch("os.pipe", return_value=("", ""))
    fork_patch = mock.patch("os.fork", return_value=False)
    close_patch = mock.patch("os.close")
    setgid_patch = mock.patch("os.setgid", side_effect=KeyError)
    expect_exit = pytest.raises(SystemExit)

    with pipe_patch, fork_patch, close_patch, setgid_patch, expect_exit as exc:
        Worker([], [])

    assert exc.value.code == 64
async def test_worker_ping_pong(unused_tcp_port):
    """Run a worker for 35 seconds and check its ping bookkeeping after stop().

    After stopping, ``ping`` must be later than the test start and
    ``ping_count`` must be exactly 2.

    Args:
        unused_tcp_port: free TCP port supplied by a pytest fixture; the
            test server socket is bound to it on 127.0.0.1.
    """
    aserver = server("127.0.0.1", unused_tcp_port, socket.AF_INET)
    try:
        started = time.monotonic()
        worker = Worker("1", [aserver])
        assert worker._started is True

        # Give the worker time to run before stopping it.
        await asyncio.sleep(35)

        worker.stop()
        assert worker._started is False
        assert worker.ping > started
        assert worker.ping_count == 2
    finally:
        # Release the listening socket even when an assertion above fails.
        aserver["sock"].close()
def start_workers(self):
    """Create one Worker per configured worker slot and record it.

    Workers are numbered from 1 (as strings) up to ``self.config.workers``.
    Each is constructed with ``self.socks`` and ``self.loop`` and appended
    to ``self.workers``.
    """
    logger.debug("Starting workers")
    for zero_based in range(self.config.workers):
        # Worker numbers are 1-based strings.
        num = str(zero_based + 1)
        logger.debug(f"Creating worker: {num}")
        self.workers.append(Worker(num, self.socks, self.loop))