Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_job_times(self):
    """Job timestamps are populated as the job is enqueued and worked."""
    queue = Queue('foo')
    worker = Worker([queue])

    # Lower bound is truncated to whole seconds.
    # NOTE(review): presumably stored timestamps lose sub-second
    # precision -- confirm against utcformat.
    earliest = utcnow().replace(microsecond=0)

    job = queue.enqueue(say_hello)

    # Right after enqueueing only `enqueued_at` is set.
    self.assertIsNotNone(job.enqueued_at)
    self.assertIsNone(job.started_at)
    self.assertIsNone(job.ended_at)

    did_work = worker.work(burst=True)
    self.assertEqual(did_work, True, 'Expected at least some work done.')
    self.assertEqual(job.result, 'Hi there, Stranger!')

    latest = utcnow()
    job.refresh()

    # After the refresh, every timestamp must fall inside the window
    # spanned by this test.
    for moment in (job.enqueued_at, job.started_at, job.ended_at):
        self.assertTrue(
            earliest <= moment <= latest,
            'Not %s <= %s <= %s' % (earliest, moment, latest)
        )
def test_clean_rq(self):
    """The `clean_rq` command leaves two of three seeded job hashes."""
    conn = get_redis_connection()
    job_pattern = 'rq:job:*'

    def job_key_count():
        return len(conn.keys(job_pattern))

    self.assertEqual(job_key_count(), 0)

    # Seed three job hashes: one without `created_at`, one created now,
    # and one created ten days ago.
    conn.hmset('rq:job:abc', {'bar': 'baz'})
    recent_stamp = utcformat(utcnow())
    conn.hmset('rq:job:def', {'created_at': recent_stamp})
    old_stamp = utcformat(utcnow() - timedelta(days=10))
    conn.hmset('rq:job:123', {'created_at': old_stamp})
    self.assertEqual(job_key_count(), 3)

    call_command('clean_rq')

    # NOTE(review): the test only checks the remaining count, not which
    # key was removed.
    self.assertEqual(job_key_count(), 2)
def test_should_run_maintenance_tasks(self):
    """Maintenance is due on a new worker and again 3700 seconds after cleaning."""
    worker = Worker(Queue(connection=self.testconn))

    # A worker that has not had `last_cleaned_at` set reports it as due.
    self.assertTrue(worker.should_run_maintenance_tasks)

    # Just cleaned: not due.
    just_cleaned = utcnow()
    worker.last_cleaned_at = just_cleaned
    self.assertFalse(worker.should_run_maintenance_tasks)

    # Cleaned 3700 seconds ago (a little over an hour): due again.
    cleaned_long_ago = utcnow() - timedelta(seconds=3700)
    worker.last_cleaned_at = cleaned_long_ago
    self.assertTrue(worker.should_run_maintenance_tasks)
def register_birth(self):
    """Write this worker's birth record to Redis.

    Replaces whatever is stored under the worker key with a fresh hash of
    birth metadata, registers the worker via `worker_registration`, and
    sets an expiry of `default_worker_ttl` on the key. All commands are
    queued on one pipeline and sent together.

    Raises:
        ValueError: if the worker key already exists and its hash has no
            'death' field.
    """
    self.log.debug('Registering birth of worker %s', self.name)

    # Only query for the 'death' field when the key exists at all.
    key_present = self.connection.exists(self.key)
    if key_present and not self.connection.hexists(self.key, 'death'):
        raise ValueError(
            'There exists an active worker named {0!r} already'.format(self.name)
        )

    worker_key = self.key
    queue_csv = ','.join(self.queue_names())

    with self.connection.pipeline() as pipe:
        pipe.delete(worker_key)

        born_at = utcnow()
        born_at_text = utcformat(born_at)
        self.birth_date = born_at

        birth_record = {
            'birth': born_at_text,
            'last_heartbeat': born_at_text,
            'queues': queue_csv,
            'pid': self.pid,
            'hostname': self.hostname,
            'version': self.version,
            'python_version': self.python_version,
        }
        pipe.hmset(worker_key, birth_record)
        worker_registration.register(self, pipe)
        pipe.expire(worker_key, self.default_worker_ttl)
        pipe.execute()
# any other case, this is some other unexpected OS error,
# which we don't want to catch, so we re-raise those ones.
if e.errno != errno.EINTR:
raise
# Send a heartbeat to keep the worker alive.
self.heartbeat()
if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
self.log.warning((
'Moving job to FailedJobRegistry '
'(work-horse terminated unexpectedly; waitpid returned {})'
).format(ret_val))
self.handle_job_failure(
job,
exc_string="Work-horse process was terminated unexpectedly "
"(waitpid returned %s)" % ret_val
)
timeout = -1
else:
timeout = job.timeout or 180
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5
with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, self.connection,
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
pipeline.execute()
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
def set_shutdown_requested_date(self):
    """Record when this worker received a (warm) shutdown request.

    Stores the formatted current time in the 'shutdown_requested_date'
    field of the worker's hash.
    """
    requested_at = utcformat(utcnow())
    self.connection.hset(self.key, 'shutdown_requested_date', requested_at)
def perform_job(self, job, queue, heartbeat_ttl=None):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection)
started_job_registry = StartedJobRegistry(job.origin,
self.connection,
job_class=self.job_class)
try:
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
rv = job.perform()
job.ended_at = utcnow()
# Pickle the result in the same try-except block since we need
# to use the same exc handling when pickling fails
job._result = rv
self.handle_job_success(job=job,
queue=queue,
started_job_registry=started_job_registry)
except:
job.ended_at = utcnow()
exc_info = sys.exc_info()
exc_string = self._get_safe_exception_string(
def register_death(self):
    """Write this worker's death record to Redis.

    On a single pipeline: unregisters the worker via
    `worker_registration`, stamps a 'death' field on the worker hash, and
    sets a 60-second expiry on the worker key.
    """
    self.log.debug('Registering death')
    with self.connection.pipeline() as pipe:
        # Assigning self.state = 'dead' is not an option here: doing so
        # would roll back the pipeline.
        worker_registration.unregister(self, pipe)
        died_at = utcformat(utcnow())
        pipe.hset(self.key, 'death', died_at)
        pipe.expire(self.key, 60)
        pipe.execute()