Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
acquisition)
:type max_delay: int/float
:param timeout: an optional timeout (limits how long blocking
will occur for)
:type timeout: int/float
:returns: whether or not the acquisition succeeded
:rtype: bool
"""
if delay < 0:
raise ValueError("Delay must be greater than or equal to zero")
if timeout is not None and timeout < 0:
raise ValueError("Timeout must be greater than or equal to zero")
if delay >= max_delay:
max_delay = delay
self._do_open()
watch = _utils.StopWatch(duration=timeout)
r = _utils.Retry(delay, max_delay,
sleep_func=self.sleep_func, watch=watch)
with watch:
gotten = r(self._try_acquire, blocking, watch)
if not gotten:
self.acquired = False
return False
else:
self.acquired = True
self.logger.log(_utils.BLATHER,
"Acquired file lock `%s` after waiting %0.3fs [%s"
" attempts were required]", self.path,
watch.elapsed(), r.attempts)
return True