Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
c_rwlock_2 = rwlock.RWLockFairD()
def assert_internal_state():
    """Assert that both locks expose the same internal state.

    Compares the reader count and the ``locked()`` status of the internal
    locks of ``c_rwlock_1`` against those of ``c_rwlock_2``.  ``self``,
    ``c_rwlock_1`` and ``c_rwlock_2`` are taken from the enclosing scope.
    """
    self.assertEqual(int(c_rwlock_1.v_read_count), int(c_rwlock_2.v_read_count))
    # Same comparison for every internal lock, in the original order; the
    # attribute name is passed as the message so a failure says which lock
    # differs.
    for lock_name in ("c_lock_read_count", "c_lock_read", "c_lock_write"):
        self.assertEqual(
            bool(getattr(c_rwlock_1, lock_name).locked()),
            bool(getattr(c_rwlock_2, lock_name).locked()),
            msg=lock_name,
        )
# ## Assume
assert_internal_state()
# ## Act
a_read_lock = c_rwlock_1.gen_rlock()
a_read_lock.acquire()
a_downgrade_lock: Union[rwlock.LockableD, rwlock.Lockable] = c_rwlock_2.gen_wlock()
a_downgrade_lock.acquire()
assert isinstance(a_downgrade_lock, rwlock.LockableD)
a_downgrade_lock = a_downgrade_lock.downgrade()
# ## Assert
assert_internal_state()
a_read_lock.release()
a_downgrade_lock.release()
assert_internal_state()
self.assertEqual(bool(c_rwlock_1.c_lock_read_count.locked()), bool(c_rwlock_2.c_lock_read_count.locked()))
self.assertEqual(bool(c_rwlock_1.c_lock_write_count.locked()), bool(c_rwlock_2.c_lock_write_count.locked()))
self.assertEqual(bool(c_rwlock_1.c_lock_read_entry.locked()), bool(c_rwlock_2.c_lock_read_entry.locked()))
self.assertEqual(bool(c_rwlock_1.c_lock_read_try.locked()), bool(c_rwlock_2.c_lock_read_try.locked()))
self.assertEqual(bool(c_rwlock_1.c_resource.locked()), bool(c_rwlock_2.c_resource.locked()))
# ## Assume
assert_internal_state()
# ## Act
a_read_lock = c_rwlock_1.gen_rlock()
a_read_lock.acquire()
a_downgrade_lock: Union[rwlock.LockableD, rwlock.Lockable] = c_rwlock_2.gen_wlock()
a_downgrade_lock.acquire()
assert isinstance(a_downgrade_lock, rwlock.LockableD)
a_downgrade_lock = a_downgrade_lock.downgrade()
# ## Assert
assert_internal_state()
a_read_lock.release()
a_downgrade_lock.release()
assert_internal_state()
def release(self) -> None:
    """Release the lock.

    Clears this handle's ``v_locked`` flag, decrements the shared
    ``v_read_count`` while holding ``c_lock_read_count`` and, when the count
    reaches zero, releases ``c_resource``.

    Raises:
        RELEASE_ERR_CLS: if this handle is not currently marked as locked.
    """
    if not self.v_locked:
        raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
    self.v_locked = False
    rw_lock = self.c_rw_lock
    rw_lock.c_lock_read_count.acquire()
    try:
        rw_lock.v_read_count -= 1
        if 0 == rw_lock.v_read_count:
            rw_lock.c_resource.release()
    finally:
        # Always hand the counter lock back, even if releasing the resource
        # raised; otherwise every later acquire/release would block on it.
        rw_lock.c_lock_read_count.release()
def locked(self) -> bool:
    """Answer to 'is it currently locked?'.

    Returns:
        The handle's own ``v_locked`` flag, unchanged.
    """
    return self.v_locked
class _aWriter(LockableD):
def __init__(self, p_RWLock: "RWLockWriteD") -> None:
    """Bind this handle to its lock object; it starts out not locked.

    Args:
        p_RWLock: the lock object this handle operates on.
    """
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False
    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
