Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_enqueue_job_with_job_queue_name(self):
"""
Ensure that job is enqueued correctly when queue_name is provided
at job creation
"""
queue = Queue('foo', connection=self.testconn)
job_queue = Queue('job_foo', connection=self.testconn)
scheduler = Scheduler(connection=self.testconn, queue=queue)
job = scheduler._create_job(say_hello, queue_name='job_foo')
self.assertEqual(scheduler.get_queue_for_job(job), job_queue)
scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, job_queue.jobs)
self.assertIn(job_queue, Queue.all())
def test_enqueue_job_with_scheduler_queue(self):
"""
Ensure that job is enqueued correctly when the scheduler is bound
to a queue object and job queue name is not provided.
"""
queue = Queue('foo', connection=self.testconn)
scheduler = Scheduler(connection=self.testconn, queue=queue)
job = scheduler._create_job(say_hello)
scheduler_queue = scheduler.get_queue_for_job(job)
self.assertEqual(queue, scheduler_queue)
scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, queue.jobs)
self.assertIn(queue, Queue.all())
def list_queues():
queues = serialize_queues(sorted(Queue.all()))
return dict(queues=queues)
def rq_queues():
return {
q.name: {
'name': q.name,
'started': fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()),
'queued': len(q.job_ids)
} for q in Queue.all(connection=rq_redis_connection)}
def purge_failed_jobs():
with Connection(rq_redis_connection):
for queue in Queue.all():
failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
stale_jobs = [job for job in failed_jobs if (datetime.utcnow() - job.ended_at).seconds > settings.JOB_DEFAULT_FAILURE_TTL]
for job in stale_jobs:
job.delete()
if stale_jobs:
logger.info('Purged %d old failed jobs from the %s queue.', len(stale_jobs), queue.name)
def get_context_data(self, **kwargs):
ctx = super(Stats, self).get_context_data(**kwargs)
ctx.update({
'queues': Queue.all(connection=self.connection),
'workers': Worker.all(connection=self.connection),
'title': 'RQ Status',
})
return ctx
def get_info(show_failed=False):
conn = get_redis_conn()
queues = Queue.all(conn)
workers = Worker.all(conn)
jobs = []
def add_job(j, name):
if j.kwargs.get('site')==frappe.local.site:
jobs.append({
'job_name': j.kwargs.get('kwargs', {}).get('playbook_method') \
or str(j.kwargs.get('job_name')),
'status': j.status, 'queue': name,
'creation': format_datetime(convert_utc_to_user_timezone(j.created_at)),
'color': colors[j.status]
})
if j.exc_info:
jobs[-1]['exc_info'] = j.exc_info
for w in workers:
def get_queues_status():
return {**{queue: {'size': redis_connection.llen(queue)} for queue in get_celery_queues()},
**{queue.name: {'size': len(queue)} for queue in Queue.all(connection=rq_redis_connection)}}
def queues_overview(instance_number):
r = make_response(
render_template(
"rq_dashboard/queues.html",
current_instance=instance_number,
instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"),
queues=Queue.all(),
rq_url_prefix=url_for(".queues_overview"),
rq_dashboard_version=rq_dashboard_version,
rq_version=rq_version,
active_tab="queues",
deprecation_options_usage=current_app.config.get(
"DEPRECATED_OPTIONS", False
),
)
)
r.headers.set("Cache-Control", "no-store")
return r