Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class RWLockWriteD(RWLockableD):
"""A Read/Write lock giving preference to Writer."""
def __init__(
    self,
    lock_factory: Callable[[], Lockable] = threading.Lock,
    time_source: Callable[[], float] = time.perf_counter,
) -> None:
    """Create the counters and internal locks used by this lock.

    Args:
        lock_factory: Zero-argument callable returning a new lock. It is
            called once for every internal lock and is also handed to the
            thread-safe reader counter.
        time_source: Zero-argument callable returning the current time as
            a float; the handles' ``acquire`` methods read it to compute
            timeout deadlines.
    """
    # The reader count lives in the thread-safe integer wrapper.
    self.v_read_count: _ThreadSafeInt = _ThreadSafeInt(
        lock_factory=lock_factory,
        initial_value=0,
    )
    self.v_write_count: int = 0
    self.c_time_source = time_source

    # Five independent locks, created in this fixed order.
    self.c_lock_read_count = lock_factory()
    self.c_lock_write_count = lock_factory()
    self.c_lock_read_entry = lock_factory()
    self.c_lock_read_try = lock_factory()
    self.c_resource = lock_factory()

    super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockWriteD") -> None:
    """Bind this reader handle to its parent lock.

    Args:
        p_RWLock: The lock object whose internal locks this handle uses
            when acquiring.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_read_entry.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.c_lock_read_entry.release()
return False
if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.c_lock_read_try.release()
"""Generate a writer lock."""
return RWLockFair._aWriter(self)
class RWLockReadD(RWLockableD):
"""A Read/Write lock giving preference to Reader."""
def __init__(
    self,
    lock_factory: Callable[[], Lockable] = threading.Lock,
    time_source: Callable[[], float] = time.perf_counter,
) -> None:
    """Create the reader counter and the two internal locks.

    Args:
        lock_factory: Zero-argument callable returning a new lock. It is
            called once per internal lock and is also handed to the
            thread-safe reader counter.
        time_source: Zero-argument callable returning the current time as
            a float; the handles' ``acquire`` methods read it to compute
            timeout deadlines.
    """
    # The reader count lives in the thread-safe integer wrapper.
    self.v_read_count: _ThreadSafeInt = _ThreadSafeInt(
        initial_value=0,
        lock_factory=lock_factory,
    )
    self.c_time_source = time_source

    # Locks are created in this fixed order: resource first, then counter.
    self.c_resource = lock_factory()
    self.c_lock_read_count = lock_factory()

    super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockReadD") -> None:
    """Bind this reader handle to its parent lock.

    Args:
        p_RWLock: The lock object whose reader counter and internal locks
            this handle operates on.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout: Optional[float] = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline: Optional[float] = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
self.c_rw_lock.v_read_count += 1
if 1 == int(self.c_rw_lock.v_read_count):
if not self.c_rw_lock.c_resource.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.v_read_count -= 1
self.c_rw_lock.c_lock_read_count.release()
def release(self) -> None:
    """Release this reader's hold on the lock.

    Decrements the shared reader count while holding
    ``c_lock_read_count`` and releases ``c_resource`` when the count
    reaches zero.

    Raises:
        RELEASE_ERR_CLS: If this handle is not currently marked as locked.
    """
    if not self.v_locked:
        raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
    self.v_locked = False

    self.c_rw_lock.c_lock_read_count.acquire()
    try:
        self.c_rw_lock.v_read_count -= 1
        if 0 == self.c_rw_lock.v_read_count:
            self.c_rw_lock.c_resource.release()
    finally:
        # Hand the counter lock back even if releasing the resource
        # raised; otherwise every later acquire/release would block on it.
        self.c_rw_lock.c_lock_read_count.release()
def locked(self) -> bool:
    """Report whether this handle is currently marked as holding the lock."""
    is_held: bool = self.v_locked
    return is_held
