Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_find_by_key(self):
"""Worker.find_by_key restores queues, state and job_id."""
queues = [Queue('foo'), Queue('bar')]
w = Worker(queues)
w.register_death()
w.register_birth()
w.set_state(WorkerStatus.STARTED)
worker = Worker.find_by_key(w.key)
self.assertEqual(worker.queues, queues)
self.assertEqual(worker.get_state(), WorkerStatus.STARTED)
self.assertEqual(worker._job_id, None)
self.assertTrue(worker.key in Worker.all_keys(worker.connection))
self.assertEqual(worker.version, VERSION)
# If worker is gone, its keys should also be removed
worker.connection.delete(worker.key)
Worker.find_by_key(worker.key)
self.assertFalse(worker.key in Worker.all_keys(worker.connection))
self.assertRaises(ValueError, Worker.find_by_key, 'foo')
# With iterable of byte strings
w = Worker(iter([b'foo', b'bar']))
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With single Queue
w = Worker(Queue('foo'))
self.assertEqual(w.queues[0].name, 'foo')
# With iterable of Queues
w = Worker(iter([Queue('foo'), Queue('bar')]))
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With list of Queues
w = Worker([Queue('foo'), Queue('bar')])
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
def test_working_worker_warm_shutdown(self):
"""worker with an ongoing job receiving single SIGTERM signal, allowing job to finish then shutting down"""
fooq = Queue('foo')
w = Worker(fooq)
sentinel_file = '/tmp/.rq_sentinel_warm'
fooq.enqueue(create_file_after_timeout, sentinel_file, 2)
self.assertFalse(w._stop_requested)
p = Process(target=kill_worker, args=(os.getpid(), False))
p.start()
w.work()
p.join(2)
self.assertFalse(p.is_alive())
self.assertTrue(w._stop_requested)
self.assertTrue(os.path.exists(sentinel_file))
self.assertIsNotNone(w.shutdown_requested_date)
self.assertEqual(type(w.shutdown_requested_date).__name__, 'datetime')
def test_self_modification_persistence(self):
"""Make sure that any meta modification done by
the job itself persists completely through the
queue/worker/job stack."""
q = Queue()
# Also make sure that previously existing metadata
# persists properly
job = q.enqueue(modify_self, meta={'foo': 'bar', 'baz': 42},
args=[{'baz': 10, 'newinfo': 'waka'}])
w = Worker([q])
w.work(burst=True)
job_check = Job.fetch(job.id)
self.assertEqual(job_check.meta['foo'], 'bar')
self.assertEqual(job_check.meta['baz'], 10)
self.assertEqual(job_check.meta['newinfo'], 'waka')
# Relative imports
sys.path.append('./queue')
from queue_worker import get_connection
redis = Redis(
os.environ.get('REDIS_PORT_6379_TCP_ADDR', '172.17.0.2'),
os.environ.get('REDIS_1_PORT_6379_TCP_PORT', '6379'),
db=os.environ.get('WORKER_DB', 0)
)
listen = os.environ.get('WORKER_QUEUES', 'jobs').split(',')
if __name__ == '__main__':
with Connection(redis):
worker = Worker(map(Queue, listen))
worker.work()
#!/usr/bin/env python
import sys
from rq import Connection, Worker
import split.data
# Provide queue names to listen to as arguments to this script,
# similar to rqworker
with Connection():
qs = sys.argv[1:] or ['default']
w = Worker(qs)
w.work()
import os
import redis
from rq import Worker, Queue, Connection
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
def main():
utils.setup_logging()
config.log()
gh_pr.monkeypatch_github()
if config.FLUSH_REDIS_ON_STARTUP:
utils.get_redis().flushall()
with rq.Connection(utils.get_redis()):
worker = rq.Worker(['default'])
worker.work()
from builtins import map
import os
import redis
from rq import Worker, Queue, Connection
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()