How to use the cachecontrol.wrapper.CacheControl function in CacheControl

To help you get started, we’ve selected a few CacheControl examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github ionrock / cachecontrol / tests / test_adapter.py View on Github external
def use_wrapper():
    print("Using helper")
    sess = CacheControl(Session())
    return sess
github RedHatInsights / insights-core / insights / core / remote_resource.py View on Github external
def __init__(self):

        session = requests.Session()

        if not self.__class__._cache:
            if self.backend == "RedisCache":
                pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port, db=0)
                r = redis.Redis(connection_pool=pool)
                self.__class__._cache = RedisCache(r)
            elif self.backend == "FileCache":
                self.__class__._cache = FileCache(self.file_cache_path)
            else:
                self.__class__._cache = DictCache()

        session = CacheControl(session, heuristic=DefaultHeuristic(self.expire_after), cache=self.__class__._cache)

        super(CachedRemoteResource, self).__init__(session)
github common-workflow-language / common-workflow-language / v1.1.0-dev1 / salad / schema_salad / python_codegen_support.py View on Github external
fileuri = copyfrom.fileuri
        else:
            self.idx = {}

        if fetcher is None:
            import os
            import requests
            from cachecontrol.wrapper import CacheControl
            from cachecontrol.caches import FileCache
            from schema_salad.ref_resolver import DefaultFetcher
            if "HOME" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["HOME"], ".cache", "salad")))
            elif "TMP" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["TMP"], ".cache", "salad")))
            else:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache("/tmp", ".cache", "salad"))
            self.fetcher = DefaultFetcher({}, session)
        else:
            self.fetcher = fetcher

        self.fileuri = fileuri

        self.vocab = _vocab
        self.rvocab = _rvocab

        if namespaces is not None:
github common-workflow-language / cwltool / cwltool / schemas / v1.0 / salad / schema_salad / ref_resolver.py View on Github external
if skip_schemas is not None:
            self.skip_schemas = skip_schemas
        else:
            self.skip_schemas = False

        if session is None:
            if "HOME" in os.environ:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["HOME"], ".cache", "salad")))
            elif "TMP" in os.environ:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["TMP"], ".cache", "salad")))
            else:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache("/tmp", ".cache", "salad"))
        else:
            self.session = session

        if fetcher_constructor is not None:
            self.fetcher_constructor = fetcher_constructor
        else:
            self.fetcher_constructor = DefaultFetcher
        self.fetcher = self.fetcher_constructor(self.cache, self.session)

        self.fetch_text = self.fetcher.fetch_text
        self.check_exists = self.fetcher.check_exists

        self.url_fields = set()         # type: Set[Text]
        self.scoped_ref_fields = {}     # type: Dict[Text, int]
github common-workflow-language / schema_salad / schema_salad / metaschema.py View on Github external
if fetcher is None:
            import requests
            from cachecontrol.wrapper import CacheControl
            from cachecontrol.caches import FileCache
            from schema_salad.ref_resolver import DefaultFetcher

            if "HOME" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(
                        os.path.join(os.environ["HOME"], ".cache", "salad")
                    ),
                )
            elif "TMPDIR" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(
                        os.path.join(os.environ["TMPDIR"], ".cache", "salad")
                    ),
                )
            else:
                session = CacheControl(
                    requests.Session(), cache=FileCache("/tmp", ".cache", "salad")
                )
            self.fetcher = DefaultFetcher({}, session)  # type: Fetcher
        else:
            self.fetcher = fetcher

        self.vocab = _vocab
        self.rvocab = _rvocab
github common-workflow-language / common-workflow-language / v1.1.0-dev1 / salad / schema_salad / metaschema.py View on Github external
if fetcher is None:
            import os
            import requests
            from cachecontrol.wrapper import CacheControl
            from cachecontrol.caches import FileCache
            from schema_salad.ref_resolver import DefaultFetcher
            if "HOME" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["HOME"], ".cache", "salad")))
            elif "TMP" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["TMP"], ".cache", "salad")))
            else:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache("/tmp", ".cache", "salad"))
            self.fetcher = DefaultFetcher({}, session)
        else:
            self.fetcher = fetcher

        self.fileuri = fileuri

        self.vocab = _vocab
        self.rvocab = _rvocab

        if namespaces is not None:
            self.vocab = self.vocab.copy()
            self.rvocab = self.rvocab.copy()
            for k,v in six.iteritems(namespaces):
                self.vocab[k] = v
github common-workflow-language / schema_salad / schema_salad / python_codegen_support.py View on Github external
if "HOME" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(
                        os.path.join(os.environ["HOME"], ".cache", "salad")
                    ),
                )
            elif "TMPDIR" in os.environ:
                session = CacheControl(
                    requests.Session(),
                    cache=FileCache(
                        os.path.join(os.environ["TMPDIR"], ".cache", "salad")
                    ),
                )
            else:
                session = CacheControl(
                    requests.Session(), cache=FileCache("/tmp", ".cache", "salad")
                )
            self.fetcher = DefaultFetcher({}, session)  # type: Fetcher
        else:
            self.fetcher = fetcher

        self.vocab = _vocab
        self.rvocab = _rvocab

        if namespaces is not None:
            self.vocab = self.vocab.copy()
            self.rvocab = self.rvocab.copy()
            for k, v in namespaces.items():
                self.vocab[k] = v
                self.rvocab[v] = k
github common-workflow-language / schema_salad / schema_salad / ref_resolver.py View on Github external
else:
            self.foreign_properties = set()

        if cache is not None:
            self.cache = cache
        else:
            self.cache = {}

        if skip_schemas is not None:
            self.skip_schemas = skip_schemas
        else:
            self.skip_schemas = False

        if session is None:
            if "HOME" in os.environ:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(
                        os.path.join(os.environ["HOME"], ".cache", "salad")
                    ),
                )
            elif "TMP" in os.environ:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["TMP"], ".cache", "salad")),
                )
            else:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join("/tmp", ".cache", "salad")),
                )
        else: