Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_custom_try2(arq_redis: ArqRedis, worker):
async def foobar(ctx):
if ctx['job_try'] == 3:
raise Retry()
return ctx['job_try']
j1 = await arq_redis.enqueue_job('foobar', _job_try=3)
w: Worker = worker(functions=[func(foobar, name='foobar')])
await w.main()
r = await j1.result(pole_delay=0)
assert r == 4
async def test_max_jobs_completes(arq_redis: ArqRedis, worker):
v = 0
async def raise_second_time(ctx):
nonlocal v
v += 1
if v > 1:
raise ValueError('xxx')
await arq_redis.enqueue_job('raise_second_time')
await arq_redis.enqueue_job('raise_second_time')
await arq_redis.enqueue_job('raise_second_time')
worker: Worker = worker(functions=[func(raise_second_time, name='raise_second_time')])
with pytest.raises(FailedJobs) as exc_info:
await worker.run_check(max_burst_jobs=3)
assert repr(exc_info.value).startswith('<2 jobs failed:')
async def test_run_check_passes(arq_redis: ArqRedis, worker):
await arq_redis.enqueue_job('foobar')
await arq_redis.enqueue_job('foobar')
worker: Worker = worker(functions=[func(foobar, name='foobar')])
assert 2 == await worker.run_check()
async def test_max_bursts_dont_get(arq_redis: ArqRedis, worker):
async def foo(ctx, v):
return v + 1
await arq_redis.enqueue_job('foo', 1)
await arq_redis.enqueue_job('foo', 2)
worker: Worker = worker(functions=[func(foo, name='foo')])
worker.max_burst_jobs = 0
assert len(worker.tasks) == 0
await worker._poll_iteration()
assert len(worker.tasks) == 0
async def test_max_bursts_sub_call(arq_redis: ArqRedis, worker, caplog):
async def foo(ctx, v):
return v + 1
async def bar(ctx, v):
await ctx['redis'].enqueue_job('foo', v + 1)
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('bar', 10)
worker: Worker = worker(functions=[func(foo, name='foo'), func(bar, name='bar')])
assert await worker.run_check(max_burst_jobs=1) == 1
assert worker.jobs_complete == 1
assert worker.jobs_retried == 0
assert worker.jobs_failed == 0
assert 'bar(10)' in caplog.text
assert 'foo' in caplog.text
check a job can't be enqueued multiple times with the same id
"""
counter = Counter()
async def count(ctx, v):
counter[v] += 1
tasks = []
for i in range(50):
tasks.extend(
[arq_redis.enqueue_job('count', i, _job_id=f'v-{i}'), arq_redis.enqueue_job('count', i, _job_id=f'v-{i}')]
)
shuffle(tasks)
await asyncio.gather(*tasks)
worker: Worker = worker(functions=[func(count, name='count')])
await worker.main()
assert counter.most_common(1)[0][1] == 1 # no job go enqueued twice
async def test_return_exception(arq_redis: ArqRedis, worker):
async def return_error(ctx):
return TypeError('xxx')
j = await arq_redis.enqueue_job('return_error')
worker: Worker = worker(functions=[func(return_error, name='return_error')])
await worker.main()
assert (worker.jobs_complete, worker.jobs_failed, worker.jobs_retried) == (1, 0, 0)
r = await j.result(pole_delay=0)
assert isinstance(r, TypeError)
info = await j.result_info()
assert info.success is True
def test_no_jobs(arq_redis: ArqRedis, loop):
class Settings:
functions = [func(foobar, name='foobar')]
burst = True
poll_delay = 0
queue_read_limit = 10
loop.run_until_complete(arq_redis.enqueue_job('foobar'))
worker = run_worker(Settings)
assert worker.jobs_complete == 1
assert str(worker) == ''
async def test_retry_lots(arq_redis: ArqRedis, worker, caplog):
async def retry(ctx):
raise Retry()
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('retry', _job_id='testing')
worker: Worker = worker(functions=[func(retry, name='retry')])
await worker.main()
assert worker.jobs_complete == 0
assert worker.jobs_failed == 1
assert worker.jobs_retried == 5
log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records))
assert ' X.XXs ! testing:retry max retries 5 exceeded' in log
async def test_set_health_check_key(arq_redis: ArqRedis, worker):
await arq_redis.enqueue_job('foobar', _job_id='testing')
worker: Worker = worker(functions=[func(foobar, keep_result=0)], health_check_key='arq:test:health-check')
await worker.main()
assert sorted(await arq_redis.keys('*')) == ['arq:test:health-check']