Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution."""
registry = StartedJobRegistry(connection=self.testconn)
queue = Queue(connection=self.testconn)
worker = Worker([queue])
job = queue.enqueue(say_hello)
self.assertTrue(job.is_queued)
worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids())
self.assertTrue(job.is_started)
worker.perform_job(job, queue)
self.assertNotIn(job.id, registry.get_job_ids())
self.assertTrue(job.is_finished)
# Job that fails
job = queue.enqueue(div_by_zero)
worker.prepare_job_execution(job)
def test_info_only_workers(self):
"""rq info -u --only-workers (-W)"""
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('0 workers, 0 queue', result.output)
result = runner.invoke(main, ['info', '--by-queue',
'-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('0 workers, 0 queue', result.output)
worker = Worker(['default'], connection=self.connection)
worker.register_birth()
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('1 workers, 0 queues', result.output)
worker.register_death()
queue = Queue(connection=self.connection)
queue.enqueue(say_hello)
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('0 workers, 1 queues', result.output)
foo_queue = Queue(name='foo', connection=self.connection)
foo_queue.enqueue(say_hello)
bar_queue = Queue(name='bar', connection=self.connection)
def test_run_maintenance_tasks(self, mocked):
"""scheduler.acquire_locks() is called only when scheduled is enabled"""
queue = Queue(connection=self.testconn)
worker = Worker(queues=[queue], connection=self.testconn)
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 0)
worker.last_cleaned_at = None
worker.scheduler = RQScheduler([queue], connection=self.testconn)
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 0)
worker.last_cleaned_at = datetime.now()
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 1)
def test_work_burst(self):
"""worker.work() with scheduler enabled works properly"""
queue = Queue(connection=self.testconn)
worker = Worker(queues=[queue], connection=self.testconn)
worker.work(burst=True, with_scheduler=False)
self.assertIsNone(worker.scheduler)
worker = Worker(queues=[queue], connection=self.testconn)
worker.work(burst=True, with_scheduler=True)
self.assertIsNotNone(worker.scheduler)
def setup_once():
# type: () -> None
try:
version = tuple(map(int, RQ_VERSION.split(".")[:3]))
except (ValueError, TypeError):
raise DidNotEnable("Unparseable RQ version: {}".format(RQ_VERSION))
if version < (0, 6):
raise DidNotEnable("RQ 0.6 or newer is required.")
old_perform_job = Worker.perform_job
def sentry_patched_perform_job(self, job, *args, **kwargs):
# type: (Any, Job, *Queue, **Any) -> bool
hub = Hub.current
integration = hub.get_integration(RqIntegration)
if integration is None:
return old_perform_job(self, job, *args, **kwargs)
client = hub.client
assert client is not None
with hub.push_scope() as scope:
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(weakref.ref(job)))
# Patch rq.queue.Queue
Pin(
service=config.rq['service_name'],
app=config.rq['app'],
app_type=config.rq['app_type'],
).onto(rq.queue.Queue)
_w('rq.queue', 'Queue.enqueue_job', traced_queue_enqueue_job(rq.queue.Queue))
_w('rq.queue', 'Queue.fetch_job', traced_queue_fetch_job(rq.queue.Queue))
# Patch rq.worker.Worker
Pin(
service=config.rq['worker_service_name'],
app=config.rq['app'],
app_type=config.rq['app_type'],
).onto(rq.worker.Worker)
_w(rq.worker, 'Worker.perform_job', traced_perform_job(rq.worker.Worker))
setattr(rq, '_datadog_patch', True)
with capture_internal_exceptions():
transaction.name = job.func_name
with hub.start_transaction(transaction):
rv = old_perform_job(self, job, *args, **kwargs)
if self.is_horse:
# We're inside of a forked process and RQ is
# about to call `os._exit`. Make sure that our
# events get sent out.
client.flush()
return rv
Worker.perform_job = sentry_patched_perform_job
old_handle_exception = Worker.handle_exception
def sentry_patched_handle_exception(self, job, *exc_info, **kwargs):
# type: (Worker, Any, *Any, **Any) -> Any
_capture_exception(exc_info) # type: ignore
return old_handle_exception(self, job, *exc_info, **kwargs)
Worker.handle_exception = sentry_patched_handle_exception
old_enqueue_job = Queue.enqueue_job
def sentry_patched_enqueue_job(self, job, **kwargs):
# type: (Queue, Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(RqIntegration) is not None:
return
Pin.remove_from(rq)
# Unpatch rq.job.Job
Pin.remove_from(rq.job.Job)
_uw(rq.job.Job, 'perform')
_uw(rq.job.Job, 'fetch')
# Unpatch rq.queue.Queue
Pin.remove_from(rq.queue.Queue)
_uw(rq.queue.Queue, 'enqueue_job')
_uw(rq.queue.Queue, 'fetch_job')
# Unpatch rq.worker.Worker
Pin.remove_from(rq.worker.Worker)
_uw(rq.worker.Worker, 'perform_job')
setattr(rq, '_datadog_patch', False)
import os
from datetime import datetime, timedelta
from rq.queue import Queue
from rq.worker import Worker
from .queue import DeadLetterQueue
try:
import rq_scheduler
except ImportError:
rq_scheduler = None
class RetryWorker(Worker):
"""Worker class that periodically retries jobs on the FailedQueue.
All Workers check for maintenance tasks after running each job. The
RetryWorker retries jobs on the failed queue as part of its maintenance
tasks. The RetryWorker also has a configurable interval for how often
maintenance is performed.
All parameters supported by Worker are supported by RetryWorker. In
addition the parameters below, which must be passed as keyword
arguments are accepted.
Each parameter below can also be set by an environment variable
with its name uppercased. Examples below.
Settings
--------
def get_current_worker():
"""
Get the rq worker assigned to the current job
Returns:
class:`rq.worker.Worker`: The worker assigned to the current job
"""
for worker in Worker.all():
if worker.get_current_job() == get_current_job():
return worker
return None