Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def healthcheck():
    """Exit the process with a status reflecting local RQ worker health.

    Fetches every RQ worker visible through ``rq_redis_connection``, keeps
    those whose ``hostname`` matches this machine, and checks that each one
    reported a heartbeat less than 60 seconds ago.

    This function never returns: it always calls ``sys.exit`` with ``0``
    when every local worker is fresh (including the case where no local
    worker is registered) and ``1`` otherwise.
    """
    hostname = socket.gethostname()
    max_heartbeat_age = datetime.timedelta(seconds=60)

    with Connection(rq_redis_connection):
        local_workers = [w for w in Worker.all() if w.hostname == hostname]

        # Heartbeats are compared against a naive UTC "now", matching the
        # arithmetic the original code performed on ``last_heartbeat``.
        now = datetime.datetime.utcnow()

        # Compare the whole timedelta rather than its ``.seconds`` field:
        # ``.seconds`` ignores the ``days`` component, so a heartbeat that is
        # one day and five seconds old would otherwise look five seconds old.
        # A worker without any recorded heartbeat is treated as unhealthy.
        healthy = all(
            w.last_heartbeat is not None
            and now - w.last_heartbeat < max_heartbeat_age
            for w in local_workers
        )

    sys.exit(int(not healthy))
def rq_workers():
    """Return a list with one summary dict per RQ worker.

    Workers are looked up through ``rq_redis_connection``. Each dict carries
    the worker's identity, the comma-separated names of its queues, its
    state and timestamps, a description of its current job (as produced by
    ``describe_job``) and its job counters.
    """
    summaries = []
    for worker in Worker.all(connection=rq_redis_connection):
        summaries.append({
            'name': worker.name,
            'hostname': worker.hostname,
            'pid': worker.pid,
            'queues': ", ".join(queue.name for queue in worker.queues),
            'state': worker.state,
            'last_heartbeat': worker.last_heartbeat,
            'birth_date': worker.birth_date,
            'current_job': describe_job(worker.get_current_job()),
            'successful_jobs': worker.successful_job_count,
            'failed_jobs': worker.failed_job_count,
            'total_working_time': worker.total_working_time,
        })
    return summaries
def ping_redis():
    """Report how many RQ workers are reachable through ``conn``.

    Returns the ``jsonify`` response with ``num_workers`` set to the worker
    count and ``error`` set to ``True`` when that count is zero.
    """
    worker_count = len(Worker.all(conn))
    no_workers = worker_count == 0
    return jsonify(error=no_workers, num_workers=worker_count)
def get_context_data(self, **kwargs):
    """Build the template context for the RQ status page.

    Starts from the parent class's context and adds all queues and all
    workers found on ``self.connection``, plus the page title.
    """
    context = super(Stats, self).get_context_data(**kwargs)
    context.update(
        queues=Queue.all(connection=self.connection),
        workers=Worker.all(connection=self.connection),
        title='RQ Status',
    )
    return context
def check_worker_active_for_queue(self, queue):
    """Return whether any RQ worker is listening on ``queue``.

    Opens a Redis connection from ``REDIS_URL``, lists the workers registered
    there and checks whether ``queue.name`` appears among the queue names of
    at least one of them.

    :param queue: object exposing a ``name`` attribute to look for.
    :return: ``True`` if some worker serves a queue with that name,
        ``False`` otherwise (including when no workers are registered).
    """
    connection = redis.Redis.from_url(REDIS_URL)
    workers = rq.Worker.all(connection=connection)

    # Test each worker's queue names directly and stop at the first match,
    # instead of flattening every list with ``sum(..., [])`` (quadratic in the
    # number of workers) just to run a single membership test.
    return any(queue.name in worker.queue_names() for worker in workers)
def list_workers(instance_number):
    """Return a dict describing every RQ worker, sorted for display.

    ``instance_number`` is accepted but not used by this function.

    The result has a single ``"workers"`` key holding a list of dicts, one
    per worker returned by ``Worker.all()``, ordered by state, then queue
    names, then worker name. ``version`` and ``python_version`` fall back to
    an empty string when the worker object lacks those attributes.
    """
    rows = []
    for worker in Worker.all():
        rows.append({
            "name": worker.name,
            "queues": [queue.name for queue in worker.queues],
            "state": str(worker.get_state()),
            "current_job": serialize_current_job(worker.get_current_job()),
            "version": getattr(worker, "version", ""),
            "python_version": getattr(worker, "python_version", ""),
        })

    rows.sort(key=lambda row: (row["state"], row["queues"], row["name"]))
    return {"workers": rows}
if not (queues and default and variants):
log.warning('RQ_QUEUES settings could not be found')
return
# Create connections to redis to identify the workers
def_connection = redis.Redis(host=default['HOST'],
port=default['PORT'],
db=default['DB'])
var_connection = redis.Redis(host=variants['HOST'],
port=variants['PORT'],
db=variants['DB'])
# Get all the workers connected with our redis server
try:
all_workers = Worker.all(def_connection) + \
Worker.all(var_connection)
except ConnectionError:
log.warning('Could not connect to redis server to create workers. '
'Please make sure Redis server is running')
return
found_default = False
found_variant = False
# Loop through all the workers (even duplicates)
for worker in all_workers:
found_default = found_default or 'default' in worker.queue_names()
found_variant = found_variant or 'variants' in worker.queue_names()
# Start the required worker
if not found_variant:
log.debug('Did not find variants worker. Starting ... ')