Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
>>> bool(printer_lock.locked())
True
>>> printer_lock.release()
>>> bool(printer_lock.locked())
False
'''
futures, num_masters_released = set(), 0
with concurrent.futures.ThreadPoolExecutor() as executor:
for master in self.masters:
futures.add(executor.submit(self._release_master, master))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
num_masters_released += future.result()
quorum = num_masters_released >= len(self.masters) // 2 + 1
if not quorum:
raise ReleaseUnlockedLock(self.masters, self.key)
self._extension_num = 0
futures, num_masters_acquired = set(), 0
with ContextTimer() as timer, \
concurrent.futures.ThreadPoolExecutor() as executor:
for master in self.masters:
futures.add(executor.submit(self._acquire_master, master))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
num_masters_acquired += future.result()
quorum = num_masters_acquired >= len(self.masters) // 2 + 1
elapsed = timer.elapsed() - self._drift()
validity_time = self.auto_release_time - elapsed
if quorum and max(validity_time, 0):
return True
else:
with contextlib.suppress(ReleaseUnlockedLock):
self.release()
return False