How to use the tenacity.Retrying class in tenacity

To help you get started, we’ve selected a few tenacity examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github cosmicpython / code / tests / e2e / test_external_events.py View on Github external
api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-03')
    r = api_client.post_to_allocate(orderid, sku, 10)
    assert r.ok
    response = api_client.get_allocation(orderid)
    assert response.json()[0]['batchref'] == earlier_batch

    subscription = redis_client.subscribe_to('line_allocated')

    # change quantity on allocated batch so it's less than our order
    redis_client.publish_message('change_batch_quantity', {
        'batchref': earlier_batch, 'qty': 5
    })

    # wait until we see a message saying the order has been reallocated
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
                print(messages)
            data = json.loads(messages[-1]['data'])
            assert data['orderid'] == orderid
            assert data['batchref'] == later_batch
github jd / tenacity / test_tenacity.py View on Github external
def test_stop_after_attempt(self):
        # A stop_after_attempt(3) policy must not stop on attempt 2 but must
        # stop on attempts 3 and 4.
        retryer = Retrying(stop=tenacity.stop_after_attempt(3))
        # Second positional argument is passed through unchanged for every
        # check; presumably an elapsed-delay value -- TODO confirm.
        second_arg = 6546
        self.assertFalse(retryer.stop(2, second_arg))
        for attempt_number in (3, 4):
            self.assertTrue(retryer.stop(attempt_number, second_arg))
github openstack / qinling / qinling / engine / utils.py View on Github external
def url_request(request_session, url, body=None):
    """Send request to a service url."""
    exception = None

    # Send ping request first to make sure the url works
    try:
        temp = url.split('/')
        temp[-1] = 'ping'
        ping_url = '/'.join(temp)
        r = tenacity.Retrying(
            wait=tenacity.wait_fixed(1),
            stop=tenacity.stop_after_attempt(30),
            reraise=True,
            retry=tenacity.retry_if_exception_type(IOError)
        )
        r.call(request_session.get, ping_url, timeout=(3, 3), verify=False)
    except Exception as e:
        LOG.exception(
            "Failed to request url %s, error: %s", ping_url, str(e)
        )
        return False, {'output': 'Function execution failed.'}

    for a in six.moves.xrange(10):
        res = None
        try:
            # Default execution max duration is 3min, could be configurable
github laughingman7743 / PyAthena / pyathena / util.py View on Github external
def retry_api_call(func, config, logger=None,
                   *args, **kwargs):
    """Invoke ``func(*args, **kwargs)`` under a retry policy built from ``config``.

    A raised exception triggers a retry only when the ``Error.Code`` value
    found in its ``response`` mapping is contained in ``config.exceptions``.
    The attempt limit comes from ``config.attempt`` and the exponential
    back-off from ``config.multiplier``, ``config.max_delay`` and
    ``config.exponential_base``. When ``logger`` is truthy, an ``after_log``
    hook is installed at the logger's own level. The retryer is created
    with ``reraise=True``.

    Returns whatever the retryer returns for ``func``.
    """
    def _has_retryable_code(exc):
        # A falsy exception object is never retried.
        if not exc:
            return False
        error_info = getattr(exc, 'response', {}).get('Error', {})
        return error_info.get('Code', None) in config.exceptions

    retry_policy = retry_if_exception(_has_retryable_code)
    stop_policy = stop_after_attempt(config.attempt)
    wait_policy = wait_exponential(multiplier=config.multiplier,
                                   max=config.max_delay,
                                   exp_base=config.exponential_base)
    if logger:
        after_hook = after_log(logger, logger.level)
    else:
        after_hook = None

    retryer = tenacity.Retrying(
        retry=retry_policy,
        stop=stop_policy,
        wait=wait_policy,
        after=after_hook,
        reraise=True
    )
    return retryer(func, *args, **kwargs)
