Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_run_scheduled_access_self(self):
    """Schedule a job that schedules a job, then run the worker as subprocess"""
    q = Queue()
    job = q.enqueue(schedule_access_self)
    # check_call raises CalledProcessError if the worker exits non-zero, so
    # a crashing worker fails the test here instead of at the assertions.
    # NOTE(review): '-b' is presumably rqworker's burst mode (exit once the
    # queue is drained) -- confirm against the rq CLI documentation.
    subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
    registry = FinishedJobRegistry(queue=q)
    # unittest assertions instead of a bare ``assert``: they are not stripped
    # under ``python -O`` and report the offending values on failure.
    self.assertIn(job, registry)
    self.assertEqual(q.count, 0)
def test_clean_registries(self):
    """clean_registries() empties the Finished, Started and Failed job registries."""
    queue = Queue(connection=self.testconn)

    # Seed each registry's sorted set with one entry ('foo', score 1),
    # keeping the keys so the sets can be inspected after the cleanup.
    registry_keys = []
    for registry_class in (FinishedJobRegistry,
                           StartedJobRegistry,
                           FailedJobRegistry):
        registry = registry_class(connection=self.testconn)
        self.testconn.zadd(registry.key, {'foo': 1})
        registry_keys.append(registry.key)

    clean_registries(queue)

    # Every seeded sorted set must be empty afterwards.
    for key in registry_keys:
        self.assertEqual(self.testconn.zcard(key), 0)
# enqueue_dependents calls multi() on the pipeline!
queue.enqueue_dependents(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
self.increment_total_working_time(
job.ended_at - job.started_at, pipeline
)
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
finished_job_registry = FinishedJobRegistry(job.origin,
self.connection,
job_class=self.job_class)
finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline,
remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
break
except WatchError:
continue
the order specified with a JSON batch add
jobord_dict: dict
Dictionary containing batch job add details as:
{job_id: place}
job_id: str hex representation of uuid job ID
place: int indicating place in queue
Returns
------
"""
marsh_schema = parse_json_schema().load(request.json)
if len(marsh_schema.errors) > 0:
logger.debug('Validation error: {}'.format(marsh_schema.errors))
return marsh_schema.errors, 500
comp = rq.registry.FinishedJobRegistry('default',
connection=self.redis_con)
###***change this to match reports, validate job_id correctly
if job_id == "reorder":
logger.debug('Reorder queue command received')
logger.debug(marsh_schema.data['batch_job'])
try:
adder = Adder()
for job in marsh_schema.data['batch_job']:
job_id = job['job_id']
if adder.session_check(self.log_dir, job_id):
logger.debug('Valid session found')
started = rq.registry.StartedJobRegistry('default',
connection=self.redis_con)
cur_list = started.get_job_ids()
if job_id in cur_list:
logger.error('Job is already running')
def _repr(q):
    """Return a one-line summary of running, pending and finished job counts for ``q``."""
    # ``conn`` is taken from the enclosing scope.
    running = StartedJobRegistry(q.name, conn).count
    pending = q.count
    finished = FinishedJobRegistry(q.name, conn).count
    return "running:%d pending:%d finished:%d" % (running, pending, finished)
for q in Queue.all(conn):
def delete(self, pipeline=None, remove_from_queue=True,
delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
if remove_from_queue:
self.cancel(pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
if self.is_finished:
from .registry import FinishedJobRegistry
registry = FinishedJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
elif self.is_deferred:
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
elif self.is_started:
from .registry import StartedJobRegistry
registry = StartedJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
def check_complete(self):
    """
    Look up the job IDs held in the 'default' finished-job registry

    Parameters
    ---------
    Returns
    -------
    comp_list
        Whatever ``FinishedJobRegistry.get_job_ids()`` returns for the
        'default' queue on ``self.redis_con``
    """
    finished_registry = rq.registry.FinishedJobRegistry(
        'default', connection=self.redis_con)
    return finished_registry.get_job_ids()
def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page):
    """Return the total job count and one page of serialized jobs.

    Parameters
    ----------
    queue_name
        Name of the queue whose jobs (or registry) are listed.
    registry_name
        ``"queued"`` to list the queue itself, or one of ``"failed"``,
        ``"deferred"``, ``"started"``, ``"finished"`` to list that registry.
    offset
        Index of the first job to return.
    per_page
        Number of jobs to return; a negative value is passed through
        unchanged.

    Returns
    -------
    tuple
        ``(total_items, jobs)`` where ``total_items`` is the ``count`` of the
        selected queue/registry and ``jobs`` is the list of
        ``serialize_job`` results for the requested page.

    Raises
    ------
    ValueError
        If ``registry_name`` is not one of the names listed above.
        (Previously this surfaced as an UnboundLocalError.)
    """
    queue = Queue(queue_name)

    if registry_name == "queued":
        current_queue = queue
    else:
        registry_classes = {
            "failed": FailedJobRegistry,
            "deferred": DeferredJobRegistry,
            "started": StartedJobRegistry,
            "finished": FinishedJobRegistry,
        }
        registry_class = registry_classes.get(registry_name)
        if registry_class is None:
            raise ValueError(
                "Unknown registry name: %r" % (registry_name,))

        # For registries the page size is turned into an end index.
        # NOTE(review): presumably registry get_job_ids() takes an inclusive
        # (start, end) range while Queue.get_job_ids() takes
        # (offset, length) -- confirm against the rq API.
        if per_page >= 0:
            per_page = offset + (per_page - 1)
        current_queue = registry_class(queue_name)

    total_items = current_queue.count
    job_ids = current_queue.get_job_ids(offset, per_page)
    # Jobs are always fetched through the queue, even when the IDs came
    # from a registry.
    jobs = [serialize_job(queue.fetch_job(job_id)) for job_id in job_ids]
    return (total_items, jobs)