Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_default_timeout(self):
    """Jobs enqueued without an explicit timeout use the queue's default."""
    # Queue created with no arguments: Queue.DEFAULT_TIMEOUT is expected,
    # both for enqueue() and for enqueue_job() with a pre-built Job.
    stock_queue = Queue()
    via_enqueue = stock_queue.enqueue(echo, 1)
    self.assertEqual(via_enqueue.timeout, stock_queue.DEFAULT_TIMEOUT)

    prebuilt = Job.create(func=echo)
    via_enqueue_job = stock_queue.enqueue_job(prebuilt)
    self.assertEqual(via_enqueue_job.timeout, stock_queue.DEFAULT_TIMEOUT)

    # Queue created with default_timeout=15: that value is expected instead.
    tuned_queue = Queue(default_timeout=15)
    via_enqueue = tuned_queue.enqueue(echo, 1)
    self.assertEqual(via_enqueue.timeout, 15)

    prebuilt = Job.create(func=echo)
    via_enqueue_job = tuned_queue.enqueue_job(prebuilt)
    self.assertEqual(via_enqueue_job.timeout, 15)
def test_get_job_ttl(self):
    """get_ttl() returns the ttl given at creation, or None when omitted."""
    expected_ttl = 1
    job_with_ttl = Job.create(func=fixtures.say_hello, ttl=expected_ttl)
    job_with_ttl.save()
    self.assertEqual(job_with_ttl.get_ttl(), expected_ttl)

    # No ttl supplied at creation, so the test expects None back.
    job_without_ttl = Job.create(func=fixtures.say_hello)
    job_without_ttl.save()
    self.assertEqual(job_without_ttl.get_ttl(), None)
self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1))
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800) # 2019-01-01 UTC in Unix timestamp
else:
from datetime import timezone
# If we pass in a datetime with no timezone, `schedule()`
# assumes the local timezone, so depending on your local timezone
# the timestamp may be different
registry.schedule(job, datetime(2019, 1, 1))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800 + time.timezone) # 2019-01-01 UTC in Unix timestamp
# Score is always stored in UTC even if datetime is in a different tz
tz = timezone(timedelta(hours=7))
job = Job.create('myfunc', connection=self.testconn)
job.save()
registry.schedule(job, datetime(2019, 1, 1, 7, tzinfo=tz))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800) # 2019-01-01 UTC in Unix timestamp
def test_job_is_unimportable(self):
    """Accessing func on a job whose function cannot be imported raises."""
    stale_job = Job.create(func=fixtures.say_hello, args=('Lionel',))
    stale_job.save()

    # Rewrite the stored payload so it names a function that does not
    # exist (this mimics a worker running out-of-date source code that
    # cannot import the function).
    broken_payload = stale_job.data.replace(b'say_hello', b'nay_hello')
    self.testconn.hset(stale_job.key, 'data', zlib.compress(broken_payload))
    stale_job.refresh()

    # Touching the func property is expected to fail.
    with self.assertRaises(AttributeError):
        stale_job.func
def test_result_ttl_is_persisted(self):
    """Ensure that a job's result_ttl survives a save/fetch round trip.

    Covers both an explicit result_ttl (10) and the case where no
    result_ttl is given, for which None is expected after fetching.
    """
    job = Job.create(func=fixtures.say_hello, args=('Lionel',), result_ttl=10)
    job.save()
    # Assert on the freshly fetched copy: checking the in-memory `job`
    # would pass even if result_ttl had never been written to Redis.
    fetched_job = Job.fetch(job.id, connection=self.testconn)
    self.assertEqual(fetched_job.result_ttl, 10)

    job = Job.create(func=fixtures.say_hello, args=('Lionel',))
    job.save()
    fetched_job = Job.fetch(job.id, connection=self.testconn)
    self.assertEqual(fetched_job.result_ttl, None)
