Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_attempts_and_backoff(self, sleep):
policy = RetryPolicy.new(backoff=0.2, attempts=3)
retries = iter(policy)
next(retries)
next(retries)
sleep.assert_called_with(0.2)
next(retries)
sleep.assert_called_with(0.4)
with self.assertRaises(StopIteration):
next(retries)
def test_budget_overrides_backoff(self, sleep, time):
policy = RetryPolicy.new(backoff=0.1, budget=1)
time.return_value = 0
retries = iter(policy)
time_remaining = next(retries)
self.assertAlmostEqual(time_remaining, 1)
self.assertEqual(sleep.call_count, 0)
time.return_value = 0.5
time_remaining = next(retries)
self.assertAlmostEqual(time_remaining, 0.4)
sleep.assert_called_with(0.1)
time.return_value = 0.9
time_remaining = next(retries)
self.assertAlmostEqual(time_remaining, 0)
self.assertAlmostEqual(sleep.call_args[0][0], 0.1, places=2)
def get_batch(self, max_items: int, timeout: Optional[float]) -> Sequence[Message]:
"""Return a batch of messages.
:param max_items: The maximum batch size.
:param timeout: The maximum time to wait in seconds, or ``None``
for no timeout.
"""
if timeout == 0:
block = False
else:
block = True
batch = []
retry_policy = RetryPolicy.new(attempts=max_items, budget=timeout)
for time_remaining in retry_policy:
item = self.work_queue.get(block=block, timeout=time_remaining)
if item is None:
break
batch.append(item)
return batch
if binary and newline is not None:
raise TypeError("'newline' is not supported in binary mode.")
self._path = path
self._parser = parser
self._mtime = 0.0
self._data: Union[T, Type[_NOT_LOADED]] = _NOT_LOADED
self._open_options = _OpenOptions(
mode="rb" if binary else "r", encoding=encoding, newline=newline
)
backoff = backoff or DEFAULT_FILEWATCHER_BACKOFF
if timeout is not None:
last_error = None
for _ in RetryPolicy.new(budget=timeout, backoff=backoff):
if self._data is not _NOT_LOADED:
break
try:
self.get_data()
except WatchedFileNotAvailableError as exc:
last_error = exc
else:
break
logging.warning("%s: file not yet available. sleeping.", path)
else:
last_error = typing.cast(WatchedFileNotAvailableError, last_error)
raise WatchedFileNotAvailableError(
self._path, f"timed out. last error was: {last_error.inner}"
)