Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_defer_until(arq_redis: ArqRedis):
    """Enqueue a job deferred until a fixed datetime and check its queue score."""
    run_at = datetime(2032, 1, 1)
    job = await arq_redis.enqueue_job('foobar', _job_id='job_id', _defer_until=run_at)
    assert isinstance(job, Job)

    # 1_956_528_000_000 is 2032-01-01T00:00:00Z expressed in milliseconds
    # since the Unix epoch; the job's sorted-set score must equal it.
    queued_score = await arq_redis.zscore(default_queue_name, 'job_id')
    assert queued_score == 1_956_528_000_000
async def test_many_jobs_expire(arq_redis: ArqRedis, worker, caplog):
    """Run a worker over one enqueued job plus 100 bare queue entries.

    The 100 ``testing-<i>`` ids are written straight into the queue's sorted
    set with ``zadd`` (score 1) instead of going through ``enqueue_job``.
    The assertions expect each of them to be counted as failed and logged as
    expired, while the single properly enqueued job completes.
    """
    caplog.set_level(logging.INFO)
    await arq_redis.enqueue_job('foobar')

    raw_entries = (arq_redis.zadd(default_queue_name, 1, f'testing-{i}') for i in range(100))
    await asyncio.gather(*raw_entries)

    runner: Worker = worker(functions=[foobar])
    # Nothing has been processed before the worker runs.
    assert (runner.jobs_complete, runner.jobs_failed, runner.jobs_retried) == (0, 0, 0)

    await runner.main()
    assert (runner.jobs_complete, runner.jobs_failed, runner.jobs_retried) == (1, 100, 0)

    messages = [record.message for record in caplog.records]
    combined = '\n'.join(messages)
    assert 'job testing-0 expired' in combined
    assert combined.count(' expired') == 100
async def test_enqueue_job(arq_redis: ArqRedis, worker, queue_name=default_queue_name):
async def foobar(ctx, *args, **kwargs):
return 42
j = await arq_redis.enqueue_job('foobar', 1, 2, c=3, _queue_name=queue_name)
assert isinstance(j, Job)
assert JobStatus.queued == await j.status()
worker: Worker = worker(functions=[func(foobar, name='foobar')], queue_name=queue_name)
await worker.main()
r = await j.result(pole_delay=0)
assert r == 42
assert JobStatus.complete == await j.status()
info = await j.info()
assert info == JobResult(
job_try=1,
function='foobar',
args=(1, 2),
async def test_defer_by(arq_redis: ArqRedis):
    """Enqueue a job with ``_defer_by=20`` and check its queue score window.

    The score must lie strictly between ``timestamp_ms() + 19000`` and
    ``timestamp_ms() + 21000``, where the timestamp is sampled after the
    job has been enqueued.
    """
    job = await arq_redis.enqueue_job('foobar', _job_id='job_id', _defer_by=20)
    assert isinstance(job, Job)

    queued_score = await arq_redis.zscore(default_queue_name, 'job_id')
    now_ms = timestamp_ms()
    # Allow 1000 either side of the nominal now + 20000 offset.
    assert now_ms + 19000 < queued_score < now_ms + 21000
async def test_health_check_pass(arq_redis):
    """Set the default queue's health-check key and expect a zero result."""
    health_key = default_queue_name + health_check_key_suffix
    await arq_redis.set(health_key, b'1')

    outcome = await async_check_health(None)
    assert outcome == 0
def __init__(
    self,
    job_id: str,
    redis,
    _queue_name: str = default_queue_name,
    _deserializer: Optional[Deserializer] = None,
):
    """Store the job id and the handles used to look the job up.

    :param job_id: identifier of the job, kept as ``self.job_id``
    :param redis: redis connection object, kept as ``self._redis``
    :param _queue_name: queue name, defaults to ``default_queue_name``
    :param _deserializer: optional deserializer, kept as-is (may be ``None``)
    """
    self.job_id = job_id
    self._redis = redis
    self._queue_name = _queue_name
    self._deserializer = _deserializer