def test_enqueue_scheduled_jobs(self):
    """Scheduler moves due jobs from the scheduled registry onto the queue."""
    target_queue = Queue(connection=self.testconn)
    scheduled = ScheduledJobRegistry(queue=target_queue)

    pending_job = Job.create('myfunc', connection=self.testconn)
    pending_job.save()
    # A date in the past, so the job is already due.
    scheduled.schedule(pending_job, datetime(2019, 1, 1, tzinfo=utc))

    rq_scheduler = RQScheduler([target_queue], connection=self.testconn)
    rq_scheduler.acquire_locks()
    rq_scheduler.enqueue_scheduled_jobs()
    self.assertEqual(len(target_queue), 1)

    # Once the job has been enqueued, the registry should be empty
    self.assertEqual(len(scheduled), 0)

    # A job scheduled far in the future must stay off the queue
    scheduled.schedule(pending_job, datetime(2100, 1, 1, tzinfo=utc))
    rq_scheduler.enqueue_scheduled_jobs()
    self.assertEqual(len(target_queue), 1)
def test_job_with_dependents_delete_parent(self):
    """job.delete() deletes itself from Redis but not dependents.

    Without a save, the dependent job is never saved into Redis. The delete
    method will get and pass a NoSuchJobError.
    """
    work_queue = Queue(connection=self.testconn)
    parent = work_queue.enqueue(fixtures.say_hello)
    child = Job.create(func=fixtures.say_hello, depends_on=parent)
    child.register_dependency()

    parent.delete()

    # The parent's own key and its dependents key should both be gone.
    for redis_key in (parent.key, parent.dependents_key):
        self.assertFalse(self.testconn.exists(redis_key))

    # Dependents are not deleted by default, but a job only exists in
    # Redis if it was saved -- and the child never was.
    self.assertFalse(self.testconn.exists(child.key))

    self.assertNotIn(parent.id, work_queue.get_job_ids())
def test_get_result_ttl(self):
    """get_result_ttl() prefers the job's own value over a passed default."""
    explicit_ttl, fallback_ttl = 1, 2

    # Job created with its own result_ttl: the default is ignored.
    configured = Job.create(func=fixtures.say_hello, result_ttl=explicit_ttl)
    configured.save()
    self.assertEqual(configured.get_result_ttl(default_ttl=fallback_ttl), explicit_ttl)
    self.assertEqual(configured.get_result_ttl(), explicit_ttl)

    # Job created without result_ttl: the default (or None) is returned.
    unconfigured = Job.create(func=fixtures.say_hello)
    unconfigured.save()
    self.assertEqual(unconfigured.get_result_ttl(default_ttl=fallback_ttl), fallback_ttl)
    self.assertEqual(unconfigured.get_result_ttl(), None)
def test_fetch_dependencies_raises_if_dependency_deleted(self):
    """fetch_dependencies() raises NoSuchJobError once the dependency is deleted."""
    work_queue = Queue(connection=self.testconn)
    parent = work_queue.enqueue(fixtures.say_hello)

    child = Job.create(func=fixtures.say_hello, depends_on=parent)
    child.register_dependency()
    child.save()

    # Remove the job the child depends on before trying to fetch it.
    parent.delete()

    with self.assertRaises(NoSuchJobError):
        child.fetch_dependencies(pipeline=self.testconn)
def test_save(self):  # noqa
    """Saving a job writes a Redis hash holding its compressed job data."""
    stored_job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs={'z': 2})

    # Nothing exists under the job key until save() is called; afterwards
    # the key holds a Redis hash.
    self.assertEqual(self.testconn.exists(stored_job.key), False)
    stored_job.save()
    self.assertEqual(self.testconn.type(stored_job.key), b'hash')

    # The 'data' field holds the zlib-compressed, pickled job data; its
    # first element is the function's dotted path.
    raw_data = self.testconn.hget(stored_job.key, 'data')
    job_tuple = loads(zlib.decompress(raw_data))
    self.assertEqual(job_tuple[0], 'tests.fixtures.some_calculation')