class _aWriter(Lockable):
def __init__(self, p_RWLock: "RWLockWrite") -> None:
    """Bind this writer handle to its parent lock.

    Args:
        p_RWLock: The lock object whose writer counter and internal locks
            this handle operates on.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_write_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
self.c_rw_lock.v_write_count += 1
if 1 == self.c_rw_lock.v_write_count:
if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.v_write_count -= 1
self.c_rw_lock.c_lock_write_count.release()
class RWLockWrite(RWLockable):
"""A Read/Write lock giving preference to Writer."""
def __init__(
    self,
    lock_factory: Callable[[], Lockable] = threading.Lock,
    time_source: Callable[[], float] = time.perf_counter,
) -> None:
    """Create the counters and internal locks used by this lock.

    Args:
        lock_factory: Zero-argument callable returning a new lock; called
            once for each of the five internal locks.
        time_source: Zero-argument callable returning the current time as
            a float; the handles' ``acquire`` methods read it to compute
            timeout deadlines.
    """
    # Plain integer counters; both start at zero.
    self.v_read_count: int = 0
    self.v_write_count: int = 0
    self.c_time_source = time_source

    # Five independent locks, created in this fixed order.
    self.c_lock_read_count = lock_factory()
    self.c_lock_write_count = lock_factory()
    self.c_lock_read_entry = lock_factory()
    self.c_lock_read_try = lock_factory()
    self.c_resource = lock_factory()

    super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockWrite") -> None:
    """Bind this reader handle to its parent lock.

    Args:
        p_RWLock: The lock object whose internal locks this handle uses
            when acquiring.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_read_entry.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.c_lock_read_entry.release()
return False
if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.c_lock_read_try.release()
"""Answer to 'is it currently locked?'."""
raise AssertionError("Should be overriden") # Will be overriden. # pragma: no cover
def __enter__(self) -> bool:
    """Acquire the lock when a ``with`` block is entered.

    Returns:
        Always ``False``; the result of ``acquire()`` is not propagated,
        so ``with lock as x`` binds ``x`` to ``False``.
    """
    # acquire() is called with its defaults and its result is discarded.
    self.acquire()
    return False
def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[Exception], exc_tb: Optional[TracebackType]) -> Optional[bool]:  # type: ignore
    """Release the lock when the ``with`` block is left.

    Args:
        exc_type: Type of the exception raised in the block, if any; unused.
        exc_val: The exception instance, if any; unused.
        exc_tb: The traceback, if any; unused.

    Returns:
        ``False``, so an exception raised inside the block is not suppressed.
    """
    # The lock is released whether or not the block raised.
    self.release()
    return False
@runtime_checkable
class LockableD(Lockable, Protocol):
    """Protocol for a ``Lockable`` that additionally offers ``downgrade()``."""

    def downgrade(self) -> Lockable:
        """Return a ``Lockable`` obtained by downgrading this lock.

        The body here only raises; implementations are expected to
        replace it. What the downgraded lock represents is defined by
        those implementations.
        """
        raise AssertionError("Should be overriden")  # Will be overriden.  # pragma: no cover
class _ThreadSafeInt():
"""Internal thread safe integer like object.
Implements only the bare minimum features for the RWLock implementation's need.
"""
def __init__(self, initial_value: int, lock_factory: Callable[[], Lockable] = threading.Lock) -> None:
"""Init."""
self.__value_lock = lock_factory()
"""Generate a writer lock."""
raise AssertionError("Should be overriden") # Will be overriden. # pragma: no cover
class RWLockRead(RWLockable):
"""A Read/Write lock giving preference to Reader."""
def __init__(
    self,
    lock_factory: Callable[[], Lockable] = threading.Lock,
    time_source: Callable[[], float] = time.perf_counter,
) -> None:
    """Create the reader counter and the two internal locks.

    Args:
        lock_factory: Zero-argument callable returning a new lock; called
            once for each of the two internal locks.
        time_source: Zero-argument callable returning the current time as
            a float; the handles' ``acquire`` methods read it to compute
            timeout deadlines.
    """
    # Plain integer reader counter, starting at zero.
    self.v_read_count: int = 0
    self.c_time_source = time_source

    # Locks are created in this fixed order: resource first, then counter.
    self.c_resource = lock_factory()
    self.c_lock_read_count = lock_factory()

    super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockRead") -> None:
    """Bind this reader handle to its parent lock.

    Args:
        p_RWLock: The lock object whose reader counter and internal locks
            this handle operates on.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout: Optional[float] = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline: Optional[float] = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
self.c_rw_lock.v_read_count += 1
if 1 == self.c_rw_lock.v_read_count:
if not self.c_rw_lock.c_resource.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.v_read_count -= 1
self.c_rw_lock.c_lock_read_count.release()
return RWLockWriteD._aWriter(self)
class RWLockFairD(RWLockableD):
"""A Read/Write lock giving fairness to both Reader and Writer."""
def __init__(
    self,
    lock_factory: Callable[[], Lockable] = threading.Lock,
    time_source: Callable[[], float] = time.perf_counter,
) -> None:
    """Create the reader counter and the three internal locks.

    Args:
        lock_factory: Zero-argument callable returning a new lock; called
            once for each of the three internal locks.
        time_source: Zero-argument callable returning the current time as
            a float; the handles' ``acquire`` methods read it to compute
            timeout deadlines.
    """
    # Plain integer reader counter, starting at zero.
    self.v_read_count: int = 0
    self.c_time_source = time_source

    # Three independent locks, created in this fixed order.
    self.c_lock_read_count = lock_factory()
    self.c_lock_read = lock_factory()
    self.c_lock_write = lock_factory()

    super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockFairD") -> None:
    """Bind this reader handle to its parent lock.

    Args:
        p_RWLock: The lock object whose reader counter and internal locks
            this handle operates on.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_read.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.c_lock_read.release()
