Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ctx.failed = True
for _ in range(10):
await asyncio.sleep(0.05)
with pytest.raises(RuntimeError):
circuit_breaker.call(ctx)
for _ in range(10):
try:
circuit_breaker.call(ctx)
except (RuntimeError, CircuitBroken):
pass
assert circuit_breaker.state == States.BROKEN
self._recovery_at = current_time
return
# Do not compute when not enough statistic
if (
self._state is CircuitBreakerStates.PASSING
and len(self._statistic) < self.BUCKET_COUNT
):
return
recovery_ratio = self.recovery_ratio
if self._state is CircuitBreakerStates.PASSING:
if recovery_ratio >= self.PASSING_BROKEN_THRESHOLD:
self._stuck_until = current_time + self._broken_time
self._state = CircuitBreakerStates.BROKEN
self._statistic.clear()
return
if self._state is CircuitBreakerStates.RECOVERING:
if recovery_ratio >= self.RECOVER_BROKEN_THRESHOLD:
self._stuck_until = current_time + self._broken_time
self._state = CircuitBreakerStates.BROKEN
self._statistic.clear()
return
recovery_length = current_time - self._recovery_at
if recovery_length >= self._recovery_time:
self._stuck_until = current_time + self._passing_time
self._state = CircuitBreakerStates.PASSING
return
def _compute_state(self):
current_time = time.monotonic()
if current_time < self._stuck_until:
# Skip state changing until
return
if self._state is CircuitBreakerStates.BROKEN:
self._state = CircuitBreakerStates.RECOVERING
self._recovery_at = current_time
return
# Do not compute when not enough statistic
if (
self._state is CircuitBreakerStates.PASSING
and len(self._statistic) < self.BUCKET_COUNT
):
return
recovery_ratio = self.recovery_ratio
if self._state is CircuitBreakerStates.PASSING:
if recovery_ratio >= self.PASSING_BROKEN_THRESHOLD:
self._stuck_until = current_time + self._broken_time