Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_aioredlock_two_locks_on_different_resources_two_instances(
self,
redis_two_connections):
await self.check_two_locks_on_different_resources(Aioredlock(redis_two_connections))
def test_default_initialization(self):
with patch("aioredlock.algorithm.Redis.__init__") as mock_redis:
mock_redis.return_value = None
lock_manager = Aioredlock()
mock_redis.assert_called_once_with(
[{'host': 'localhost', 'port': 6379}],
lock_manager.lock_timeout
)
assert lock_manager.redis
assert lock_manager.drift == pytest.approx(0.102)
def test_validator(method, exc_message):
with pytest.raises(ValueError) as exc_info:
getattr(Aioredlock, method)(None, None, -1)
assert str(exc_info.value) == exc_message
async def test_aioredlock_lock_with_first_failed_try_two_instances(
self,
redis_two_connections
):
lock_manager = Aioredlock(redis_two_connections)
resource = str(uuid.uuid4())
garbage_value = 'garbage'
first_redis = await aioredis.create_redis(
(redis_two_connections[0]['host'],
redis_two_connections[0]['port'])
)
# write garbage to resource key in first instance
await first_redis.set(resource, garbage_value)
is_garbage = True
# this patched sleep function will remove garbage from
# frist instance before second try
real_sleep = asyncio.sleep
async def test_simple_aioredlock_two_instances(
self,
redis_two_connections):
await self.check_simple_lock(Aioredlock(redis_two_connections))
async def test_lock_one_retry(self, lock_manager_redis_patched, locked_lock):
lock_manager, redis = lock_manager_redis_patched
redis.set_lock = CoroutineMock(side_effect=[
LockError('Can not lock'),
0.001
])
lock = await lock_manager.lock('resource')
calls = [
call('resource', ANY),
call('resource', ANY)
]
redis.set_lock.assert_has_calls(calls)
redis.unset_lock.assert_not_called()
assert lock.resource == 'resource'
assert lock.id == ANY
assert lock.valid is True
async def check_two_locks_on_same_resource(self, lock_manager):
resource = str(uuid.uuid4())
assert await lock_manager.is_locked(resource) is False
lock1 = await lock_manager.lock(resource)
assert lock1.valid is True
assert await lock_manager.is_locked(resource) is True
with pytest.raises(LockError):
await lock_manager.lock(resource)
assert await lock_manager.is_locked(resource) is True
await lock_manager.unlock(lock1)
assert lock1.valid is False
assert await lock_manager.is_locked(resource) is False
await asyncio.sleep(0.2) # wait for lock cleanup
await lock_manager.destroy()
async def test_lock_expire_retries_for_timeouts(self, lock_manager_redis_patched, locked_lock):
lock_manager, redis = lock_manager_redis_patched
redis.set_lock = CoroutineMock(side_effect=[
1.100,
1.001,
2.000
])
with pytest.raises(LockError):
await lock_manager.lock('resource')
await real_sleep(0.1) # wait until cleaning is completed
calls = [
call('resource', ANY),
call('resource', ANY),
call('resource', ANY)
]
redis.set_lock.assert_has_calls(calls)
redis.unset_lock.assert_called_once_with('resource', ANY)
async def test_lock_one_of_two_instances_failed(
self, mock_redis_two_instances,
method_name, call_args
):
redis, pool = mock_redis_two_instances
pool.eval = CoroutineMock(side_effect=[EVAL_ERROR, EVAL_OK])
method = getattr(redis, method_name)
script = getattr(redis.instances[0], '%s_script' % method_name)
with pytest.raises(LockError):
await method('resource', 'lock_id')
calls = [call(script, **call_args)] * 2
pool.eval.assert_has_calls(calls)
self,
mock_redis_three_instances,
redis_result,
success,
method_name, call_args
):
redis, pool = mock_redis_three_instances
pool.eval = CoroutineMock(side_effect=redis_result)
method = getattr(redis, method_name)
script = getattr(redis.instances[0], '%s_script' % method_name)
if success:
await method('resource', 'lock_id')
else:
with pytest.raises(LockError):
await method('resource', 'lock_id')
calls = [call(script, **call_args)] * 3
pool.eval.assert_has_calls(calls)