How to use the readerwriterlock.rwlock.Lockable class in readerwriterlock

To help you get started, we’ve selected a few readerwriterlock examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
class RWLockWriteD(RWLockableD):
	"""A Read/Write lock giving preference to Writer."""

	def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
		"""Set up the counters, the time source and the internal locks.

		Args:
			lock_factory: Zero-argument callable producing a Lockable. It is invoked once for each of the five internal locks and is also handed to the read counter.
			time_source: Zero-argument callable returning a float; kept on the instance as ``c_time_source``.
		"""
		# Members that do not need a lock object of their own.
		self.c_time_source = time_source
		self.v_write_count: int = 0
		# Lock-backed members; the factory is invoked in the same sequence as before.
		self.v_read_count: _ThreadSafeInt = _ThreadSafeInt(initial_value=0, lock_factory=lock_factory)
		self.c_lock_read_count = lock_factory()
		self.c_lock_write_count = lock_factory()
		self.c_lock_read_entry = lock_factory()
		self.c_lock_read_try = lock_factory()
		self.c_resource = lock_factory()
		super().__init__()

	class _aReader(Lockable):
		def __init__(self, p_RWLock: "RWLockWriteD") -> None:
			"""Remember the given RWLockWriteD and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_read_entry.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				self.c_rw_lock.c_lock_read_entry.release()
				return False
			if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				self.c_rw_lock.c_lock_read_try.release()
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
"""Generate a writer lock."""
		return RWLockFair._aWriter(self)


class RWLockReadD(RWLockableD):
	"""A Read/Write lock giving preference to Reader."""

	def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
		"""Set up the read counter, the time source and the two internal locks.

		Args:
			lock_factory: Zero-argument callable producing a Lockable. It is handed to the read counter and invoked once each for ``c_resource`` and ``c_lock_read_count``.
			time_source: Zero-argument callable returning a float; kept on the instance as ``c_time_source``.
		"""
		self.c_time_source = time_source
		# Lock-backed members; the factory is used in the same sequence as before.
		self.v_read_count: _ThreadSafeInt = _ThreadSafeInt(lock_factory=lock_factory, initial_value=0)
		self.c_resource = lock_factory()
		self.c_lock_read_count = lock_factory()
		super().__init__()

	class _aReader(Lockable):
		def __init__(self, p_RWLock: "RWLockReadD") -> None:
			"""Remember the given RWLockReadD and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout: Optional[float] = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline: Optional[float] = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			self.c_rw_lock.v_read_count += 1
			if 1 == int(self.c_rw_lock.v_read_count):
				if not self.c_rw_lock.c_resource.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
					self.c_rw_lock.v_read_count -= 1
					self.c_rw_lock.c_lock_read_count.release()
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
def release(self) -> None:
			"""Release this reader's hold.

			Clears ``v_locked``, then decrements the shared ``v_read_count`` while holding
			``c_lock_read_count``. When the count reaches zero, ``c_resource`` is released too.

			Raises:
				RELEASE_ERR_CLS: if this handle is not currently marked as locked.
			"""
			if not self.v_locked:
				raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
			self.v_locked = False
			self.c_rw_lock.c_lock_read_count.acquire()
			try:
				self.c_rw_lock.v_read_count -= 1
				if 0 == self.c_rw_lock.v_read_count:
					self.c_rw_lock.c_resource.release()
			finally:
				# Always hand the counter lock back, even when the decrement or the
				# resource release raises; a counter lock left held would block
				# every later acquire of it.
				self.c_rw_lock.c_lock_read_count.release()

		def locked(self) -> bool:
			"""Report the value of this handle's ``v_locked`` flag."""
			is_held: bool = self.v_locked
			return is_held

	class _aWriter(Lockable):
		def __init__(self, p_RWLock: "RWLockWrite") -> None:
			"""Remember the given RWLockWrite and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_write_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			self.c_rw_lock.v_write_count += 1
			if 1 == self.c_rw_lock.v_write_count:
				if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
					self.c_rw_lock.v_write_count -= 1
					self.c_rw_lock.c_lock_write_count.release()
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
class RWLockWrite(RWLockable):
	"""A Read/Write lock giving preference to Writer."""

	def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
		"""Set up the counters, the time source and the five internal locks.

		Args:
			lock_factory: Zero-argument callable producing a Lockable; invoked once per internal lock.
			time_source: Zero-argument callable returning a float; kept on the instance as ``c_time_source``.
		"""
		# Plain counters and the clock come first; none of them touches the factory.
		self.c_time_source = time_source
		self.v_write_count: int = 0
		self.v_read_count: int = 0
		# The factory is invoked in the same sequence as before.
		self.c_lock_read_count = lock_factory()
		self.c_lock_write_count = lock_factory()
		self.c_lock_read_entry = lock_factory()
		self.c_lock_read_try = lock_factory()
		self.c_resource = lock_factory()
		super().__init__()

	class _aReader(Lockable):
		def __init__(self, p_RWLock: "RWLockWrite") -> None:
			"""Remember the given RWLockWrite and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_read_entry.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			if not self.c_rw_lock.c_lock_read_try.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				self.c_rw_lock.c_lock_read_entry.release()
				return False
			if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				self.c_rw_lock.c_lock_read_try.release()
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
"""Answer to 'is it currently locked?'."""
		raise AssertionError("Should be overriden")  # Will be overriden.  # pragma: no cover

	def __enter__(self) -> bool:
		"""Acquire when a ``with`` block is entered.

		``acquire()`` is called with its default arguments and its result is
		discarded; the value bound by ``with ... as`` is always ``False``.
		"""
		self.acquire()
		return False

	def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[Exception], exc_tb: Optional[TracebackType]) -> Optional[bool]:  # type: ignore
		"""Release when a ``with`` block is left.

		The three exception arguments are not inspected. ``False`` is returned, so
		an exception raised inside the block is not suppressed.
		"""
		self.release()
		return False


