Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from rq.compat import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
from rq.exceptions import InvalidJobOperation
from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import (clean_registries, DeferredJobRegistry,
FailedJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
class CustomJob(Job):
    """Bare ``Job`` subclass that exists only so tests have a custom job class."""
class TestRegistry(RQTestCase):
    """Registry tests that run against the test case's Redis connection."""

    def setUp(self):
        """Give every test a ``StartedJobRegistry`` on the test connection."""
        super(TestRegistry, self).setUp()
        self.registry = StartedJobRegistry(connection=self.testconn)

    def test_init(self):
        """A registry built from a queue reuses that queue's name and connection."""
        foo_queue = Queue('foo', connection=self.testconn)
        started = StartedJobRegistry(queue=foo_queue)

        self.assertEqual(started.name, foo_queue.name)
        self.assertEqual(started.connection, foo_queue.connection)
def test_crontab_sets_timeout(self):
    """A job scheduled through ``cron`` keeps the custom timeout it was given."""
    expected_timeout = 13
    scheduled = self.scheduler.cron(
        "1 * * * *",
        say_hello,
        timeout=expected_timeout,
    )

    # Re-read the job from Redis so the assertion checks the stored value,
    # not the in-memory object returned by the scheduler.
    stored = Job.fetch(scheduled.id, connection=self.testconn)
    self.assertEqual(stored.timeout, expected_timeout)
def append_scan_summary_info(self, ip_addresses):
    """Annotate recently scanned IP addresses with their job's ``changed`` flag.

    For every address whose ``scan_summary`` was modified within the last
    day, the matching RQ job is fetched and ``job.meta['changed']``
    (``False`` when the key is absent) is stored on
    ``ip_address.scan_summary.changed``. Addresses with no scan summary,
    with an older summary, or whose job cannot be found are left untouched.

    :param ip_addresses: collection of objects exposing ``scan_summary``;
        a falsy value (``None``, empty collection) makes the call a no-op.
    :returns: ``None``; the scan summaries are updated in place.
    """
    if not ip_addresses:
        return

    cutoff = timezone.now() - datetime.timedelta(days=1)
    # The connection does not depend on the address being processed, so it
    # is looked up once here instead of once per fetched job.
    connection = django_rq.get_connection()

    for ip_address in ip_addresses:
        summary = ip_address.scan_summary
        if not summary or not (summary.modified > cutoff):
            continue
        try:
            job = rq.job.Job.fetch(summary.job_id, connection)
        except rq.exceptions.NoSuchJobError:
            # No job under that id any more; leave this summary as it is.
            continue
        summary.changed = job.meta.get('changed', False)
def run(request, run_id):
    """Render the detail page for a single model run.

    Loads the ``ModelRun`` whose primary key is ``run_id``, attaches the
    ``meta`` of its RQ job to it as ``job_metadata`` and renders
    ``main/run_detail.html`` with the run and the distinct ``name`` values
    of its metrics, ordered by name.
    """
    model_run = ModelRun.objects.get(pk=run_id)

    connection = django_rq.get_connection()
    rq_job = Job.fetch(model_run.job_id, connection)
    model_run.job_metadata = rq_job.meta

    metric_names = model_run.metrics.order_by('name').values('name').distinct()
    context = {'run': model_run, 'metrics': metric_names}
    return render(request, 'main/run_detail.html', context)
def get_rq_job(self):
    """Return the RQ job stored under ``self.id``, or ``None`` if unavailable.

    ``None`` is returned both when Redis raises an error and when no job
    with that id exists.
    """
    try:
        return rq.job.Job.fetch(self.id, connection=current_app.redis)
    except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
        return None
# Two-step bulk action on queue jobs, driven by which keys the POST carries.
#
# Step 1: '_selected_action' is present, so nothing is executed yet; the
# chosen action and job ids are handed to the confirmation template.
if request.POST.get('_selected_action', False):
context_data = {
'queue_index': queue_index,
'action': request.POST['action'],
'job_ids': request.POST.getlist('_selected_action'),
'queue': queue,
}
return render(request, 'django_rq/confirm_action.html', context_data)
# Step 2: 'job_ids' is present, so perform the action named in
# request.POST['action'] on every listed job.
elif request.POST.get('job_ids', False):
job_ids = request.POST.getlist('job_ids')
if request.POST['action'] == 'delete':
for job_id in job_ids:
# Job.fetch is not guarded here, so an id whose job is missing will
# propagate whatever exception fetch raises.
job = Job.fetch(job_id, connection=queue.connection)
# Remove job id from queue and delete the actual job
queue.connection.lrem(queue.key, 0, job.id)
job.delete()
# The message reports the number of ids submitted.
messages.info(request, 'You have successfully deleted %s jobs!' % len(job_ids))
elif request.POST['action'] == 'requeue':
for job_id in job_ids:
requeue_job(job_id, connection=queue.connection)
messages.info(request, 'You have successfully requeued %d jobs!' % len(job_ids))
# NOTE(review): indentation was lost in this excerpt, so it is unclear whether
# this redirect belongs to the 'job_ids' branch or runs for every request that
# did not render the confirmation page -- confirm against the original view.
return redirect('rq_jobs', queue_index)
def get_results(job_key):
    """Report the result or current state of the RQ job stored under ``job_key``.

    :param job_key: id of the job, looked up on the module-level ``conn``.
    :returns: ``(body, 200)`` where ``body`` is a JSON string: the job's
        return value once it has finished, otherwise a ``{"status": ...}``
        object whose value is ``"in-queue"``, ``"waiting"``, ``"failed"``
        or, for any other job state, ``"unknown"``.

    Exceptions raised by ``Job.fetch`` (for example for an unknown id) are
    not caught here and propagate to the caller.
    """
    job = Job.fetch(job_key, connection=conn)
    if job.is_finished:
        ret = job.return_value
    elif job.is_queued:
        ret = {'status': 'in-queue'}
    elif job.is_started:
        ret = {'status': 'waiting'}
    elif job.is_failed:
        ret = {'status': 'failed'}
    else:
        # A job can be in a state none of the checks above cover (e.g.
        # deferred); without this branch ``ret`` would be unbound and the
        # return below would raise UnboundLocalError.
        ret = {'status': 'unknown'}
    return json.dumps(ret), 200
# Walk every registry in self._scheduled_job_registries and enqueue the jobs
# that get_jobs_to_schedule() returns for the current timestamp onto the queue
# that shares the registry's name.
for registry in self._scheduled_job_registries:
# One timestamp per registry: the same value selects the jobs and is later
# passed to remove_jobs().
timestamp = current_timestamp()
# TODO: try to use Lua script to make get_jobs_to_schedule()
# and remove_jobs() atomic
job_ids = registry.get_jobs_to_schedule(timestamp)
# Nothing returned for this registry; move on to the next one.
if not job_ids:
continue
queue = Queue(registry.name, connection=self.connection)
with self.connection.pipeline() as pipeline:
# This should be done in bulk
for job_id in job_ids:
# Each job is fetched individually, then its enqueue is added to the
# pipeline.
job = Job.fetch(job_id, connection=self.connection)
queue.enqueue_job(job, pipeline=pipeline)
# NOTE(review): remove_jobs() is not handed the pipeline, so presumably it
# runs immediately rather than together with the enqueue commands -- confirm.
registry.remove_jobs(timestamp)
pipeline.execute()
# NOTE(review): indentation was lost in this excerpt; presumably the status is
# set once after all registries have been processed -- confirm.
self._status = self.Status.STARTED
def get_rq_job(self):
    """Look up the RQ job for ``self.job_id`` on ``rq.connection``.

    Returns the fetched job, or ``None`` when Redis raises an error or no
    job with that id exists.
    """
    try:
        return Job.fetch(self.job_id, rq.connection)
    except (RedisError, NoSuchJobError):
        return None
# Build ``jobdata``: one plain dict per job on the first page of the finished
# job registry of the queue named by the ``queue`` GET parameter.
current_queue = request.GET.get('queue')
queue = django_rq.get_queue(current_queue)
registry = FinishedJobRegistry(queue.name, queue.connection)

items_per_page = 10
num_jobs = len(registry)
jobs = []

if num_jobs > 0:
    offset = 0
    # ``get_job_ids`` takes inclusive start/end indexes, so the last index of
    # the page is ``offset + items_per_page - 1``. Passing ``items_per_page``
    # itself as the end index returned one job more than the page size.
    job_ids = registry.get_job_ids(offset, offset + items_per_page - 1)
    for job_id in job_ids:
        try:
            jobs.append(Job.fetch(job_id, connection=queue.connection))
        except NoSuchJobError:
            # Deliberate best effort: an id listed in the registry whose job
            # can no longer be fetched is simply left out of the page.
            pass

# Convert each job into display-ready values: timestamps are rendered as
# strings and every positional argument is stringified.
jobdata = []
for job in jobs:
    job_dict = {
        'job_id': job.id,
        'func_name': job.func_name,
        'ended_at': job.ended_at.strftime("%a, %d %b %Y %H:%M:%S +0000"),
        'enqueued_at': job.enqueued_at.strftime(
            "%a, %d %b %Y %H:%M:%S +0000"
        ),
        'args': [str(arg) for arg in job.args],
    }
    jobdata.append(job_dict)