Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _try_acquire(self, blocking, watch):
    """Make a single attempt to take the lock via ``self.trylock()``.

    :param blocking: when falsy, a contended lock is reported right away
                     instead of asking for another attempt.
    :param watch: object whose ``expired()`` result decides whether a
                  blocking caller may keep retrying; only consulted when
                  ``blocking`` is truthy and the lock is contended.
    :returns: ``True`` if ``trylock()`` succeeded; ``False`` if the lock
              is contended and no further attempt should be made.
    :raises _utils.RetryAgain: the lock is contended, ``blocking`` is
                               truthy and ``watch`` has not expired
                               (presumably caught by a retry loop in the
                               caller -- not visible from here).
    :raises threading.ThreadError: ``trylock()`` failed with an errno
                                   other than EACCES / EAGAIN.
    """
    try:
        self.trylock()
    except IOError as e:
        contended = e.errno in (errno.EACCES, errno.EAGAIN)
        if not contended:
            # Anything other than "someone else holds it" is a real
            # failure; surface it with the lock path for context.
            details = {
                'path': self.path,
                'exception': e,
            }
            raise threading.ThreadError("Unable to acquire lock on"
                                        " `%(path)s` due to"
                                        " %(exception)s" % details)
        # Short-circuit keeps ``watch`` untouched for non-blocking callers.
        if blocking and not watch.expired():
            raise _utils.RetryAgain()
        return False
    return True
def __call__(self, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` until it stops raising ``RetryAgain``.

    Every attempt increments ``self.attempts``. After a ``RetryAgain``
    the loop sleeps (through ``self.sleep_func``) before trying again;
    the delay grows linearly with the attempt count
    (``attempts * delay``), is capped at ``self.max_delay`` and, when a
    watch is configured, at whatever ``self.watch.leftover()`` reports.

    :param fn: callable to invoke.
    :param args: positional arguments forwarded to ``fn``.
    :param kwargs: keyword arguments forwarded to ``fn``.
    :returns: whatever ``fn`` returns on its first successful call.

    Exceptions other than ``RetryAgain`` propagate unchanged. The loop
    itself has no exit condition besides ``fn`` returning or raising
    something else.
    """
    while True:
        self.attempts += 1
        try:
            return fn(*args, **kwargs)
        except RetryAgain:
            # Linear backoff, bounded above by the configured maximum.
            actual_delay = min(self.attempts * self.delay, self.max_delay)
            if self.watch is not None:
                leftover = self.watch.leftover()
                if leftover is not None and leftover < actual_delay:
                    # Never sleep past the time the watch has left.
                    actual_delay = leftover
            # Clamp last so neither a negative configured delay nor a
            # negative leftover reaches the sleep function (a sleep
            # function such as time.sleep rejects negative values).
            self.sleep_func(max(0.0, actual_delay))