# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delete_old_jobs_job_is_not_finished(get_all, pg_job_store):
    """delete_old_jobs must not remove a job that was never finished."""
    # NOTE(review): this block appears truncated in this chunk — the body of the
    # trailing `with` statement is missing; confirm against the full file.
    pg_job_store.defer_job(
        jobs.Job(
            id=0,
            queue="queue_a",
            task_name="task_1",
            lock="lock_1",
            task_kwargs={"a": "b"},
        )
    )
    # No started job: even with nb_hours=0 the unstarted job must survive.
    pg_job_store.delete_old_jobs(nb_hours=0)
    assert len(get_all("procrastinate_jobs", "id")) == 1
    # We start a job
    job = pg_job_store.fetch_job(queues=["queue_a"])
    # We back date the started event
    with pg_job_store.connection.cursor() as cursor:
async def test_run_job_retry(app):
    """A failing task configured with retry=True makes the worker raise JobRetry."""

    def always_fail(a, b):  # pylint: disable=unused-argument
        raise ValueError("nope")

    task = tasks.Task(always_fail, app=app, queue="yay", name="job", retry=True)
    task.func = always_fail
    app.tasks = {"job": task}

    failing_job = jobs.Job(
        id=16,
        task_kwargs={"a": 9, "b": 3},
        lock="sherlock",
        task_name="job",
        queueing_lock="houba",
        queue="yay",
    )
    test_worker = worker.Worker(app, queues=["yay"])

    # The worker signals a retryable failure by raising JobRetry.
    with pytest.raises(exceptions.JobRetry):
        await test_worker.run_job(job=failing_job, worker_id=3)
def test_finish_job(get_all, pg_job_store):
    """Fetching a deferred job marks it "doing"; finish_job then closes it."""
    pg_job_store.defer_job(
        jobs.Job(
            id=0,
            queue="queue_a",
            task_name="task_1",
            lock="lock_1",
            task_kwargs={"a": "b"},
        )
    )
    fetched_job = pg_job_store.fetch_job(queues=["queue_a"])

    # Fetching flips status to "doing" and stamps started_at; attempts stays 0.
    assert get_all("procrastinate_jobs", "status") == [{"status": "doing"}]
    first_row = get_all("procrastinate_jobs", "started_at")[0]
    assert first_row["started_at"].date() == datetime.datetime.utcnow().date()
    assert get_all("procrastinate_jobs", "attempts") == [{"attempts": 0}]

    pg_job_store.finish_job(job=fetched_job, status=jobs.Status.SUCCEEDED)
async def test_run_job_concurrency_warning(app, caplog):
    """A sync task run by a worker with concurrency > 1 logs a warning but still executes."""
    caplog.set_level(logging.WARNING)
    collected = []

    @app.task(queue="yay", name="job")
    def task_func(a):
        collected.append(a)

    sync_job = jobs.Job(
        id=16,
        task_kwargs={"a": 1},
        lock="sherlock",
        queueing_lock="houba",
        task_name="job",
        queue="yay",
    )
    test_worker = worker.Worker(app, concurrency=2)
    await test_worker.run_job(job=sync_job, worker_id=0)

    # The task ran despite the warning.
    assert collected == [1]
    # Exactly one "concurrent_sync_task" warning was emitted.
    emitted = [(record.action, record.levelname) for record in caplog.records]
    assert emitted == [("concurrent_sync_task", "WARNING")], caplog.records
def test_get_stalled_jobs(get_all, pg_job_store):
    """get_stalled_jobs should only report jobs started longer ago than nb_seconds."""
    # NOTE(review): this block is truncated in this chunk — the trailing
    # cursor.execute( call is cut off mid-statement; confirm against the full file.
    pg_job_store.defer_job(
        jobs.Job(
            id=0,
            queue="queue_a",
            task_name="task_1",
            lock="lock_1",
            task_kwargs={"a": "b"},
        )
    )
    job_id = list(get_all("procrastinate_jobs", "id"))[0]["id"]
    # No started job
    assert list(pg_job_store.get_stalled_jobs(nb_seconds=3600)) == []
    # We start a job and fake its `started_at`
    job = pg_job_store.fetch_job(queues=["queue_a"])
    with pg_job_store.connection.cursor() as cursor:
        cursor.execute(
# NOTE(review): the enclosing `def` line (and the `name`/`job_store` names this
# body references) are outside this chunk — confirm against the full file.
    lock: Optional[str] = None,
    queueing_lock: Optional[str] = None,
    task_kwargs: Optional[types.JSONDict] = None,
    schedule_at: Optional[datetime.datetime] = None,
    schedule_in: Optional[Dict[str, int]] = None,
    queue: str = jobs.DEFAULT_QUEUE,
) -> jobs.JobDeferrer:
    # schedule_at and schedule_in are two mutually exclusive ways of setting
    # the scheduled time; supplying both is a caller error.
    if schedule_at and schedule_in is not None:
        raise ValueError("Cannot set both schedule_at and schedule_in")
    if schedule_in is not None:
        # Resolve the relative delay into an absolute UTC timestamp.
        schedule_at = pendulum.now("UTC").add(**schedule_in)
    task_kwargs = task_kwargs or {}
    # Build a deferrer wrapping a not-yet-persisted Job (id=None).
    return jobs.JobDeferrer(
        job=jobs.Job(
            id=None,
            lock=lock,
            queueing_lock=queueing_lock,
            task_name=name,
            queue=queue,
            task_kwargs=task_kwargs,
            scheduled_at=schedule_at,
        ),
        job_store=job_store,
    )
async def get_stalled_jobs(
    self,
    nb_seconds: int,
    queue: Optional[str] = None,
    task_name: Optional[str] = None,
) -> Iterable[jobs.Job]:
    """Return the stalled jobs selected by the ``select_stalled_jobs`` query.

    Results are optionally filtered by queue and task name; each row is
    converted into a ``jobs.Job`` instance.
    """
    query_rows = await self.connector.execute_query_all_async(
        query=sql.queries["select_stalled_jobs"],
        nb_seconds=nb_seconds,
        queue=queue,
        task_name=task_name,
    )
    stalled_jobs = []
    for query_row in query_rows:
        stalled_jobs.append(jobs.Job.from_row(query_row))
    return stalled_jobs