Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_repr(loop):
    """Check that the lock's repr tracks reader and writer lock state.

    A fresh lock must mention ``RWLock`` and report both sides as unlocked.
    Each side must then read ``[locked]`` while held and ``[unlocked]``
    again once released.
    """
    rwlock = RWLock(loop=loop)

    # Freshly constructed: nothing is held on either side.
    initial_fragments = (
        'RWLock',
        'WriterLock: [unlocked',
        'ReaderLock: [unlocked',
    )
    for fragment in initial_fragments:
        assert fragment in repr(rwlock)

    # reader lock __repr__
    await rwlock.reader_lock.acquire()
    assert 'ReaderLock: [locked]' in repr(rwlock)
    rwlock.reader_lock.release()
    assert 'ReaderLock: [unlocked]' in repr(rwlock)

    # writer lock __repr__
    await rwlock.writer_lock.acquire()
    assert 'WriterLock: [locked]' in repr(rwlock)
    rwlock.writer_lock.release()
    assert 'WriterLock: [unlocked]' in repr(rwlock)
# Test excerpt (cut off in this view): a background task tries to enter the
# writer lock while the test body enters the same writer lock itself. The
# excerpt ends at the outer `async with wl:` with no body visible.
async def test_get_write_then_read_and_write_again(loop):
rwlock = RWLock(loop=loop)
rl = rwlock.reader
wl = rwlock.writer
# Future the background task awaits before it attempts the writer lock.
f = loop.create_future()
writes = []
# Background coroutine: waits on `f`, then tries to take the writer lock.
async def get_write_lock():
await f
# `should_fail` is defined outside this excerpt.
# NOTE(review): presumably asserts the block does not finish within 0.1s - confirm.
with should_fail(.1, loop):
async with wl:
assert wl.locked
writes.append('should not be here')
# Schedule the background task on the same loop before taking the lock here.
ensure_future(get_write_lock(), loop=loop)
async with wl:
# Test excerpt (cut off in this view): reader and writer coroutines share the
# `reads` / `writes` counters through `nonlocal`. The writer's body is
# truncated after its first loop header.
async def test_writer_success_with_statement(loop):
# Verify that a writer can get access
rwlock = RWLock(loop=loop)
N = 5
reads = 0
writes = 0
# Reader coroutine: re-enters the reader lock, counting each entry, until
# two writes have been recorded.
async def r():
# read until we achieve write successes
nonlocal reads, writes
while writes < 2:
# print("current pre-reads", reads)
async with rwlock.reader_lock:
reads += 1
# print("current reads", reads)
# Writer coroutine: only the loop header that waits for the first read is
# visible here.
async def w():
nonlocal reads, writes
while reads == 0:
def test_ctor_noloop_writer(loop):
    """A writer lock built without ``loop=`` picks up the installed loop.

    The fixture loop is installed with ``asyncio.set_event_loop`` first, so
    an ``RWLock()`` created with no arguments is expected to bind to it.
    """
    asyncio.set_event_loop(loop)
    writer = RWLock().writer_lock
    assert writer._lock._loop is loop
async def test_read_upgrade_write_release(loop):
    """Take the reader lock repeatedly while the writer lock is held.

    Sequence checked: acquire the writer, acquire the reader three times,
    release the reader once, then release the writer. The writer must
    report locked before its release and unlocked after it, while the
    reader still reports locked. Acquiring the writer again at that point
    must raise ``RuntimeError``.
    """
    rwlock = RWLock(loop=loop)

    await rwlock.writer_lock.acquire()
    # Three reader acquisitions follow; only one is released below.
    for _ in range(3):
        await rwlock.reader_lock.acquire()
    rwlock.reader_lock.release()

    assert rwlock.writer_lock.locked
    rwlock.writer_lock.release()
    assert not rwlock.writer_lock.locked

    # Reader acquisitions are still outstanding after the writer is gone.
    assert rwlock.reader.locked

    with pytest.raises(RuntimeError):
        await rwlock.writer_lock.acquire()
def test_ctor_loop_writer(loop):
    """A writer lock built with an explicit ``loop=`` is bound to that loop."""
    writer = RWLock(loop=loop).writer_lock
    assert writer._lock._loop is loop
def test_raise_error_on_with_for_reader_lock(loop, fast_track):
    """Using the reader lock in a plain ``with`` must raise ``RuntimeError``.

    ``fast_track`` is passed straight through as the ``fast=`` argument.
    """
    rwlock = RWLock(loop=loop, fast=fast_track)
    # One combined ``with`` is equivalent to nesting the two managers.
    with pytest.raises(RuntimeError), rwlock.reader_lock:
        pass
async def test_release_unlocked(loop):
    """Releasing a reader lock that was never acquired raises RuntimeError."""
    rwlock = RWLock(loop=loop)
    expect_runtime_error = pytest.raises(RuntimeError)
    with expect_runtime_error:
        rwlock.reader_lock.release()
return dataset, any(
(ds["mountpoint"] + "/").startswith(directory + "/")
for ds in datasets
if ds != dataset
)
class _FsLockCore(aiorwlock._RWLockCore):
    """Lock core that unregisters its path from the owning manager.

    Reads ``_fs_manager`` and ``_fs_path`` from the instance. Neither is
    assigned in this class (presumably set by whoever creates the lock -
    confirm against the manager code).
    """

    def _release(self, lock_type):
        """Remove the manager's entry when idle, then run the base release.

        NOTE(review): the idle check reads ``_r_state`` / ``_w_state``
        *before* delegating to the base class. Confirm this ordering is
        intended rather than checking after the release has been applied.
        """
        idle = self._r_state == 0 and self._w_state == 0
        if idle:
            self._fs_manager._remove_lock(self._fs_path)

        return super()._release(lock_type)
class _FsLock(aiorwlock.RWLock):
    """``aiorwlock.RWLock`` subclass that sets ``core`` to ``_FsLockCore``."""

    core = _FsLockCore
@enum.unique
class FsLockDirection(enum.Enum):
    """Kind of access requested when locking a filesystem path.

    ``@enum.unique`` makes a duplicated value fail at class creation
    instead of silently becoming an alias of an existing member.
    """

    READ = 0   # read access
    WRITE = 1  # write access
# Manager that keeps its locks in the `self.locks` dict. Only the start of
# `lock()` is visible in this excerpt.
class FsLockManager:
# Lock class associated with the manager; where it is instantiated is not
# visible in this excerpt.
_lock = _FsLock
def __init__(self):
# Starts with no locks registered.
# NOTE(review): presumably keyed by normalized path - confirm.
self.locks = {}
def lock(self, path, direction):
# Normalize first so differently spelled forms of a path (redundant
# separators, up-level references) collapse to one string.
path = os.path.normpath(path)
async def go():
rwlock = aiorwlock.RWLock()
# acquire reader lock
await rwlock.reader_lock.acquire()
try:
print('inside reader lock')
await asyncio.sleep(0.1)
finally:
rwlock.reader_lock.release()
# acquire writer lock
await rwlock.writer_lock.acquire()
try:
print('inside writer lock')
await asyncio.sleep(0.1)