Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_app_register_queue_already_exists(app):
    """Register a task on a queue that was already added to the app.

    Checks that, after ``app._register``, the app's queues equal
    ``{"queue", "builtin"}`` and the task is stored under its name.
    """
    queue_name = "queue"
    task_name = "bla"

    # Arrange: the queue is known to the app before the task is registered.
    app.queues.add(queue_name)
    registered = tasks.Task(task_func, app=app, queue=queue_name, name=task_name)

    # Act
    app._register(registered)

    # Assert
    assert app.queues == {"queue", "builtin"}
    assert task_name in app.tasks
    assert app.tasks[task_name] == registered
def test_task_get_retry_exception_none(app):
    """``get_retry_exception`` called with ``exception=None`` must return ``None``.

    The task is built without a ``retry`` argument.
    """
    plain_task = tasks.Task(task_func, app=app, queue="queue")
    configured_job = plain_task.configure().job

    outcome = plain_task.get_retry_exception(exception=None, job=configured_job)

    assert outcome is None
async def test_task_defer_async(app, connector):
task = tasks.Task(task_func, app=app, queue="queue")
await task.defer_async(c=3)
# The lock is the only thing we can't predict
lock = connector.jobs[1]["lock"]
assert connector.jobs == {
1: {
"id": 1,
"queue_name": "queue",
"task_name": "tests.unit.test_tasks.task_func",
"lock": lock,
"queueing_lock": None,
"args": {"c": 3},
"status": "todo",
"scheduled_at": None,
"attempts": 0,
async def test_run_job_error(app):
    """A task function that raises makes ``Worker.run_job`` raise ``JobError``."""

    def job(a, b):  # pylint: disable=unused-argument
        raise ValueError("nope")

    # Register the failing callable as the only task known to the app.
    failing_task = tasks.Task(job, app=app, queue="yay", name="job")
    failing_task.func = job
    app.tasks = {"job": failing_task}

    job_fields = {
        "id": 16,
        "task_kwargs": {"a": 9, "b": 3},
        "lock": "sherlock",
        "queueing_lock": "houba",
        "task_name": "job",
        "queue": "yay",
    }
    failing_job = jobs.Job(**job_fields)

    test_worker = worker.Worker(app, queues=["yay"])

    with pytest.raises(exceptions.JobError):
        await test_worker.run_job(job=failing_job, worker_id=3)
def test_task_get_retry_exception(app, mocker):
    """``Task.get_retry_exception`` returns the patched strategy method's value.

    ``RetryStrategy.get_retry_exception`` is patched; the test checks that the
    task returns the mock's return value and that the mock was called with the
    exception and ``attempts=0``.
    """
    patched = mocker.patch("procrastinate.retry.RetryStrategy.get_retry_exception")

    retrying_task = tasks.Task(task_func, app=app, queue="queue", retry=10)
    configured_job = retrying_task.configure().job
    error = ValueError()

    result = retrying_task.get_retry_exception(exception=error, job=configured_job)

    assert result is patched.return_value
    patched.assert_called_with(exception=error, attempts=0)
def test_task_configure(app):
    """``Task.configure`` produces a job carrying the task's name, queue and the given options."""
    configurable = tasks.Task(task_func, app=app, queue="queue")

    configured_job = configurable.configure(
        lock="sher",
        task_kwargs={"yay": "ho"},
    ).job

    # Attribute name -> expected value, checked in the listed order.
    expected_attributes = {
        "task_name": "tests.unit.test_tasks.task_func",
        "lock": "sher",
        "task_kwargs": {"yay": "ho"},
        "queue": "queue",
    }
    for attribute, expected in expected_attributes.items():
        assert getattr(configured_job, attribute) == expected