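These snippets are excerpts from the test suites of RQ and libraries that integrate with it (ddtrace, a HireFire-style proc loader), so they lean on a shared harness. A minimal sketch of what they assume, modeled on RQ's own tests (the class and fixture below are illustrative stand-ins, not the exact originals):

import unittest

from redis import Redis


class RQTestCase(unittest.TestCase):
    """Base class providing the `self.testconn` Redis connection used below."""

    def setUp(self):
        self.testconn = Redis()


# Stand-in for the `fixtures.say_hello` test function referenced throughout.
def say_hello(name=None):
    return 'Hi there, %s!' % (name or 'Stranger')
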
def test_fetch_dependencies_watches(self):
    """A write to a watched dependency key aborts the transaction."""
    queue = Queue(connection=self.testconn)
    dependency_job = queue.enqueue(fixtures.say_hello)
    dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
    dependent_job.register_dependency()
    dependent_job.save()

    with self.testconn.pipeline() as pipeline:
        # WATCH the dependency IDs so concurrent writes are detected.
        dependent_job.fetch_dependencies(watch=True, pipeline=pipeline)

        pipeline.multi()

        with self.assertRaises(WatchError):
            # Modify a watched key from outside the pipeline, then EXEC;
            # the transaction is aborted and redis-py raises WatchError.
            self.testconn.set(dependency_job.id, 'somethingelsehappened')
            pipeline.touch(dependency_job.id)
            pipeline.execute()
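For context, fetch_dependencies(watch=True) issues a Redis WATCH, so any write to a watched key between WATCH and EXEC makes the transaction fail. The same optimistic-locking pattern in plain redis-py (key name illustrative):

from redis import Redis, WatchError

conn = Redis()
with conn.pipeline() as pipe:
    pipe.watch('some-key')         # start watching the key
    pipe.multi()                   # buffer subsequent commands transactionally
    pipe.set('some-key', 'new')    # queued, not yet executed
    # If another client writes 'some-key' before execute(),
    # EXEC is aborted and redis-py raises WatchError.
    pipe.execute()
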
def test_create_and_cancel_job(self):
    """Creating a job and then calling cancel_job() removes it from its queue."""
    queue = Queue(connection=self.testconn)
    job = queue.enqueue(fixtures.say_hello)
    self.assertEqual(1, len(queue.get_jobs()))
    cancel_job(job.id)
    self.assertEqual(0, len(queue.get_jobs()))
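The same cancellation is available from a fetched job handle; a small sketch (some_job_id and redis_conn are placeholders):

from rq.job import Job

job = Job.fetch(some_job_id, connection=redis_conn)
job.cancel()  # removes the job from its queue so it will not run
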
def _add_jobs_to_queue(self, queue_name, num):
    """Helper: enqueue `num` dummy jobs on the named queue."""
    queue = Queue(queue_name, connection=Redis())
    for _ in range(num):
        queue.enqueue(self._dummy_func)

def test_enqueue_job_async_status_finished(self):
    queue = Queue(is_async=False)
    job = Job.create(func=fixtures.say_hello)
    job = queue.enqueue_job(job)
    self.assertEqual(job.result, 'Hi there, Stranger!')
    self.assertEqual(job.get_status(), JobStatus.FINISHED)
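Queue(is_async=False) executes jobs synchronously in the enqueueing process, which is why the status reaches FINISHED without any worker running. A quick usage sketch:

q = Queue(is_async=False, connection=Redis())
job = q.enqueue(say_hello)  # runs immediately, no worker involved
assert job.is_finished
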
def test_pin_installation(self):
    patch()

    import rq

    assert Pin.get_from(rq) is not None
    assert Pin.get_from(rq.job.Job) is not None
    assert Pin.get_from(rq.Queue) is not None
    assert Pin.get_from(rq.queue.Queue) is not None
    assert Pin.get_from(rq.Worker) is not None
    assert Pin.get_from(rq.worker.Worker) is not None
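The Pins this ddtrace test checks for can also be used to retarget tracing, for instance by overriding the reported service name on the patched module (the service value here is illustrative):

import rq
from ddtrace import Pin

Pin.override(rq, service='my-rq-service')  # rq traces now report this service
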
def test_fetch_dependencies_returns_dependency_jobs(self):
    queue = Queue(connection=self.testconn)
    dependency_job = queue.enqueue(fixtures.say_hello)
    dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
    dependent_job.register_dependency()
    dependent_job.save()

    dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn)

    self.assertListEqual(dependencies, [dependency_job])
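The dependency graph that fetch_dependencies() walks is declared at enqueue time; a minimal example, reusing the queue above:

first = queue.enqueue(say_hello)
second = queue.enqueue(say_hello, depends_on=first)  # runs only after `first` finishes
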
def test_get_queue(self):
    """registry.get_queue() returns the right Queue object."""
    registry = StartedJobRegistry(connection=self.testconn)
    self.assertEqual(registry.get_queue(), Queue(connection=self.testconn))

    registry = StartedJobRegistry('foo', connection=self.testconn)
    self.assertEqual(registry.get_queue(), Queue('foo', connection=self.testconn))

def test_invalid_job(self):
    """Requeuing a job that's not in FailedJobRegistry raises an error."""
    queue = Queue(connection=self.testconn)
    job = queue.enqueue(say_hello)

    registry = FailedJobRegistry(connection=self.testconn)
    with self.assertRaises(InvalidJobOperation):
        registry.requeue(job)
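Requeueing is intended for jobs that genuinely failed; a sketch of the supported path:

registry = FailedJobRegistry(queue=queue)
for job_id in registry.get_job_ids():
    registry.requeue(job_id)  # moves the failed job back onto its queue
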
def test_can_count_queues_properly(self):
    try:
        loaded_procs.clear()
        for queue_name in ('high', 'bottom'):
            queue = Queue(queue_name, connection=Redis())
            queue.empty()

        # Put some jobs on the queues
        self._add_jobs_to_queue('high', 2)
        self._add_jobs_to_queue('bottom', 4)

        # Now fake an active job on each queue
        for idx, queue_name in enumerate(['high', 'bottom']):
            queue = Queue(queue_name, connection=Redis())
            registry = StartedJobRegistry(queue_name, queue.connection)
            # A negative score is important here; otherwise the job
            # would be recognized as expired.
            registry.connection.zadd(registry.key, {'job_id_{}'.format(idx): -1})

        # Load the HireFire procs
        procs = load_procs(*(
            'tests.contrib.django.testapp.rq_test_procs.WorkerProc',
            'tests.contrib.django.testapp.rq_test_procs.AnotherWorkerProc'
        ))

        # Total should be all queued jobs (2 + 4) plus one active job per queue
        assert sum(proc.quantity() for proc in procs.values()) == 8
    finally:
        loaded_procs.clear()
        for queue_name in ('high', 'bottom'):
            queue = Queue(queue_name, connection=Redis())
            queue.empty()  # clean up the test queues

def enqueue_scheduled_jobs(self):
    """Enqueue jobs whose timestamp is in the past"""
    self._status = self.Status.WORKING
    for registry in self._scheduled_job_registries:
        timestamp = current_timestamp()

        # TODO: try to use a Lua script to make get_jobs_to_schedule()
        # and remove_jobs() atomic
        job_ids = registry.get_jobs_to_schedule(timestamp)
        if not job_ids:
            continue

        queue = Queue(registry.name, connection=self.connection)

        with self.connection.pipeline() as pipeline:
            # This should be done in bulk
            for job_id in job_ids:
                job = Job.fetch(job_id, connection=self.connection)
                queue.enqueue_job(job, pipeline=pipeline)
            registry.remove_jobs(timestamp)
            pipeline.execute()

    self._status = self.Status.STARTED
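The loop above pairs with the scheduling APIs that put jobs into a ScheduledJobRegistry in the first place; a small usage sketch:

from datetime import datetime, timedelta, timezone

queue = Queue(connection=Redis())
# enqueue_scheduled_jobs() moves these onto the queue once they are due.
queue.enqueue_in(timedelta(minutes=10), say_hello)
queue.enqueue_at(datetime(2030, 1, 1, tzinfo=timezone.utc), say_hello)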