Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
q = Queue()
self.assertEqual(q.count, 0)
# Action
job = q.enqueue(div_by_zero)
self.assertEqual(q.count, 1)
# keep for later
enqueued_at_date = str(job.enqueued_at)
w = Worker([q])
w.work(burst=True) # should silently pass
# Postconditions
self.assertEqual(q.count, 0)
failed_job_registry = FailedJobRegistry(queue=q)
self.assertTrue(job in failed_job_registry)
self.assertEqual(w.get_current_job_id(), None)
# Check the job
job = Job.fetch(job.id)
self.assertEqual(job.origin, q.name)
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
self.assertTrue(job.exc_info) # should contain exc_info
def test_requeue(self):
"""FailedJobRegistry.requeue works properly"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(div_by_zero, failure_ttl=5)
worker = Worker([queue])
worker.work(burst=True)
registry = FailedJobRegistry(connection=worker.connection)
self.assertTrue(job in registry)
registry.requeue(job.id)
self.assertFalse(job in registry)
self.assertIn(job.id, queue.get_job_ids())
job.refresh()
self.assertEqual(job.get_status(), JobStatus.QUEUED)
worker.work(burst=True)
self.assertTrue(job in registry)
# Should also work with job instance
registry.requeue(job)
self.assertFalse(job in registry)
self.assertIn(job.id, queue.get_job_ids())
def requeue(queue, all, job_ids):
"""Requeue failed jobs."""
failed_job_registry = FailedJobRegistry(queue, connection=connection)
if all:
job_ids = failed_job_registry.get_job_ids()
if not job_ids:
click.echo("Nothing to do")
sys.exit(0)
click.echo(f"Requeueing {len(job_ids)} jobs from failed queue")
fail_count = 0
for job_id in job_ids:
try:
failed_job_registry.requeue(job_id)
except InvalidJobOperationError:
fail_count += 1
if fail_count > 0:
def clean_registries(queue):
"""Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
registry = FinishedJobRegistry(name=queue.name,
connection=queue.connection,
job_class=queue.job_class)
registry.cleanup()
registry = StartedJobRegistry(name=queue.name,
connection=queue.connection,
job_class=queue.job_class)
registry.cleanup()
registry = FailedJobRegistry(name=queue.name,
connection=queue.connection,
job_class=queue.job_class)
registry.cleanup()
3. Setting the workers current job to None
4. Add the job to FailedJobRegistry
"""
self.log.debug('Handling failed execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(
job.origin,
self.connection,
job_class=self.job_class
)
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler:
failed_job_registry = FailedJobRegistry(job.origin, job.connection,
job_class=self.job_class)
failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at:
self.increment_total_working_time(
job.ended_at - job.started_at,
pipeline
)
try:
pipeline.execute()
except Exception:
# Ensure that custom exception handlers are called
def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page):
queue = Queue(queue_name)
if registry_name != "queued":
if per_page >= 0:
per_page = offset + (per_page - 1)
if registry_name == "failed":
current_queue = FailedJobRegistry(queue_name)
elif registry_name == "deferred":
current_queue = DeferredJobRegistry(queue_name)
elif registry_name == "started":
current_queue = StartedJobRegistry(queue_name)
elif registry_name == "finished":
current_queue = FinishedJobRegistry(queue_name)
else:
current_queue = queue
total_items = current_queue.count
job_ids = current_queue.get_job_ids(offset, per_page)
current_queue_jobs = [queue.fetch_job(job_id) for job_id in job_ids]
jobs = [serialize_job(job) for job in current_queue_jobs]
return (total_items, jobs)
def clear_failed_registry(queue):
failed_job_registry = FailedJobRegistry(queue, connection=connection)
job_ids = failed_job_registry.get_job_ids()
for job_id in job_ids:
failed_job_registry.remove(job_id, delete_job=True)
print(f"Deleted {len(job_ids)} jobs from the failed job registry.")
def empty_queue(queue_name, registry_name):
if registry_name == "queued":
q = Queue(queue_name)
q.empty()
elif registry_name == "failed":
ids = FailedJobRegistry(queue_name).get_job_ids()
for id in ids:
delete_job_view(id)
elif registry_name == "deferred":
ids = DeferredJobRegistry(queue_name).get_job_ids()
for id in ids:
delete_job_view(id)
elif registry_name == "started":
ids = StartedJobRegistry(queue_name).get_job_ids()
for id in ids:
delete_job_view(id)
elif registry_name == "finished":
ids = FinishedJobRegistry(queue_name).get_job_ids()
for id in ids:
delete_job_view(id)
return dict(status="OK")