Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_single_worker_stop_during_wait(app, mocker):
    """Check single_worker() returns once a stop is requested while waiting.

    A task named "bla" is deferred first. The worker subclass below records
    every call to its two hooks, and its ``wait_for_job`` override sets
    ``stop_requested``. The test then expects each hook to have been called
    exactly once.
    """
    processed = mocker.Mock()
    waited = mocker.Mock()

    await app.configure_task("bla").defer_async()

    class TestWorker(worker.Worker):
        async def process_job(self, job, worker_id):
            # Record the call instead of actually running the job.
            processed(job=job, worker_id=worker_id)

        async def wait_for_job(self, timeout):
            waited()
            # Ask the worker to stop from inside the wait.
            self.stop_requested = True

    stoppable_worker = TestWorker(app=app)
    await stoppable_worker.single_worker(worker_id=0)

    processed.assert_called_once()
    waited.assert_called_once()
async def test_wait_for_activity_stop(aiopg_connector):
    """
    Testing that calling worker.stop() interrupts the wait
    """
    pg_app = app.App(connector=aiopg_connector)
    waiting_worker = worker_module.Worker(app=pg_app, timeout=2)
    run_task = asyncio.ensure_future(waiting_worker.run())

    # Should be enough so that the worker is waiting.
    await asyncio.sleep(0.2)
    waiting_worker.stop()

    try:
        await asyncio.wait_for(run_task, timeout=0.2)
    except asyncio.TimeoutError:
        pytest.fail("Failed to stop worker within .2s")
def test_worker(app):
    """Return a Worker built on ``app`` and configured with queues ``["yay"]``."""
    queue_names = ["yay"]
    return worker.Worker(app=app, queues=queue_names)
async def test_run_no_listen_notify(app):
    """Check the connector's notify_event stays None with listen_notify=False.

    The worker is started in the background, given a short moment to run,
    and always stopped and awaited afterwards.
    """
    quiet_worker = worker.Worker(
        app=app,
        queues=["some_queue"],
        listen_notify=False,
    )
    run_task = asyncio.ensure_future(quiet_worker.run())

    try:
        await asyncio.sleep(0.01)
        assert app.connector.notify_event is None
    finally:
        # Stop the background worker even if the assertion above failed.
        quiet_worker.stop()
        await asyncio.wait_for(run_task, timeout=0.5)
async def running_worker(app):
    """Yield a worker running on ``some_queue``, then stop it and await its task.

    The background task is also exposed on the yielded worker as ``.task``.
    """
    active_worker = worker.Worker(app=app, queues=["some_queue"])
    run_task = asyncio.ensure_future(active_worker.run())
    active_worker.task = run_task

    yield active_worker

    # Teardown: request a stop and wait for the run() task to finish.
    active_worker.stop()
    await asyncio.wait_for(run_task, timeout=0.5)
def test_worker_load_task_unknown_task(app, caplog):
    """Check load_task() resolves a task by dotted path and logs the lookup.

    The task is published as the module-level global ``unknown_task`` and
    then loaded through the path "tests.unit.test_worker_sync.unknown_task".
    The test expects the same task object back, plus at least one captured
    log record whose ``action`` is "load_dynamic_task".
    """
    # NOTE(review): presumably this module is importable as
    # tests.unit.test_worker_sync, so the global below is what the dotted
    # path resolves to -- confirm against the real file location.
    global unknown_task
    test_worker = worker.Worker(app=app, queues=["yay"])

    @app.task
    def task_func():
        pass

    unknown_task = task_func

    assert (
        test_worker.load_task("tests.unit.test_worker_sync.unknown_task", worker_id=2)
        == task_func
    )

    # Captured records only carry ``action`` when it was passed via ``extra``;
    # use getattr so an unrelated record cannot raise AttributeError here.
    assert any(
        getattr(record, "action", None) == "load_dynamic_task"
        for record in caplog.records
    )
async def test_wait_for_activity(aiopg_connector):
    """
    Testing that a new event interrupts the wait
    """
    pg_app = app.App(connector=aiopg_connector)
    waiting_worker = worker_module.Worker(app=pg_app, timeout=2)
    waiting_worker.notify_event = asyncio.Event()
    worker_task = asyncio.ensure_future(waiting_worker.single_worker(worker_id=0))

    # Should be enough so that the worker is waiting.
    await asyncio.sleep(0.2)

    # Request the stop first, then set the event the worker was given.
    waiting_worker.stop_requested = True
    waiting_worker.notify_event.set()

    try:
        await asyncio.wait_for(worker_task, timeout=0.2)
    except asyncio.TimeoutError:
        pytest.fail("Failed to stop worker within .2s")
def _worker(self, **kwargs) -> "worker.Worker":
    """Build a Worker for this app.

    Keyword arguments are layered on top of ``self.worker_defaults``; a key
    given in ``kwargs`` overrides the default of the same name.
    """
    # Imported at call time rather than at module level.
    from procrastinate import worker

    worker_options = dict(self.worker_defaults)
    worker_options.update(kwargs)
    return worker.Worker(app=self, **worker_options)
@click.option("-n", "--name", default=worker.WORKER_NAME, help="Name of the worker")
@click.option(
"-q",
"--queues",
default="",
help="Comma-separated names of the queues to listen "
"to (empty string for all queues)",
)
@click.option(
"-c",
"--concurrency",
type=int,
default=worker.WORKER_CONCURRENCY,
help="Number of parallel asynchronous jobs to process at once",
)
@click.option(
"-t",
default=worker.WORKER_TIMEOUT,
help="How long to wait for database event push before polling",
)
@click.option(
"-w",
"--wait/--one-shot",
default=True,
help="When all jobs have been processed, whether to "
"terminate or to wait for new jobs",
)
@click.option(
"-w",
"--listen-notify/--no-listen-notify",
default=True,
help="Whether to actively listen for new jobs or periodically poll",
)
@handle_errors()