return False
self.c_rw_lock.v_read_count += 1
if 1 == self.c_rw_lock.v_read_count:
def release(self) -> None:
    """Release this reader's hold on the lock.

    Decrements the shared reader count while holding
    ``c_lock_read_count`` and releases ``c_resource`` when the count
    reaches zero.

    Raises:
        RELEASE_ERR_CLS: If this handle is not currently marked as locked.
    """
    if not self.v_locked:
        raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
    self.v_locked = False

    self.c_rw_lock.c_lock_read_count.acquire()
    try:
        self.c_rw_lock.v_read_count -= 1
        if 0 == self.c_rw_lock.v_read_count:
            self.c_rw_lock.c_resource.release()
    finally:
        # Hand the counter lock back even if releasing the resource
        # raised; otherwise every later acquire/release would block on it.
        self.c_rw_lock.c_lock_read_count.release()
def locked(self) -> bool:
    """Report whether this handle is currently marked as holding the lock."""
    is_held: bool = self.v_locked
    return is_held
class _aWriter(Lockable):
def __init__(self, p_RWLock: "RWLockRead") -> None:
    """Bind this writer handle to its parent lock.

    Args:
        p_RWLock: The lock object whose ``c_resource`` lock this handle
            acquires and releases.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
    """Acquire the writer lock by taking the parent's ``c_resource``.

    Args:
        blocking: When ``False`` the attempt never waits and ``timeout``
            is ignored.
        timeout: Maximum time to wait when blocking, in whatever unit the
            underlying lock expects. Any negative value means wait
            without limit.

    Returns:
        ``True`` if the lock was obtained, ``False`` otherwise.
    """
    # Normalise the arguments the way the reader handles' acquire() does.
    # Forwarded verbatim, ``blocking=False, timeout=5`` and ``timeout=-5``
    # make ``threading.Lock.acquire`` raise ValueError.
    if not blocking or timeout < 0:
        timeout = -1
    locked: bool = self.c_rw_lock.c_resource.acquire(blocking, timeout)
    self.v_locked = locked
    return locked
def release(self) -> None:
    """Release the writer lock by releasing the parent's ``c_resource``.

    Raises:
        RELEASE_ERR_CLS: If this handle is not currently marked as locked.
    """
    if not self.v_locked:
        raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)

    # Clear the flag first, then give the resource lock back.
    self.v_locked = False
    self.c_rw_lock.c_resource.release()
return RWLockWrite._aWriter(self)
class RWLockFair(RWLockable):
"""A Read/Write lock giving fairness to both Reader and Writer."""
def __init__(
    self,
    lock_factory: Callable[[], Lockable] = threading.Lock,
    time_source: Callable[[], float] = time.perf_counter,
) -> None:
    """Create the reader counter and the three internal locks.

    Args:
        lock_factory: Zero-argument callable returning a new lock; called
            once for each of the three internal locks.
        time_source: Zero-argument callable returning the current time as
            a float; the handles' ``acquire`` methods read it to compute
            timeout deadlines.
    """
    # Plain integer reader counter, starting at zero.
    self.v_read_count: int = 0
    self.c_time_source = time_source

    # Three independent locks, created in this fixed order.
    self.c_lock_read_count = lock_factory()
    self.c_lock_read = lock_factory()
    self.c_lock_write = lock_factory()

    super().__init__()
class _aReader(Lockable):
def __init__(self, p_RWLock: "RWLockFair") -> None:
    """Bind this reader handle to its parent lock.

    Args:
        p_RWLock: The lock object whose reader counter and internal locks
            this handle operates on.
    """
    # Keep a reference to the parent; a new handle starts out not held.
    self.c_rw_lock = p_RWLock
    self.v_locked: bool = False

    super().__init__()
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire a lock."""
p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
if not self.c_rw_lock.c_lock_read.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
return False
if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
self.c_rw_lock.c_lock_read.release()
return False
self.c_rw_lock.v_read_count += 1
if 1 == self.c_rw_lock.v_read_count: