Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_single_worker_spread_wait(app, mocker):
    """Check the timeouts that ``single_worker`` passes to ``wait_for_job``.

    One job is deferred before the worker starts. The worker subclass
    defined below records every ``process_job`` and ``wait_for_job`` call
    on mocks, and sets ``stop_requested`` to True during its second wait.
    The test expects one processed job and two waits: the first with
    ``timeout * (worker_id + 1)``, the second with ``timeout * concurrency``.
    """
    processed = mocker.Mock()
    waited = mocker.Mock()

    await app.configure_task("bla").defer_async()

    class RecordingWorker(worker.Worker):
        # False until the first wait has happened; each wait copies the
        # current value into ``stop_requested`` and then sets it to True.
        stop = False

        async def process_job(self, job, worker_id):
            processed(job=job, worker_id=worker_id)

        async def wait_for_job(self, timeout):
            waited(timeout)
            # The right-hand side is evaluated first, so ``stop_requested``
            # receives the value ``stop`` had before this wait.
            self.stop_requested, self.stop = self.stop, True

    test_worker = RecordingWorker(app=app, timeout=4, concurrency=7)
    await test_worker.single_worker(worker_id=3)

    processed.assert_called_once()
    assert waited.call_args_list == [
        mocker.call(4 * (3 + 1)),
        mocker.call(4 * 7),
    ]
async def test_wait_for_activity_stop_from_signal(aiopg_connector, kill_own_pid):
    """
    Testing that ctrl+c interrupts the wait

    A worker is started in the background, ``kill_own_pid()`` is called
    while it is running, and the worker's ``run()`` must then finish
    within 0.2 seconds or the test fails.
    """
    pg_app = app.App(connector=aiopg_connector)
    run_task = asyncio.ensure_future(
        worker_module.Worker(app=pg_app, timeout=2).run()
    )

    # Let the worker get going first; 0.2s should be enough for it to
    # reach the point where it is waiting.
    await asyncio.sleep(0.2)
    kill_own_pid()

    try:
        await asyncio.wait_for(run_task, timeout=0.2)
    except asyncio.TimeoutError:
        pytest.fail("Failed to stop worker within .2s")
def test_context_for_worker_value_kept(app):
    """Values passed to ``context_for_worker`` are kept for later calls.

    The first call supplies ``worker_name="bar"``; the second call for the
    same ``worker_id`` omits it, and the test expects the returned context
    to still carry ``worker_name="bar"``.
    """
    named_worker = worker.Worker(app=app, name="foo")
    expected = job_context.JobContext(app=app, worker_id=3, worker_name="bar")

    # Return value deliberately ignored: only the follow-up call is checked.
    named_worker.context_for_worker(worker_id=3, worker_name="bar")

    assert named_worker.context_for_worker(worker_id=3) == expected
def test_context_for_worker_kwargs(app):
    """Keyword arguments to ``context_for_worker`` show up in the context.

    The worker is created with ``name="foo"``, but the call passes
    ``worker_name="bar"``; the test expects the returned context to equal
    a ``JobContext`` built with ``worker_name="bar"``.
    """
    named_worker = worker.Worker(app=app, name="foo")
    expected = job_context.JobContext(app=app, worker_id=3, worker_name="bar")

    produced = named_worker.context_for_worker(worker_id=3, worker_name="bar")

    assert produced == expected