# NOTE(review): this listing has lost its indentation and appears to stop
# before the method ends (no success path is visible) - confirm against the
# full source before relying on it.
# No timeout (None) when blocking with a negative timeout; otherwise the
# given timeout when blocking, or 0 when non-blocking.
p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
# Deadline computed from the lock object's own time source; None means no deadline.
c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
# Take the writer-count lock, waiting at most for the time left before the deadline.
if not self.c_rw_lock.c_lock_write_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
self.c_rw_lock.v_write_count += 1
# When the writer count has just become 1, c_lock_read_try is acquired as well.
if 1 == self.c_rw_lock.v_write_count:
if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
# c_lock_read_try was not obtained: the count increment is undone.
self.c_rw_lock.v_write_count -= 1
# NOTE(review): the nesting of the next line cannot be recovered from this listing.
self.c_rw_lock.c_lock_write_count.release()
def release(self) -> None:
    """Release the lock.

    Clears this handle's ``v_locked`` flag, decrements the shared
    ``v_read_count`` while holding ``c_lock_read_count`` and, when the count
    (read through ``int()``) reaches zero, releases ``c_resource``.

    Raises:
        RELEASE_ERR_CLS: if this handle is not currently marked as locked.
    """
    if not self.v_locked:
        raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
    self.v_locked = False
    rw_lock = self.c_rw_lock
    rw_lock.c_lock_read_count.acquire()
    try:
        rw_lock.v_read_count -= 1
        if 0 == int(rw_lock.v_read_count):
            rw_lock.c_resource.release()
    finally:
        # Always hand the counter lock back, even if releasing the resource
        # raised; otherwise every later acquire/release would block on it.
        rw_lock.c_lock_read_count.release()
def locked(self) -> bool:
    """Answer to 'is it currently locked?'.

    Returns:
        The handle's own ``v_locked`` flag, unchanged.
    """
    return self.v_locked
class _aWriter(LockableD):
def __init__(self, p_RWLock: "RWLockReadD") -> None:
    """Bind this handle to its lock object; it starts out not locked.

    Args:
        p_RWLock: the lock object this handle operates on.
    """
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False
    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
    """Acquire a lock.

    Args:
        blocking: when false, make a single attempt and return immediately.
        timeout: seconds to wait at most when blocking; a negative value
            waits without limit.  Ignored when ``blocking`` is false.

    Returns:
        True when ``c_resource`` was acquired, False otherwise.  The result
        is also stored in ``v_locked``.
    """
    if not blocking or timeout < 0:
        # A lock such as threading.Lock rejects a timeout on a non-blocking
        # call and any negative timeout other than -1; normalise both cases
        # so they behave like the deadline-based acquire() variants.
        timeout = -1
    locked: bool = self.c_rw_lock.c_resource.acquire(blocking, timeout)
    self.v_locked = locked
    return locked
def downgrade(self) -> Lockable:
"""Downgrade."""
# NOTE(review): this listing has lost its indentation and appears to stop
# before the method ends (no return statement is visible) - confirm against
# the full source before relying on it.
# A handle that is not marked as locked is rejected with the same error
# class and message used by release().
if not self.v_locked: raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
# New handle obtained from the same lock object through gen_rlock().
result = self.c_rw_lock.gen_rlock()
def release(self) -> None:
    """Release the lock.

    Clears this handle's ``v_locked`` flag, decrements the shared
    ``v_read_count`` while holding ``c_lock_read_count`` and, when the count
    reaches zero, releases ``c_lock_write``.

    Raises:
        RELEASE_ERR_CLS: if this handle is not currently marked as locked.
    """
    if not self.v_locked:
        raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
    self.v_locked = False
    rw_lock = self.c_rw_lock
    rw_lock.c_lock_read_count.acquire()
    try:
        rw_lock.v_read_count -= 1
        if 0 == rw_lock.v_read_count:
            rw_lock.c_lock_write.release()
    finally:
        # Always hand the counter lock back, even if releasing the write
        # lock raised; otherwise every later acquire/release would block on it.
        rw_lock.c_lock_read_count.release()
def locked(self) -> bool:
    """Answer to 'is it currently locked?'.

    Returns:
        The handle's own ``v_locked`` flag, unchanged.
    """
    return self.v_locked
class _aWriter(LockableD):
def __init__(self, p_RWLock: "RWLockFairD") -> None:
    """Bind this handle to its lock object; it starts out not locked.

    Args:
        p_RWLock: the lock object this handle operates on.
    """
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False
    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
    """Acquire a lock.

    Takes ``c_lock_read`` and then ``c_lock_write``, both against a single
    deadline computed once from the lock object's ``c_time_source``.

    Args:
        blocking: when false, each internal lock is tried with a zero timeout.
        timeout: seconds to wait at most when blocking; a negative value
            waits without limit.

    Returns:
        True when both internal locks were obtained (``v_locked`` is then
        set), False otherwise.
    """
    rw_lock = self.c_rw_lock
    p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
    c_deadline = None if p_timeout is None else (rw_lock.c_time_source() + p_timeout)

    def remaining() -> float:
        """Return the timeout to pass to the next internal acquisition."""
        if c_deadline is None:
            return -1
        return max(0, c_deadline - rw_lock.c_time_source())

    if not rw_lock.c_lock_read.acquire(blocking=True, timeout=remaining()):
        return False
    if not rw_lock.c_lock_write.acquire(blocking=True, timeout=remaining()):
        # Do not keep the read lock when the write lock could not be taken.
        rw_lock.c_lock_read.release()
        return False
    self.v_locked = True
    return True