Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multi_thread(self) -> None:
"""
# Given: Bunch of reader & writer lock generated from a RW lock.
# When: Locking/unlocking in a multi thread setup.
# Then: the locks shall not deadlock.
"""
s_period_sec: int = 60
print(f"test_MultiThread {s_period_sec * len(self.c_rwlock_type)} sec…", flush=True)
exception_occured: bool = False
for c_curr_lock_type in self.c_rwlock_type:
with self.subTest(c_curr_lock_type):
print(f" {c_curr_lock_type} …", end="", flush=True)
c_curr_rw_lock: Union[rwlock.RWLockable, rwlock.RWLockableD] = c_curr_lock_type()
v_value: int = 0
def downgrader1() -> None:
"""Downgrader using a timeout blocking acquire strategy."""
if c_curr_lock_type not in self.c_rwlock_type_downgradable: return
try:
nonlocal v_value
c_enter_time = time.time()
while time.time() - c_enter_time <= s_period_sec:
c_lock_w1: Union[rwlock.Lockable, rwlock.LockableD] = c_curr_rw_lock.gen_wlock()
assert isinstance(c_lock_w1, rwlock.LockableD), type(c_lock_w1)
time.sleep(sys.float_info.min)
locked: bool = c_lock_w1.acquire(blocking=True, timeout=sys.float_info.min)
if locked:
try:
# Asert like a writer
@runtime_checkable
class RWLockableD(Protocol):
"""Read/write lock Downgradable."""
def gen_rlock(self) -> Lockable:
"""Generate a reader lock."""
raise AssertionError("Should be overriden") # Will be overriden. # pragma: no cover
def gen_wlock(self) -> LockableD:
"""Generate a writer lock."""
raise AssertionError("Should be overriden") # Will be overriden. # pragma: no cover
class RWLockRead(RWLockable):
"""A Read/Write lock giving preference to Reader."""
def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
"""Init."""
self.v_read_count: int = 0
self.c_time_source = time_source
self.c_resource = lock_factory()
self.c_lock_read_count = lock_factory()
super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockRead") -> None:
self.c_rw_lock = p_RWLock
self.v_locked: bool = False
super().__init__()
self.c_rw_lock.c_resource.release()
def locked(self) -> bool:
"""Answer to 'is it currently locked?'."""
return self.v_locked
def gen_rlock(self) -> "RWLockRead._aReader":
"""Generate a reader lock."""
return RWLockRead._aReader(self)
def gen_wlock(self) -> "RWLockRead._aWriter":
"""Generate a writer lock."""
return RWLockRead._aWriter(self)
class RWLockWrite(RWLockable):
"""A Read/Write lock giving preference to Writer."""
def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
"""Init."""
self.v_read_count: int = 0
self.v_write_count: int = 0
self.c_time_source = time_source
self.c_lock_read_count = lock_factory()
self.c_lock_write_count = lock_factory()
self.c_lock_read_entry = lock_factory()
self.c_lock_read_try = lock_factory()
self.c_resource = lock_factory()
super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockWrite") -> None:
self.c_rw_lock.c_lock_write_count.release()
def locked(self) -> bool:
"""Answer to 'is it currently locked?'."""
return self.v_locked
def gen_rlock(self) -> "RWLockWrite._aReader":
"""Generate a reader lock."""
return RWLockWrite._aReader(self)
def gen_wlock(self) -> "RWLockWrite._aWriter":
"""Generate a writer lock."""
return RWLockWrite._aWriter(self)
class RWLockFair(RWLockable):
"""A Read/Write lock giving fairness to both Reader and Writer."""
def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
"""Init."""
self.v_read_count: int = 0
self.c_time_source = time_source
self.c_lock_read_count = lock_factory()
self.c_lock_read = lock_factory()
self.c_lock_write = lock_factory()
super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockFair") -> None:
self.c_rw_lock = p_RWLock
self.v_locked: bool = False
super().__init__()