Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_retry_any(self):
    """Check that ``retry_any`` fires when either wrapped strategy does.

    The combined strategy is built from two ``retry_if_result`` predicates
    (result equals 1, result equals 2) and is evaluated against futures
    created with ``tenacity.Future.construct``.
    """
    either_one_or_two = tenacity.retry_any(
        tenacity.retry_if_result(lambda value: value == 1),
        tenacity.retry_if_result(lambda value: value == 2))

    # Results matched by one of the two predicates must trigger a retry.
    for matching in (1, 2):
        self.assertTrue(
            either_one_or_two(tenacity.Future.construct(1, matching, False)))

    # A result matched by neither predicate must not trigger a retry.
    self.assertFalse(
        either_one_or_two(tenacity.Future.construct(1, 3, False)))
    # NOTE(review): the last argument presumably marks the value as an
    # exception rather than a result -- confirm against tenacity.Future.
    self.assertFalse(
        either_one_or_two(tenacity.Future.construct(1, 1, True)))
def wrapped(*args, **kwargs):
    """Call ``fn`` under a retry policy built from the instance's timeout.

    The first positional argument is taken to be the instance; its
    ``ovsdb_timeout`` attribute bounds the total retry time. ``fn`` is
    retried for as long as ``_ovsdb_result_pending`` accepts its result,
    with exponential waits (multiplier 0.02, capped at 1).

    ``fn`` and ``_ovsdb_result_pending`` come from outside this function.
    """
    instance = args[0]
    # The policy is built per call because the timeout is read from the
    # instance at call time.
    apply_retry = tenacity.retry(
        reraise=True,
        retry=tenacity.retry_if_result(_ovsdb_result_pending),
        wait=tenacity.wait_exponential(multiplier=0.02, max=1),
        stop=tenacity.stop_after_delay(instance.ovsdb_timeout))
    retrying_fn = apply_retry(fn)
    return retrying_fn(*args, **kwargs)
return wrapped
return result is False
def check_init_container_stopped():
    """Report whether the container has reached the stopped state.

    Reads the status through ``self.show(context, container)``.

    Returns:
        True when the status equals ``consts.STOPPED``, False when it
        equals ``consts.RUNNING``.

    Raises:
        exception.ZunException: for any other status.
    """
    current = self.show(context, container).status
    if current == consts.STOPPED:
        return True
    if current == consts.RUNNING:
        return False
    # Neither stopped nor running: report the unexpected status.
    raise exception.ZunException(
        _("Container has unexpected status: %s") % current)
# Poll check_init_container_stopped with exponential waits, retrying for as
# long as retry_if_result_is_false (defined outside this fragment) accepts
# the result, and giving up once `timeout` seconds have elapsed.
r = tenacity.Retrying(
    stop=tenacity.stop_after_delay(timeout),
    wait=tenacity.wait_exponential(),
    retry=tenacity.retry_if_result(retry_if_result_is_false))
# Invoke the Retrying object directly: Retrying.call() is deprecated and
# absent from newer tenacity releases, while __call__ is the supported entry
# point.
r(check_init_container_stopped)
def _put_object_safe(self, Bucket, Key, Body):
    """Upload an object and optionally wait until ``head_object`` sees it.

    The object is written with ``self.s3.put_object``. When
    ``self._consistency_stop`` is truthy, ``self.s3.head_object`` is then
    polled (passing the uploaded ETag as ``IfMatch``) and retried for as
    long as the returned ETag differs from the uploaded one, using
    ``self._consistency_wait`` / ``self._consistency_stop`` as the wait and
    stop policies.

    Args:
        Bucket: bucket argument forwarded to the S3 client calls.
        Key: key argument forwarded to the S3 client calls.
        Body: body argument forwarded to ``put_object``.
    """
    uploaded = self.s3.put_object(Bucket=Bucket, Key=Key, Body=Body)
    if not self._consistency_stop:
        # No stop policy configured: skip the consistency check entirely.
        return

    def _etag_differs(head):
        return head['ETag'] != uploaded['ETag']

    def _head():
        return self.s3.head_object(
            Bucket=Bucket, Key=Key, IfMatch=uploaded['ETag'])

    poller = tenacity.Retrying(
        retry=tenacity.retry_if_result(_etag_differs),
        wait=self._consistency_wait,
        stop=self._consistency_stop)
    poller(_head)
