async def test_lock_one_retry(self, lock_manager_redis_patched, locked_lock):
    lock_manager, redis = lock_manager_redis_patched
    # First attempt fails, second succeeds (the second side_effect value is the
    # elapsed time reported by the mocked set_lock call).
    redis.set_lock = CoroutineMock(side_effect=[
        LockError('Can not lock'),
        0.001
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY)
    ]
    redis.set_lock.assert_has_calls(calls)
    redis.unset_lock.assert_not_called()
    assert lock.resource == 'resource'
    assert lock.id == ANY
    assert lock.valid is True

async def check_two_locks_on_same_resource(self, lock_manager):
    resource = str(uuid.uuid4())

    assert await lock_manager.is_locked(resource) is False

    lock1 = await lock_manager.lock(resource)
    assert lock1.valid is True
    assert await lock_manager.is_locked(resource) is True

    # A second lock on the same resource must fail while the first is held.
    with pytest.raises(LockError):
        await lock_manager.lock(resource)
    assert await lock_manager.is_locked(resource) is True

    await lock_manager.unlock(lock1)
    assert lock1.valid is False
    assert await lock_manager.is_locked(resource) is False

    await asyncio.sleep(0.2)  # wait for lock cleanup
    await lock_manager.destroy()

async def test_lock_expire_retries_for_timeouts(self, lock_manager_redis_patched, locked_lock):
    lock_manager, redis = lock_manager_redis_patched
    # All three attempts take too long (the returned elapsed times exceed the
    # lock timeout), so the manager retries, finally gives up with a LockError,
    # and releases the partially acquired lock.
    redis.set_lock = CoroutineMock(side_effect=[
        1.100,
        1.001,
        2.000
    ])

    with pytest.raises(LockError):
        await lock_manager.lock('resource')

    await real_sleep(0.1)  # wait until cleaning is completed

    calls = [
        call('resource', ANY),
        call('resource', ANY),
        call('resource', ANY)
    ]
    redis.set_lock.assert_has_calls(calls)
    redis.unset_lock.assert_called_once_with('resource', ANY)
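
Taken on their own, the test fragments above are missing their imports and fixtures. Below is a minimal sketch of the module-level context they appear to assume; the lock_manager_redis_patched and locked_lock fixtures come from aioredlock's own test suite and are not reproduced here, and the use of asynctest's CoroutineMock is an assumption based on the calls shown above.

import asyncio
import uuid
from unittest.mock import ANY, call

import pytest
from asynctest import CoroutineMock  # assumed source of CoroutineMock

from aioredlock import LockError

# Keep a reference to the unpatched asyncio.sleep; the fixtures presumably
# patch asyncio.sleep to speed up retries, so the timeout test calls
# real_sleep() when it needs to wait for actual cleanup.
real_sleep = asyncio.sleep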

lock_manager = Aioredlock([  # assumed enclosing call; the snippet is truncated above this point
    'redis://localhost:6379/2',
    'redis://localhost:6379/3',
])

if await lock_manager.is_locked("resource"):
    print('The resource is already acquired')

try:
    async with await lock_manager.lock("resource") as lock:
        assert lock.valid is True
        assert await lock_manager.is_locked("resource") is True

        # Do your stuff having the lock
        await lock.extend()
        # Do more stuff having the lock

    assert lock.valid is False  # the lock was released by the context manager
except LockError:
    print('"resource" key might not be empty. Please call '
          '"del resource" in redis-cli')
    raise

assert lock.valid is False
assert await lock_manager.is_locked("resource") is False

await lock_manager.destroy()
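
For reference, here is a self-contained sketch that drives the same usage pattern end to end. It assumes a Redis server reachable on the default port; the four local database URIs are illustrative (the fragment above shows only two of them).

import asyncio

from aioredlock import Aioredlock, LockError


async def main():
    lock_manager = Aioredlock([
        'redis://localhost:6379/0',
        'redis://localhost:6379/1',
        'redis://localhost:6379/2',
        'redis://localhost:6379/3',
    ])
    try:
        # lock() raises LockError if the resource is already held elsewhere.
        async with await lock_manager.lock('resource') as lock:
            assert lock.valid is True
            # ... do the work that requires exclusive access ...
    except LockError:
        print('resource is locked by another instance')
    finally:
        await lock_manager.destroy()


if __name__ == '__main__':
    asyncio.run(main())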

async def schedule(self, ctx: object, agent_id: AgentId, event_name: str,
                   *args, **kwargs) -> None:
    # Serialize scheduling across manager instances via the distributed lock.
    lock = await self.lock_manager.lock('manager.scheduler')
    try:
        async with lock:
            await asyncio.sleep(0.5)
            await self.schedule_impl()
    except aioredlock.LockError as e:
        log.debug('schedule(): temporary locking failure', exc_info=e)

async def schedule(self, ctx: object, agent_id: AgentId, event_name: str,
                   *args, **kwargs) -> None:
    try:
        # Acquiring the lock may itself raise LockError, so it stays inside the try.
        lock = await self.lock_manager.lock('manager.scheduler')
        async with lock:
            await self.schedule_impl()
    except aioredlock.LockError:
        log.debug('schedule(): temporary locking failure; will be retried.')
        # The dispatcher will retry at the next chance.
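
The two variants above differ mainly in whether the lock acquisition sits inside the try block: since Aioredlock.lock() raises LockError when the lock cannot be acquired, placing that call outside the try (as in the first variant) lets an acquisition failure propagate instead of being logged. A hypothetical third arrangement makes the distinction explicit; the names schedule_guarded and schedule_impl here are illustrative, not part of the original code.

import aioredlock


async def schedule_guarded(lock_manager, schedule_impl) -> None:
    try:
        lock = await lock_manager.lock('manager.scheduler')
    except aioredlock.LockError:
        return  # another manager instance is scheduling right now; skip this round
    async with lock:
        # Exceptions raised by the actual scheduling work are no longer
        # conflated with lock-acquisition failures.
        await schedule_impl()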

try:
    async with lock:
        async with dbpool.acquire() as conn, conn.begin():
            # Collect the IDs of error logs older than the retention boundary.
            query = (sa.select([error_logs.c.id])
                       .select_from(error_logs)
                       .where(error_logs.c.created_at < boundary))
            result = await conn.execute(query)
            log_ids = []
            async for row in result:
                log_ids.append(row['id'])
            if len(log_ids) > 0:
                log.info('Cleaning up {} log{}', len(log_ids), 's' if len(log_ids) > 1 else '')
                query = error_logs.delete().where(error_logs.c.id.in_(log_ids))
                result = await conn.execute(query)
                assert result.rowcount == len(log_ids)
except aioredlock.LockError:
    log.debug('schedule(): temporary locking failure; will be retried.')
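
The fragment above is the body of a periodic cleanup task: the Redlock lock ensures that only one manager instance performs the deletion in any given round. A generic driver for this pattern might look like the following sketch; the function name, resource key, and interval are illustrative and not taken from the original code.

import asyncio
from typing import Awaitable, Callable

import aioredlock


async def run_exclusive_periodically(
    lock_manager: aioredlock.Aioredlock,
    resource: str,
    task: Callable[[], Awaitable[None]],
    interval: float = 3600.0,
) -> None:
    # Run `task` at most once per interval across all instances sharing the lock.
    while True:
        try:
            lock = await lock_manager.lock(resource)
            async with lock:
                await task()
        except aioredlock.LockError:
            pass  # another instance holds the lock; try again next round
        await asyncio.sleep(interval)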