How to use the cachetools.ttl_cache function in cachetools

To help you get started, we’ve selected a few cachetools examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github zalando / nakadi / nakadi / security.py View on Github external
@ttl_cache(config.TOKEN_CACHE_SIZE, config.TOKEN_CACHE_TTL)
def get_token_info(token):

    retry_attempts = 0
    while retry_attempts < config.TOKEN_INFO_RETRY_LIMIT:

        try:
            response = requests.get(config.TOKEN_INFO_URL, params={'access_token': token}, timeout=config.TOKEN_INFO_TIMEOUT_S)
            return response

        except RequestException as e:
            logging.info('[#OUATH_RETRY] Error while calling token_info endpoint: %s', e)
            retry_attempts += 1
            time.sleep(config.TOKEN_INFO_RETRY_WAIT_S)

    logging.error('[#OUATH_FAIL] Failed to do OAUTH check after %s retries', retry_attempts)
    raise FailedTokenInfoEndpoint()
github gnocchixyz / gnocchi / gnocchi / ceilometer / alarm_api.py View on Github external
    @cachetools.ttl_cache(maxsize=1, ttl=600)
    def _get_aggregation_methods():
        ks_client = utils.get_keystone_client()
        gnocchi_url = cfg.CONF.alarm_gnocchi.url
        headers = {'Content-Type': "application/json",
                   'X-Auth-Token': ks_client.auth_token}
        try:
            r = requests.get("%s/v1/capabilities" % gnocchi_url,
                             headers=headers)
        except requests.ConnectionError as e:
            raise GnocchiUnavailable(e)
        if r.status_code / 200 != 1:
            raise GnocchiUnavailable(r.text)

        return jsonutils.loads(r.text).get('aggregation_methods', [])
github nocproject / noc / pm / db / base.py View on Github external
    @cachetools.ttl_cache(maxsize=50, ttl=60)
    def get_partition_by_name(self, name):
        logger.debug("Opening partition %s", name)
        return self.kvcls(self, name)
github nattofriends / moffle / log_path.py View on Github external
    @cachetools.ttl_cache(maxsize=128, ttl=21600)
    def _channels_list(self, network):
        network_base = self.network_to_path(network)

        if not os.path.exists(network_base):
            return None

        files = os.listdir(network_base)
        files = [{'channel': channel} for channel in files]

        return files