retry=tenacity.retry_if_result(lambda x: x is False),
wait=tenacity.wait_fixed(retry_interval),
retry_error_callback=_return_last_value,
stop=tenacity.stop_after_attempt(max_unhealthy_retry)
)
def _poll_url_with_retry(url):
    """Poll ``url`` through ``self._poll_url`` for the enclosing ``node``.

    ``self`` and ``node`` are captured from the surrounding scope; the
    result of ``self._poll_url`` is returned unchanged.
    """
    outcome = self._poll_url(url, node)
    return outcome
def wait_till_jobs_complete(self, node, timeout=3600):
    """Poll the node's "Running" job pods until none are reported.

    ``self._get_job_pods_in_node(name, "Running")`` is called every 5
    seconds for as long as the ``'items'`` entry of its result is
    non-empty, until ``timeout`` seconds have elapsed.

    Args:
        node: mapping whose ``['metadata']['name']`` identifies the node;
            missing keys yield a name of ``None``.
        timeout: number of seconds after which polling stops.
    """
    metadata = node.get('metadata', {})
    name = metadata.get('name')

    def _pods_remaining(result):
        # Keep retrying while the query still lists at least one item.
        return len(result.get('items', [])) != 0

    retryer = tenacity.Retrying(
        stop=tenacity.stop_after_delay(timeout),
        retry=tenacity.retry_if_result(_pods_remaining),
        wait=tenacity.wait_fixed(5))
    retryer(self._get_job_pods_in_node, name, "Running")
@tenacity.retry(
    retry=tenacity.retry_if_result(is_empty),
    wait=tenacity.wait_fixed(wait_timeout),
    stop=tenacity.stop_after_delay(check_timeout))
def _get_hosts_retry(target, listener_type):
    """Call ``method(target, listener_type)`` under a fixed-wait retry.

    The call is repeated every ``wait_timeout`` for as long as ``is_empty``
    accepts the result, until ``check_timeout`` has elapsed. ``method``,
    ``is_empty`` and both timeouts come from the enclosing scope.
    """
    hosts = method(target, listener_type)
    return hosts
return result is False
def check_init_container_stopped():
    """Report whether the container has reached the stopped state.

    Reads the status through ``self._show_container(context, container)``.

    Returns:
        True when the status equals ``consts.STOPPED``, False when it
        equals ``consts.RUNNING``.

    Raises:
        exception.ZunException: for any other status.
    """
    observed = self._show_container(context, container).status
    if observed == consts.STOPPED:
        return True
    if observed == consts.RUNNING:
        return False
    # Any other state is unexpected and is surfaced to the caller.
    raise exception.ZunException(
        _("Container has unexpected status: %s") % observed)
# Poll check_init_container_stopped with exponential waits, retrying for as
# long as retry_if_result_is_false (defined outside this fragment) accepts
# the result, and giving up once `timeout` seconds have elapsed.
r = tenacity.Retrying(
    stop=tenacity.stop_after_delay(timeout),
    wait=tenacity.wait_exponential(),
    retry=tenacity.retry_if_result(retry_if_result_is_false))
# Invoke the Retrying object directly: Retrying.call() is deprecated and
# absent from newer tenacity releases, while __call__ is the supported entry
# point.
r(check_init_container_stopped)
tenacity.retry_if_result(_is_provisioning_status_pending_update) |
tenacity.retry_if_exception_type()),
wait=tenacity.wait_incrementing(
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def _get_db_obj_until_pending_update(self, repo, id):
    """Fetch the object with the given ``id`` from ``repo``.

    A session is obtained from ``db_apis.get_session()`` and passed to
    ``repo.get`` together with ``id``; the lookup result is returned as-is.

    NOTE(review): the retry behaviour implied by the name comes from the
    decorator applied to this method -- confirm its conditions there.
    """
    session = db_apis.get_session()
    return repo.get(session, id=id)