@runtime_checkable
class LockableD(Lockable, Protocol):
	"""Protocol for a Lockable that also offers ``downgrade()``."""

	def downgrade(self) -> Lockable:
		"""Downgrade and return a Lockable, per the annotation.

		This stub body only raises AssertionError.
		"""
		raise AssertionError("Should be overriden")  # Placeholder body only.  # pragma: no cover


class _ThreadSafeInt():
	"""Internal thread safe integer like object.

	Implements only the bare minimum features for the RWLock implementation's need.
	"""

	def __init__(self, initial_value: int, lock_factory: Callable[[], Lockable] = threading.Lock) -> None:
		"""Init."""
		self.__value_lock = lock_factory()
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
"""Generate a writer lock."""
		raise AssertionError("Should be overriden")  # Will be overriden.  # pragma: no cover


class RWLockRead(RWLockable):
	"""A Read/Write lock giving preference to Reader."""

	def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
		"""Set up the read counter, the time source and the two internal locks.

		Args:
			lock_factory: Zero-argument callable producing a Lockable; invoked once each for ``c_resource`` and ``c_lock_read_count``.
			time_source: Zero-argument callable returning a float; kept on the instance as ``c_time_source``.
		"""
		self.c_time_source = time_source
		self.v_read_count: int = 0
		# The factory is invoked in the same sequence as before.
		self.c_resource = lock_factory()
		self.c_lock_read_count = lock_factory()
		super().__init__()

	class _aReader(Lockable):
		def __init__(self, p_RWLock: "RWLockRead") -> None:
			"""Remember the given RWLockRead and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout: Optional[float] = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline: Optional[float] = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			self.c_rw_lock.v_read_count += 1
			if 1 == self.c_rw_lock.v_read_count:
				if not self.c_rw_lock.c_resource.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
					self.c_rw_lock.v_read_count -= 1
					self.c_rw_lock.c_lock_read_count.release()
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
return RWLockWriteD._aWriter(self)


class RWLockFairD(RWLockableD):
	"""A Read/Write lock giving fairness to both Reader and Writer."""

	def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
		"""Set up the read counter, the time source and the three internal locks.

		Args:
			lock_factory: Zero-argument callable producing a Lockable; invoked once each for ``c_lock_read_count``, ``c_lock_read`` and ``c_lock_write``.
			time_source: Zero-argument callable returning a float; kept on the instance as ``c_time_source``.
		"""
		self.c_time_source = time_source
		self.v_read_count: int = 0
		# The factory is invoked in the same sequence as before.
		self.c_lock_read_count = lock_factory()
		self.c_lock_read = lock_factory()
		self.c_lock_write = lock_factory()
		super().__init__()

	class _aReader(Lockable):
		def __init__(self, p_RWLock: "RWLockFairD") -> None:
			"""Remember the given RWLockFairD and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_read.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				self.c_rw_lock.c_lock_read.release()
				return False
			self.c_rw_lock.v_read_count += 1
			if 1 == self.c_rw_lock.v_read_count:
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
def release(self) -> None:
			"""Release this reader's hold.

			Clears ``v_locked``, then decrements the shared ``v_read_count`` while holding
			``c_lock_read_count``. When the count reaches zero, ``c_resource`` is released too.

			Raises:
				RELEASE_ERR_CLS: if this handle is not currently marked as locked.
			"""
			if not self.v_locked:
				raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
			self.v_locked = False
			self.c_rw_lock.c_lock_read_count.acquire()
			try:
				self.c_rw_lock.v_read_count -= 1
				if 0 == self.c_rw_lock.v_read_count:
					self.c_rw_lock.c_resource.release()
			finally:
				# Always hand the counter lock back, even when the decrement or the
				# resource release raises; a counter lock left held would block
				# every later acquire of it.
				self.c_rw_lock.c_lock_read_count.release()

		def locked(self) -> bool:
			"""Report the value of this handle's ``v_locked`` flag."""
			is_held: bool = self.v_locked
			return is_held

	class _aWriter(Lockable):
		def __init__(self, p_RWLock: "RWLockRead") -> None:
			"""Remember the given RWLockRead and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Try to take ``c_resource`` and record the outcome in ``v_locked``.

			Args:
				blocking: Passed through unchanged to ``c_resource.acquire``.
				timeout: Passed through unchanged to ``c_resource.acquire``.

			Returns:
				Whatever ``c_resource.acquire`` returned.
			"""
			resource = self.c_rw_lock.c_resource
			outcome: bool = resource.acquire(blocking, timeout)
			self.v_locked = outcome
			return outcome

		def release(self) -> None:
			"""Give ``c_resource`` back and clear ``v_locked``.

			Raises:
				RELEASE_ERR_CLS: if this handle is not currently marked as locked.
			"""
			if self.v_locked:
				self.v_locked = False
				self.c_rw_lock.c_resource.release()
				return
			raise RELEASE_ERR_CLS(RELEASE_ERR_MSG)
