Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_timeout(self, mock_retry_factory):
with tempfile.NamedTemporaryFile() as watched_file:
watched_file.write(b"!")
watched_file.flush()
os.utime(watched_file.name, (1, 1))
mock_retry_policy = mock.MagicMock(spec=RetryPolicy)
mock_retry_policy.__iter__ = mock.Mock(return_value=iter([3, 2, 1]))
mock_retry_factory.return_value = mock_retry_policy
with self.assertRaises(file_watcher.WatchedFileNotAvailableError):
file_watcher.FileWatcher(watched_file.name, parser=json.load, timeout=3)
def put(self, message: bytes, timeout: Optional[float] = None) -> None:
"""Add a message to the queue.
:param timeout: If the queue is full, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was full for the allowed
duration of the call.
"""
for time_remaining in RetryPolicy.new(budget=timeout):
try:
return self.queue.send(message=message)
except posix_ipc.SignalError: # pragma: nocover
continue # interrupted, just try again
except posix_ipc.BusyError:
select.select([], [self.queue.mqd], [], time_remaining)
raise TimedOutError
protocol_factory: TProtocolFactory = THeaderProtocol.THeaderProtocolFactory(),
):
if max_connection_attempts and max_retries:
raise Exception("do not mix max_retries and max_connection_attempts")
if max_retries:
warn_deprecated(
"ThriftConnectionPool's max_retries is now named max_connection_attempts"
)
max_connection_attempts = max_retries
elif not max_connection_attempts:
max_connection_attempts = 3
self.endpoint = endpoint
self.max_age = max_age
self.retry_policy = RetryPolicy.new(attempts=max_connection_attempts)
self.timeout = timeout
self.protocol_factory = protocol_factory
self.size = size
self.pool: ProtocolPool = queue.LifoQueue()
for _ in range(size):
self.pool.put(None)
"""
policy: RetryPolicy = IndefiniteRetryPolicy()
if attempts is not None:
policy = MaximumAttemptsRetryPolicy(policy, attempts)
if budget is not None:
policy = TimeBudgetRetryPolicy(policy, budget)
if backoff is not None:
policy = ExponentialBackoffRetryPolicy(policy, backoff)
return policy
class IndefiniteRetryPolicy(RetryPolicy): # pragma: noqa
"""Retry immediately forever."""
def yield_attempts(self) -> Iterator[Optional[float]]:
while True:
yield None
class MaximumAttemptsRetryPolicy(RetryPolicy):
"""Constrain the total number of attempts."""
def __init__(self, policy: RetryPolicy, attempts: int):
self.subpolicy = policy
self.attempts = attempts
def yield_attempts(self) -> Iterator[Optional[float]]:
for i, remaining in enumerate(self.subpolicy):
def get(self, timeout: Optional[float] = None) -> bytes:
"""Read a message from the queue.
:param timeout: If the queue is empty, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was empty for the allowed
duration of the call.
"""
for time_remaining in RetryPolicy.new(budget=timeout):
try:
message, _ = self.queue.receive()
return message
except posix_ipc.SignalError: # pragma: nocover
continue # interrupted, just try again
except posix_ipc.BusyError:
select.select([self.queue.mqd], [], [], time_remaining)
raise TimedOutError
def publish(self, payload: SerializedBatch) -> None:
if not payload.item_count:
return
logger.info("sending batch of %d events", payload.item_count)
compressed_payload = gzip.compress(payload.serialized)
headers = {
"Date": email.utils.formatdate(usegmt=True),
"User-Agent": "baseplate-event-publisher/1.0",
"Content-Type": "application/json",
"X-Signature": self._sign_payload(payload.serialized),
"Content-Encoding": "gzip",
}
for _ in RetryPolicy.new(budget=MAX_RETRY_TIME, backoff=RETRY_BACKOFF):
try:
with self.metrics.timer("post"):
response = self.session.post(
self.url,
headers=headers,
data=compressed_payload,
timeout=POST_TIMEOUT,
# http://docs.python-requests.org/en/latest/user/advanced/#keep-alive
stream=False,
)
response.raise_for_status()
except requests.HTTPError as exc:
self.metrics.counter("error.http").increment()
# we should crash if it's our fault
response = getattr(exc, "response", None)
"""
assert self.started
assert not self.stopped
logger.debug("Stopping server.")
self.stopped = True
# Stop the pump first so we stop consuming messages from the message
# queue
logger.debug("Stopping pump thread.")
self.pump.stop()
# It's important to call `handler.stop()` before calling `join` on the
# handler threads, otherwise we'll be waiting for threads that have not
# been instructed to stop.
logger.debug("Stopping message handler threads.")
for handler in self.handlers:
handler.stop()
retry_policy = RetryPolicy.new(budget=self.stop_timeout.total_seconds())
logger.debug("Waiting for message handler threads to drain.")
for time_remaining, thread in zip(retry_policy, self.threads):
thread.join(timeout=time_remaining)
# Stop the healthcheck server last
logger.debug("Stopping healthcheck server.")
self.healthcheck_server.stop()
logger.debug("Server stopped.")
class MaximumAttemptsRetryPolicy(RetryPolicy):
"""Constrain the total number of attempts."""
def __init__(self, policy: RetryPolicy, attempts: int):
self.subpolicy = policy
self.attempts = attempts
def yield_attempts(self) -> Iterator[Optional[float]]:
for i, remaining in enumerate(self.subpolicy):
if i == self.attempts:
break
yield remaining
class TimeBudgetRetryPolicy(RetryPolicy):
"""Constrain attempts to an overall time budget."""
def __init__(self, policy: RetryPolicy, budget: float):
assert budget >= 0, "The time budget must not be negative."
self.subpolicy = policy
self.budget = budget
def yield_attempts(self) -> Iterator[Optional[float]]:
start_time = time.time()
yield self.budget
for _ in self.subpolicy:
elapsed = time.time() - start_time
time_remaining = self.budget - elapsed
if time_remaining <= 0:
if backoff is not None:
policy = ExponentialBackoffRetryPolicy(policy, backoff)
return policy
class IndefiniteRetryPolicy(RetryPolicy): # pragma: noqa
"""Retry immediately forever."""
def yield_attempts(self) -> Iterator[Optional[float]]:
while True:
yield None
class MaximumAttemptsRetryPolicy(RetryPolicy):
"""Constrain the total number of attempts."""
def __init__(self, policy: RetryPolicy, attempts: int):
self.subpolicy = policy
self.attempts = attempts
def yield_attempts(self) -> Iterator[Optional[float]]:
for i, remaining in enumerate(self.subpolicy):
if i == self.attempts:
break
yield remaining
class TimeBudgetRetryPolicy(RetryPolicy):
"""Constrain attempts to an overall time budget."""
self.budget = budget
def yield_attempts(self) -> Iterator[Optional[float]]:
start_time = time.time()
yield self.budget
for _ in self.subpolicy:
elapsed = time.time() - start_time
time_remaining = self.budget - elapsed
if time_remaining <= 0:
break
yield time_remaining
class ExponentialBackoffRetryPolicy(RetryPolicy):
"""Sleep exponentially longer between attempts."""
def __init__(self, policy: RetryPolicy, base: float):
self.subpolicy = policy
self.base = base
def yield_attempts(self) -> Iterator[Optional[float]]:
for attempt, time_remaining in enumerate(self.subpolicy):
if attempt > 0:
delay = self.base * 2.0 ** (attempt - 1.0)
if time_remaining:
delay = min(delay, time_remaining)
time_remaining -= delay
time.sleep(delay)