Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_get_status_count(checker):
status_count = await checker.get_status_count_async()
assert set(status_count.keys()) == set(jobs.Status)
for known_status in jobs.Status:
assert status_count[known_status] == 0
async def test_finish_job(job_store, job_factory, connector):
job = job_factory(id=1)
await job_store.defer_job_async(job=job)
retry_at = pendulum.datetime(2000, 1, 1)
await job_store.finish_job(job=job, status=jobs.Status.TODO, scheduled_at=retry_at)
assert connector.queries[-1] == (
"finish_job",
{"job_id": 1, "scheduled_at": retry_at, "status": "todo"},
)
(jobs.Status.SUCCEEDED, 1, "queue_b", False, False),
(jobs.Status.SUCCEEDED, 1, "queue_b", False, False),
# include_error
(jobs.Status.FAILED, 1, None, False, False),
(jobs.Status.FAILED, 1, None, True, True),
],
)
def test_delete_old_jobs_parameters(
get_all, pg_job_store, status, nb_hours, queue, include_error, should_delete
):
pg_job_store.defer_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_1",
lock="lock_1",
task_kwargs={"a": "b"},
async def test_delete_old_jobs_multiple_jobs(
get_all, pg_job_store, aiopg_connector, job_factory
):
await pg_job_store.defer_job_async(job_factory(queue="queue_a"))
await pg_job_store.defer_job_async(job_factory(queue="queue_b"))
# We start both jobs
job_a = await pg_job_store.fetch_job(queues=["queue_a"])
job_b = await pg_job_store.fetch_job(queues=["queue_b"])
# We finish both jobs
await pg_job_store.finish_job(job_a, status=jobs.Status.SUCCEEDED)
await pg_job_store.finish_job(job_b, status=jobs.Status.SUCCEEDED)
# We back date the events for job_a
await aiopg_connector.execute_query_async(
f"UPDATE procrastinate_events SET at=at - INTERVAL '2 hours'"
f"WHERE job_id={job_a.id}"
)
# Only job_a is deleted
await pg_job_store.delete_old_jobs(nb_hours=2)
rows = await get_all("procrastinate_jobs", "id")
assert len(rows) == 1
assert rows[0]["id"] == job_b.id
)
pg_job_store.defer_job(
jobs.Job(
id=0,
queue="queue_b",
task_name="task_2",
lock="lock_2",
task_kwargs={"a": "b"},
)
)
# We start both jobs
job_a = pg_job_store.fetch_job(queues=["queue_a"])
job_b = pg_job_store.fetch_job(queues=["queue_b"])
# We finish both jobs
pg_job_store.finish_job(job_a, status=jobs.Status.SUCCEEDED)
pg_job_store.finish_job(job_b, status=jobs.Status.SUCCEEDED)
# We back date the events for job_a
with pg_job_store.connection.cursor() as cursor:
cursor.execute(
f"UPDATE procrastinate_events SET at=at - INTERVAL '2 hours'"
f"WHERE job_id={job_a.id}"
)
# Only job_a is deleted
pg_job_store.delete_old_jobs(nb_hours=2)
rows = get_all("procrastinate_jobs", "id")
assert len(rows) == 1
assert rows[0]["id"] == job_b.id
(jobs.Status.SUCCEEDED, 1, None, False, True),
(jobs.Status.SUCCEEDED, 3, None, False, False),
# queue
(jobs.Status.SUCCEEDED, 1, "queue_a", False, True),
(jobs.Status.SUCCEEDED, 3, "queue_a", False, False),
(jobs.Status.SUCCEEDED, 1, "queue_b", False, False),
(jobs.Status.SUCCEEDED, 1, "queue_b", False, False),
# include_error
(jobs.Status.FAILED, 1, None, False, False),
(jobs.Status.FAILED, 1, None, True, True),
],
)
async def test_delete_old_jobs_parameters(
get_all,
pg_job_store,
aiopg_connector,
status,
def test_healthchecks(entrypoint, click_app, mocker):
check_db = mocker.patch(
"procrastinate.healthchecks.HealthCheckRunner.check_connection"
)
check_db.return_value = True
count_jobs = mocker.patch(
"procrastinate.healthchecks.HealthCheckRunner.get_status_count"
)
count_jobs.return_value = {jobs.Status.SUCCEEDED: 42}
result = entrypoint("-a yay healthchecks")
assert result.output.startswith("DB connection: OK")
check_db.assert_called_once_with()
count_jobs.assert_called_once_with()
async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None:
context = self.context_for_worker(worker_id=worker_id, job=job)
self.logger.debug(
f"Loaded job info, about to start job {job.call_string}",
extra=context.log_extra(action="loaded_job_info"),
)
status = jobs.Status.FAILED
next_attempt_scheduled_at = None
try:
await self.run_job(job=job, worker_id=worker_id)
status = jobs.Status.SUCCEEDED
except exceptions.JobRetry as e:
status = jobs.Status.TODO
next_attempt_scheduled_at = e.scheduled_at
except exceptions.JobError:
pass
except exceptions.TaskNotFound as exc:
self.logger.exception(
f"Task was not found: {exc}",
extra=context.log_extra(action="task_not_found", exception=str(exc)),
)
finally:
await self.job_store.finish_job(
job=job, status=status, scheduled_at=next_attempt_scheduled_at
)
self.logger.debug(
async def delete_old_jobs(
self,
nb_hours: int,
queue: Optional[str] = None,
include_error: Optional[bool] = False,
) -> None:
# We only consider finished jobs by default
if not include_error:
statuses = [jobs.Status.SUCCEEDED.value]
else:
statuses = [jobs.Status.SUCCEEDED.value, jobs.Status.FAILED.value]
await self.connector.execute_query_async(
query=sql.queries["delete_old_jobs"],
nb_hours=nb_hours,
queue=queue,
statuses=tuple(statuses),
)
async def delete_old_jobs(
self,
nb_hours: int,
queue: Optional[str] = None,
include_error: Optional[bool] = False,
) -> None:
# We only consider finished jobs by default
if not include_error:
statuses = [jobs.Status.SUCCEEDED.value]
else:
statuses = [jobs.Status.SUCCEEDED.value, jobs.Status.FAILED.value]
await self.connector.execute_query_async(
query=sql.queries["delete_old_jobs"],
nb_hours=nb_hours,
queue=queue,
statuses=tuple(statuses),
)