github elarivie / pyReaderWriterLock / readerwriterlock / rwlock.py View on Github external
return RWLockWrite._aWriter(self)


class RWLockFair(RWLockable):
	"""A Read/Write lock giving fairness to both Reader and Writer."""

	def __init__(self, lock_factory: Callable[[], Lockable] = threading.Lock, time_source: Callable[[], float] = time.perf_counter) -> None:
		"""Set up the read counter, the time source and the three internal locks.

		Args:
			lock_factory: Zero-argument callable producing a Lockable; invoked once each for ``c_lock_read_count``, ``c_lock_read`` and ``c_lock_write``.
			time_source: Zero-argument callable returning a float; kept on the instance as ``c_time_source``.
		"""
		self.c_time_source = time_source
		self.v_read_count: int = 0
		# The factory is invoked in the same sequence as before.
		self.c_lock_read_count = lock_factory()
		self.c_lock_read = lock_factory()
		self.c_lock_write = lock_factory()
		super().__init__()

	class _aReader(Lockable):
		def __init__(self, p_RWLock: "RWLockFair") -> None:
			"""Remember the given RWLockFair and start out as not locked."""
			self.v_locked: bool = False
			self.c_rw_lock = p_RWLock
			super().__init__()

		def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
			"""Acquire a lock."""
			p_timeout = None if (blocking and timeout < 0) else (timeout if blocking else 0)
			c_deadline = None if p_timeout is None else (self.c_rw_lock.c_time_source() + p_timeout)
			if not self.c_rw_lock.c_lock_read.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				return False
			if not self.c_rw_lock.c_lock_read_count.acquire(blocking=True, timeout=-1 if c_deadline is None else max(0, c_deadline - self.c_rw_lock.c_time_source())):
				self.c_rw_lock.c_lock_read.release()
				return False
			self.c_rw_lock.v_read_count += 1
			if 1 == self.c_rw_lock.v_read_count: