Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_defer_by(arq_redis: ArqRedis):
    """Check that a job enqueued with ``_defer_by=20`` gets a deferred score in the default queue.

    The score stored under ``'job_id'`` must lie strictly between 19000 and 21000
    above the value of ``timestamp_ms()`` taken right after the lookup.
    """
    job = await arq_redis.enqueue_job('foobar', _job_id='job_id', _defer_by=20)
    assert isinstance(job, Job)

    queued_score = await arq_redis.zscore(default_queue_name, 'job_id')
    now_ms = timestamp_ms()

    # allow one second of slack either side of the requested deferral
    earliest = now_ms + 19000
    latest = now_ms + 21000
    assert earliest < queued_score < latest
async def run_job(self, job_id, score): # noqa: C901
start_ms = timestamp_ms()
v, job_try, _ = await asyncio.gather(
self.pool.get(job_key_prefix + job_id, encoding=None),
self.pool.incr(retry_key_prefix + job_id),
self.pool.expire(retry_key_prefix + job_id, 88400),
)
function_name, args, kwargs, enqueue_time_ms = '', (), {}, 0
async def job_failed(exc: Exception):
self.jobs_failed += 1
result_data_ = serialize_result(
function=function_name,
args=args,
kwargs=kwargs,
job_try=job_try,
enqueue_time_ms=enqueue_time_ms,
success=False,
async def _poll_iteration(self):
    """
    Run one polling pass over the queue.

    Reads up to ``count`` job ids whose score is at most the current ``timestamp_ms()``,
    passes them to ``self.run_jobs``, reaps finished tasks from ``self.tasks`` and then
    calls ``self.heart_beat()``.

    When ``self.burst`` is set and ``self.max_burst_jobs`` is non-negative, ``count`` is capped
    at the number of burst jobs still allowed; if fewer than one remains the pass returns
    immediately, without reading the queue or sending a heart beat.

    Raises:
        Any exception raised by a finished task in ``self.tasks`` (re-raised by ``t.result()``).
    """
    count = self.queue_read_limit
    if self.burst and self.max_burst_jobs >= 0:
        burst_jobs_remaining = self.max_burst_jobs - self._jobs_started()
        if burst_jobs_remaining < 1:
            return
        count = min(burst_jobs_remaining, count)

    async with self.sem:  # don't bother with zrangebyscore until we have "space" to run the jobs
        now = timestamp_ms()
        job_ids = await self.pool.zrangebyscore(
            self.queue_name, offset=self._queue_read_offset, count=count, max=now
        )
    await self.run_jobs(job_ids)

    # Iterate over a snapshot: removing entries from self.tasks while iterating it
    # directly would skip the task that follows each removed one (for a list) or
    # raise RuntimeError (for a set), so some finished tasks would not be reaped.
    for t in list(self.tasks):
        if t.done():
            self.tasks.remove(t)
            # required to make sure errors in run_job get propagated
            t.result()

    await self.heart_beat()
async def job_failed(exc: Exception):
    """Record a failed job and abort it with a serialized, unsuccessful result.

    Increments ``self.jobs_failed``, serializes a result with ``success=False`` and
    ``exc`` as the result value, then passes it to ``self.abort_job``.

    Args:
        exc: the exception to store as the job's result.
    """
    self.jobs_failed += 1

    finished_at = timestamp_ms()
    failure_payload = serialize_result(
        function=function_name,
        args=args,
        kwargs=kwargs,
        job_try=job_try,
        enqueue_time_ms=enqueue_time_ms,
        success=False,
        result=exc,
        start_ms=start_ms,
        finished_ms=finished_at,
        ref=f'{job_id}:{function_name}',
        serializer=self.job_serializer,
    )
    # shield the abort so that cancellation of the awaiting caller does not cancel it
    await asyncio.shield(self.abort_job(job_id, failure_payload))
assert not (_defer_until and _defer_by), "use either 'defer_until' or 'defer_by' or neither, not both"
defer_by_ms = to_ms(_defer_by)
expires_ms = to_ms(_expires)
with await self as conn:
pipe = conn.pipeline()
pipe.unwatch()
pipe.watch(job_key)
job_exists = pipe.exists(job_key)
job_result_exists = pipe.exists(result_key_prefix + job_id)
await pipe.execute()
if await job_exists or await job_result_exists:
return
enqueue_time_ms = timestamp_ms()
if _defer_until is not None:
score = to_unix_ms(_defer_until)
elif defer_by_ms:
score = enqueue_time_ms + defer_by_ms
else:
score = enqueue_time_ms
expires_ms = expires_ms or score - enqueue_time_ms + expires_extra_ms
job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
tr = conn.multi_exec()
tr.psetex(job_key, expires_ms, job)
tr.zadd(_queue_name, score, job_id)
try:
await tr.execute()
except MultiExecError:
max_tries = self.max_tries if function.max_tries is None else function.max_tries
if job_try > max_tries:
t = (timestamp_ms() - enqueue_time_ms) / 1000
logger.warning('%6.2fs ! %s max retries %d exceeded', t, ref, max_tries)
self.jobs_failed += 1
result_data = serialize_result(
function_name,
args,
kwargs,
job_try,
enqueue_time_ms,
False,
JobExecutionFailed(f'max {max_tries} retries exceeded'),
start_ms,
timestamp_ms(),
ref,
serializer=self.job_serializer,
)
return await asyncio.shield(self.abort_job(job_id, result_data))
result = no_result
exc_extra = None
finish = False
timeout_s = self.job_timeout_s if function.timeout_s is None else function.timeout_s
incr_score = None
job_ctx = {
'job_id': job_id,
'job_try': job_try,
'enqueue_time': ms_to_datetime(enqueue_time_ms),
'score': score,
}
if (start_ms - score) > 1200:
extra += f' delayed={(start_ms - score) / 1000:0.2f}s'
logger.info('%6.2fs → %s(%s)%s', (start_ms - enqueue_time_ms) / 1000, ref, s, extra)
# run repr(result) and extra inside try/except as they can raise exceptions
try:
async with async_timeout.timeout(timeout_s):
result = await function.coroutine(ctx, *args, **kwargs)
except Exception as e:
exc_extra = getattr(e, 'extra', None)
if callable(exc_extra):
exc_extra = exc_extra()
raise
else:
result_str = '' if result is None else truncate(repr(result))
except Exception as e:
finished_ms = timestamp_ms()
t = (finished_ms - start_ms) / 1000
if self.retry_jobs and isinstance(e, Retry):
incr_score = e.defer_score
logger.info('%6.2fs ↻ %s retrying job in %0.2fs', t, ref, (e.defer_score or 0) / 1000)
if e.defer_score:
incr_score = e.defer_score + (timestamp_ms() - score)
self.jobs_retried += 1
elif self.retry_jobs and isinstance(e, asyncio.CancelledError):
logger.info('%6.2fs ↻ %s cancelled, will be run again', t, ref)
self.jobs_retried += 1
else:
logger.exception(
'%6.2fs ! %s failed, %s: %s', t, ref, e.__class__.__name__, e, extra={'extra': exc_extra}
)
result = e
finish = True
)
return await asyncio.shield(self.abort_job(job_id, result_data))
result = no_result
exc_extra = None
finish = False
timeout_s = self.job_timeout_s if function.timeout_s is None else function.timeout_s
incr_score = None
job_ctx = {
'job_id': job_id,
'job_try': job_try,
'enqueue_time': ms_to_datetime(enqueue_time_ms),
'score': score,
}
ctx = {**self.ctx, **job_ctx}
start_ms = timestamp_ms()
success = False
try:
s = args_to_string(args, kwargs)
extra = f' try={job_try}' if job_try > 1 else ''
if (start_ms - score) > 1200:
extra += f' delayed={(start_ms - score) / 1000:0.2f}s'
logger.info('%6.2fs → %s(%s)%s', (start_ms - enqueue_time_ms) / 1000, ref, s, extra)
# run repr(result) and extra inside try/except as they can raise exceptions
try:
async with async_timeout.timeout(timeout_s):
result = await function.coroutine(ctx, *args, **kwargs)
except Exception as e:
exc_extra = getattr(e, 'extra', None)
if callable(exc_extra):
exc_extra = exc_extra()
raise