Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_old_jobs_run(self, nb_hours, queue, statuses):
for id, job in list(self.jobs.items()):
if (
job["status"] in statuses
and (
max(e["at"] for e in self.events[id])
< utils.utcnow() - datetime.timedelta(hours=nb_hours)
)
and queue in (job["queue_name"], None)
):
self.jobs.pop(id)
self.jobs[id] = job_row = {
"id": id,
"queue_name": queue,
"task_name": task_name,
"lock": lock,
"queueing_lock": queueing_lock,
"args": args,
"status": "todo",
"scheduled_at": scheduled_at,
"attempts": 0,
}
self.events[id] = []
if scheduled_at:
self.events[id].append({"type": "scheduled", "at": scheduled_at})
self.events[id].append({"type": "deferred", "at": utils.utcnow()})
if self.notify_event:
if "procrastinate_any_queue" in self.notify_channels or (
f"procrastinate_queue#{queue}" in self.notify_channels
):
self.notify_event.set()
return job_row
def test_get_retry_exception_returns():
strategy = retry_module.RetryStrategy(max_attempts=10, wait=5.0)
now = utils.utcnow()
expected = now + datetime.timedelta(seconds=5, microseconds=0)
exc = strategy.get_retry_exception(exception=None, attempts=1)
assert isinstance(exc, exceptions.JobRetry)
assert exc.scheduled_at == expected.replace(microsecond=0)
def fetch_job_one(self, queues: Optional[Iterable[str]]) -> Dict:
# Creating a copy of the iterable so that we can modify it while we iterate
for job in self.jobs.values():
if (
job["status"] == "todo"
and (queues is None or job["queue_name"] in queues)
and (not job["scheduled_at"] or job["scheduled_at"] <= utils.utcnow())
and job["lock"] not in self.current_locks
):
job["status"] = "doing"
self.events[job["id"]].append({"type": "started", "at": utils.utcnow()})
return job
return {"id": None}
def finish_job_run(
self, job_id: int, status: str, scheduled_at: Optional[datetime.datetime] = None
) -> None:
job_row = self.jobs[job_id]
job_row["status"] = status
event_type = status
if status == "todo":
job_row["attempts"] += 1
job_row["scheduled_at"] = scheduled_at
if scheduled_at:
self.events[job_id].append({"type": "scheduled", "at": scheduled_at})
event_type = "deferred_for_retry"
self.events[job_id].append({"type": event_type, "at": utils.utcnow()})
def select_stalled_jobs_all(self, nb_seconds, queue, task_name):
return (
job
for job in self.jobs.values()
if job["status"] == "doing"
and self.events[job["id"]][-1]["at"]
< utils.utcnow() - datetime.timedelta(seconds=nb_seconds)
and queue in (job["queue_name"], None)
and task_name in (job["task_name"], None)
)