How to use the readerwriterlock.rwlock function in readerwriterlock

To help you get started, we’ve selected a few readerwriterlock examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github elarivie / pyReaderWriterLock / tests / test_rwlock.py View on Github external
def downgrader1() -> None:
					"""Downgrader using a timeout blocking acquire strategy."""
					if c_curr_lock_type not in self.c_rwlock_type_downgradable: return
					try:
						nonlocal v_value
						c_enter_time = time.time()
						while time.time() - c_enter_time <= s_period_sec:
							c_lock_w1: Union[rwlock.Lockable, rwlock.LockableD] = c_curr_rw_lock.gen_wlock()
							assert isinstance(c_lock_w1, rwlock.LockableD), type(c_lock_w1)
							time.sleep(sys.float_info.min)
							locked: bool = c_lock_w1.acquire(blocking=True, timeout=sys.float_info.min)
							if locked:
								try:
									# Asert like a writer
									v_temp = v_value
									v_value += 1
									self.assertEqual(v_value, (v_temp + 1))

									assert isinstance(c_lock_w1, rwlock.LockableD), c_lock_w1
									c_lock_w1 = c_lock_w1.downgrade()
									assert isinstance(c_lock_w1, rwlock.Lockable), c_lock_w1

									# Asert like a reader
									vv_value: int = v_value
github elarivie / pyReaderWriterLock / tests / test_rwlock.py View on Github external
def setUp(self) -> None:
		"""Test setup."""
		self.c_rwlock_type_downgradable = (rwlock.RWLockReadD, rwlock.RWLockWriteD, rwlock.RWLockFairD)
		self.c_rwlock_type = (rwlock.RWLockRead, rwlock.RWLockWrite, rwlock.RWLockFair) + self.c_rwlock_type_downgradable
github elarivie / pyReaderWriterLock / tests / test_rwlock.py View on Github external
def test_write_req15(self) -> None:
		"""
		# Given: a Downgradable RW lock type to instantiate a RW lock.

		# When: A a generated writer lock is downgraed but it wasn't in a locked state.

		# Then: the generated locks raise an exception if released while being unlocked.
		"""
		# ## Arrange
		for current_rw_lock_type in self.c_rwlock_type_downgradable:
			with self.subTest(current_rw_lock_type):
				current_rw_lock = current_rw_lock_type()
				assert isinstance(current_rw_lock, rwlock.RWLockableD)
				current_lock: Union[rwlock.LockableD] = current_rw_lock.gen_wlock()

				with self.assertRaises(rwlock.RELEASE_ERR_CLS) as err:
					current_lock.release()
				self.assertEqual(str(err.exception), str(rwlock.RELEASE_ERR_MSG))

				with self.assertRaises(rwlock.RELEASE_ERR_CLS):
					# ## Assume
					self.assertFalse((current_lock.locked()))
					# ## Act
					current_lock.downgrade()

				self.assertFalse(current_lock.locked())
github elarivie / pyReaderWriterLock / tests / test_rwlock.py View on Github external
# Given: a Downgradable RW lock type to instantiate a RW lock.

		# When: A a generated writer lock is downgraed but it wasn't in a locked state.

		# Then: the generated locks raise an exception if released while being unlocked.
		"""
		# ## Arrange
		for current_rw_lock_type in self.c_rwlock_type_downgradable:
			with self.subTest(current_rw_lock_type):
				current_rw_lock = current_rw_lock_type()
				assert isinstance(current_rw_lock, rwlock.RWLockableD)
				current_lock: Union[rwlock.LockableD] = current_rw_lock.gen_wlock()

				with self.assertRaises(rwlock.RELEASE_ERR_CLS) as err:
					current_lock.release()
				self.assertEqual(str(err.exception), str(rwlock.RELEASE_ERR_MSG))

				with self.assertRaises(rwlock.RELEASE_ERR_CLS):
					# ## Assume
					self.assertFalse((current_lock.locked()))
					# ## Act
					current_lock.downgrade()

				self.assertFalse(current_lock.locked())
github elarivie / pyReaderWriterLock / tests / test_rwlock.py View on Github external
def setUp(self) -> None:
		"""Test setup."""
		self.c_rwlock_type_downgradable = (rwlock.RWLockWriteD, rwlock.RWLockFairD, rwlock.RWLockReadD)
		self.c_rwlock_type = (rwlock.RWLockRead, rwlock.RWLockWrite, rwlock.RWLockFair) + self.c_rwlock_type_downgradable