Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
runJob.save_meta()
runJobs.append(runJob)
jobToTest[runJob] = test
print("queued " + str(len(runJobs)) + " jobs")
stop = False
while runJobs and not stop:
try:
for j in runJobs[:]:
if j.status is None:
print("Test " + j.meta["title"] + " was removed from the queue")
runJobs.remove(j)
elif j.status == rq.job.JobStatus.FAILED:
j.refresh()
print("Test " + j.meta["title"] + " failed: " + str(j.exc_info))
runJobs.remove(j)
else:
if j.result is not None:
tst = jobToTest[j]
result.startTest(tst)
try:
# process the result
# note: fingerprint mismatch is technically NOT an error in 4.2 or before! (exitcode==0)
#self.storeExitcodeCallback(result.exitcode)
if j.result.exitcode != 0:
raise Exception("runtime error:" + j.result.errormsg)
elif j.result.cpuTimeLimitReached:
raise Exception("cpu time limit exceeded")
queue = Queue(connection=self.testconn)
failed_job_registry = FailedJobRegistry(connection=self.testconn)
job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, {job.id: 2})
# Job has not been moved to FailedJobRegistry
self.registry.cleanup(1)
self.assertNotIn(job, failed_job_registry)
self.assertIn(job, self.registry)
self.registry.cleanup()
self.assertIn(job.id, failed_job_registry)
self.assertNotIn(job, self.registry)
job.refresh()
self.assertEqual(job.get_status(), JobStatus.FAILED)
def test_job_access_within_job_function(self):
    """A job function is able to look up the job it is running as."""
    queue = Queue()
    enqueued = queue.enqueue(fixtures.access_self)
    # Run a single worker over the queue in burst mode.
    worker = Worker([queue])
    worker.work(burst=True)
    # fixtures.access_self calls get_current_job(); the job ending up
    # FINISHED means that call went through without an error.
    final_status = enqueued.get_status()
    self.assertEqual(final_status, JobStatus.FINISHED)
def run_job(self, job):
    """Perform ``job`` immediately and hand it back marked as finished.

    Steps, in order: ``job.perform()``, set the status to
    ``JobStatus.FINISHED``, save the job with ``include_meta=False``,
    then call ``job.cleanup(DEFAULT_RESULT_TTL)``.

    Returns the same ``job`` object that was passed in.
    """
    job.perform()

    # Only reached when perform() returned normally; an exception raised
    # by perform() propagates to the caller and none of the bookkeeping
    # below is executed.
    job.set_status(JobStatus.FINISHED)
    job.save(include_meta=False)
    job.cleanup(DEFAULT_RESULT_TTL)

    return job
if timeout is None:
timeout = self._default_timeout
elif timeout == 0:
raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')
result_ttl = parse_timeout(result_ttl)
failure_ttl = parse_timeout(failure_ttl)
ttl = parse_timeout(ttl)
if ttl is not None and ttl <= 0:
raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
status=JobStatus.QUEUED, description=description,
depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta)
# If a _dependent_ job depends on any unfinished job, register all the
# _dependent_ job's dependencies instead of enqueueing it.
#
# `Job#fetch_dependencies` sets WATCH on all dependencies. If
# WatchError is raised when the pipeline is executed, that means
# something else has modified either the set of dependencies or the
# status of one of them. In this case, we simply retry.
if depends_on is not None:
with self.connection.pipeline() as pipe:
while True:
try:
pipe.watch(job.dependencies_key)
def _publish_job_event_when_finished(self, job):
    """Send event notifying the result of a finished job.

    The visible part of this method maps the job's status to an event
    type and payload:

    * FINISHED -> ``JobEventType.COMPLETED`` with ``job.return_value``
    * FAILED   -> ``JobEventType.FAILURE`` with a dict holding the job's
      ``exc_info`` and the ``'result'`` entry of ``job.meta`` (if any)
    * anything else -> ``JobEventType.UNDEFINED`` with the raw status,
      after logging a warning

    It then reads ``task_id`` from the job's keyword arguments.
    """
    job_status = job.get_status()

    if job_status == rq.job.JobStatus.FAILED:
        # Failure: report the error plus any partial result kept in the
        # job's meta.
        event_type = JobEventType.FAILURE
        payload = {
            'error': job.exc_info,
            'result': job.meta.get('result', None)
        }
    elif job_status == rq.job.JobStatus.FINISHED:
        event_type = JobEventType.COMPLETED
        payload = job.return_value
    else:
        # Neither finished nor failed: record it and pass the status
        # itself along as the payload.
        logger.warning("Unexpected job status %s for finished job %s",
                       job_status, job.id)
        event_type = JobEventType.UNDEFINED
        payload = job_status

    # Raises KeyError when the job was enqueued without a 'task_id' kwarg.
    task_id = job.kwargs['task_id']
if job.timeout == -1:
timeout = -1
else:
timeout = job.timeout or 180
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5
with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, self.connection,
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
pipeline.execute()
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
# caused by a SIGINT or SIGTERM signal during
# os.waitpid()), we simply ignore it and enter the next
# iteration of the loop, waiting for the child to end. In
# any other case, this is some other unexpected OS error,
# which we don't want to catch, so we re-raise those ones.
if e.errno != errno.EINTR:
raise
# Send a heartbeat to keep the worker alive.
self.heartbeat()
if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
self.log.warning((
'Moving job to FailedJobRegistry '
'(work-horse terminated unexpectedly; waitpid returned {})'
).format(ret_val))
self.handle_job_failure(
job,
exc_string="Work-horse process was terminated unexpectedly "
"(waitpid returned %s)" % ret_val
)
def perform_job(self, job, queue):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.procline('Processing %s from %s since %s' % (
job.func_name,
job.origin, time.time()))
try:
# I have DISABLED the time limit!
rv = job.perform()
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
job._result = rv
job._status = rq.job.JobStatus.FINISHED
job.ended_at = times.now()
#
# Using the code from Worker.handle_job_success
#
with self.connection.pipeline() as pipeline:
pipeline.watch(job.dependents_key)
queue.enqueue_dependents(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.save(pipeline=pipeline, include_meta=False)
# caused by a SIGINT or SIGTERM signal during
# os.waitpid()), we simply ignore it and enter the next
# iteration of the loop, waiting for the child to end. In
# any other case, this is some other unexpected OS error,
# which we don't want to catch, so we re-raise those ones.
if e.errno != errno.EINTR:
raise
# Send a heartbeat to keep the worker alive.
self.heartbeat()
if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
self.log.warning((
'Moving job to FailedJobRegistry '
'(work-horse terminated unexpectedly; waitpid returned {})'
).format(ret_val))
self.handle_job_failure(
job,
exc_string="Work-horse process was terminated unexpectedly "
"(waitpid returned %s)" % ret_val
)