github laughingman7743 / PyAthena / pyathena / sqlalchemy_athena.py View on Github external
query = """
                SELECT
                  table_schema,
                  table_name,
                  column_name,
                  data_type,
                  is_nullable,
                  column_default,
                  ordinal_position,
                  comment
                FROM information_schema.columns
                WHERE table_schema = '{schema}'
                AND table_name = '{table}'
                """.format(schema=schema, table=table_name)
        retry_config = raw_connection.retry_config
        retry = tenacity.Retrying(
            retry=retry_if_exception(
                lambda exc: self._retry_if_data_catalog_exception(exc, schema, table_name)),
            stop=stop_after_attempt(retry_config.attempt),
            wait=wait_exponential(multiplier=retry_config.multiplier,
                                  max=retry_config.max_delay,
                                  exp_base=retry_config.exponential_base),
            reraise=True)
        try:
            return [
                {
                    'name': row.column_name,
                    'type': _TYPE_MAPPINGS.get(self._get_column_type(row.data_type), NULLTYPE),
                    'nullable': True if row.is_nullable == 'YES' else False,
                    'default': row.column_default if not self._is_nan(row.column_default) else None,
                    'ordinal_position': row.ordinal_position,
                    'comment': row.comment,
github gnocchixyz / gnocchi / gnocchi / storage / s3.py View on Github external
def _put_object_safe(self, Bucket, Key, Body):
        """Upload an object, then optionally poll until its ETag is visible.

        The object is written with ``self.s3.put_object``. When
        ``self._consistency_stop`` is truthy, ``self.s3.head_object`` is
        called repeatedly (with ``IfMatch`` set to the ETag returned by the
        put) for as long as the ETag it reports differs from the put's ETag,
        paced by ``self._consistency_wait`` and bounded by
        ``self._consistency_stop``.
        """
        put_response = self.s3.put_object(Bucket=Bucket, Key=Key, Body=Body)

        # No stop policy configured: skip the consistency check entirely.
        if not self._consistency_stop:
            return

        def _fetch_head():
            return self.s3.head_object(
                Bucket=Bucket, Key=Key, IfMatch=put_response['ETag'])

        def _etag_differs(head_response):
            # Keep retrying while the HEAD result has a different ETag.
            return head_response['ETag'] != put_response['ETag']

        retryer = tenacity.Retrying(
            retry=tenacity.retry_if_result(_etag_differs),
            wait=self._consistency_wait,
            stop=self._consistency_stop)
        retryer(_fetch_head)
github opsgenie / opsgenie-python-sdk / opsgenie_sdk / api_client.py View on Github external
def __init__(self, configuration=None, header_name=None, header_value=None,
                 cookie=None, pool_threads=1):
        """Set up the client's configuration, retry policy, REST client and metrics.

        :param configuration: client configuration; a fresh ``Configuration()``
            is created when ``None`` is passed.
        :param header_name: name of a default header to register; nothing is
            registered when it is ``None``.
        :param header_value: value stored for ``header_name``.
        :param cookie: stored unchanged on ``self.cookie``.
        :param pool_threads: stored unchanged on ``self.pool_threads``.
        """
        if configuration is None:
            configuration = Configuration()
        self.configuration = configuration
        self.pool_threads = pool_threads

        # Shared retry policy: stopping is delegated to self.should_retry_stop,
        # waits are random-exponential (multiplier = configuration.back_off,
        # capped at configuration.retry_delay), and only RetryableException or
        # HTTPError trigger a retry.
        self.retrying = tenacity.Retrying(stop=self.should_retry_stop,
                                          wait=tenacity.wait_random_exponential(multiplier=configuration.back_off,
                                                                                max=configuration.retry_delay),
                                          retry=(tenacity.retry_if_exception_type(RetryableException) |
                                                 (tenacity.retry_if_exception_type(HTTPError))))

        # The REST client receives the retry policy built above, so it must be
        # created after self.retrying.
        self.rest_client = rest.RESTClientObject(configuration, retrying=self.retrying)
        self.default_headers = {}
        if header_name is not None:
            self.default_headers[header_name] = header_value
        self.cookie = cookie
        # Set default User-Agent.
        self.user_agent = 'opsgenie-sdk-python-2.0.3'

        # Metric publishers: the HTTP one is taken from the REST client's
        # http_metric attribute, the API one is created here.
        self.http_metric_publisher = self.rest_client.http_metric
        self.api_metric_publisher = metrics.ApiMetric('ApiMetricPublisher')
github willianantunes / django-graphql-playground / django_graphql_playground / apps / pubsub / services / producer.py View on Github external
def _retry_send(self, function: Callable, attempt=10, *args, **kwargs):
        """Call ``function(*args, **kwargs)`` through a tenacity retryer.

        The retryer stops after ``attempt`` attempts, waits a fixed 3 plus a
        random 0-2 between tries, and is created with ``reraise=True``. When
        the module-level ``logger`` is truthy, an ``after_log`` hook is
        installed at the logger's own level.

        Returns whatever the retryer returns for ``function``.
        """
        stop_policy = tenacity.stop_after_attempt(attempt)
        wait_policy = tenacity.wait_fixed(3) + tenacity.wait_random(0, 2)
        if logger:
            after_hook = tenacity.after_log(logger, logger.level)
        else:
            after_hook = None

        retryer = tenacity.Retrying(
            stop=stop_policy,
            wait=wait_policy,
            after=after_hook,
            reraise=True,
        )
        return retryer(function, *args, **kwargs)