How to use the diskcache.core.Disk class in diskcache

To help you get started, we’ve selected a few diskcache examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github grantjenks / python-diskcache / diskcache / ordered.py View on Github external
    def __init__(self, directory, timeout=60, disk=Disk(),
                 **settings):
        """Initialize OrderedCache instance.

        All arguments, including the keyword settings, are handed to the
        parent class initializer.

        Note that the default `disk` value is created once, when this method
        is defined, so every instance built without an explicit `disk` shares
        that same `Disk` object.

        :param str directory: cache directory
        :param float timeout: SQLite connection timeout
        :param disk: `Disk`: instance for serialization
        :param settings: any of `DEFAULT_SETTINGS`

        """
        # Name the class explicitly: `super(self.__class__, self)` looks up
        # the runtime type, so a subclass inheriting this method would call
        # straight back into it and recurse until the stack overflows.
        # The keyword settings are forwarded as well; previously they were
        # accepted here but silently discarded.
        super(OrderedCache, self).__init__(
            directory, timeout, disk, **settings
        )
github grantjenks / python-diskcache / diskcache / core.py View on Github external
def __init__(self, directory=None, timeout=60, disk=Disk, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param float timeout: SQLite connection timeout
        :param disk: Disk type or subclass for serialization
        :param settings: any of DEFAULT_SETTINGS

        """
        try:
            assert issubclass(disk, Disk)
        except (TypeError, AssertionError):
            raise ValueError('disk must subclass diskcache.Disk')

        if directory is None:
            directory = tempfile.mkdtemp(prefix='diskcache-')
        directory = op.expanduser(directory)
        directory = op.expandvars(directory)

        self._directory = directory
        self._timeout = 0  # Manually handle retries during initialization.
        self._local = threading.local()
        self._txn_id = None

        if not op.isdir(directory):
            try:
                os.makedirs(directory, 0o755)
github grantjenks / python-diskcache / diskcache / core.py View on Github external
    def __init__(self, directory=None, timeout=60, disk=Disk, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param float timeout: SQLite connection timeout
        :param disk: Disk type or subclass for serialization
        :param settings: any of DEFAULT_SETTINGS

        """
        try:
            assert issubclass(disk, Disk)
        except (TypeError, AssertionError):
            raise ValueError('disk must subclass diskcache.Disk')

        if directory is None:
            directory = tempfile.mkdtemp(prefix='diskcache-')
        directory = op.expanduser(directory)
github grantjenks / python-diskcache / diskcache / core.py View on Github external
"""
        full_path = op.join(self._directory, filename)

        try:
            os.remove(full_path)
        except WindowsError:
            pass
        except OSError as error:
            if error.errno != errno.ENOENT:
                # ENOENT may occur if two caches attempt to delete the same
                # file at the same time.
                raise


class JSONDisk(Disk):
    "Cache key and value using JSON serialization with zlib compression."
    def __init__(self, directory, compress_level=1, **kwargs):
        """Create a JSON disk whose keys and values are zlib-compressed.

        `compress_level` is the zlib setting, an integer between 0 and 9:
        0 turns compression off, 1 is the quickest and compresses the least,
        and 9 is the slowest and compresses the most.

        :param str directory: directory path
        :param int compress_level: zlib compression level (default 1)
        :param kwargs: super class arguments

        """
        # Store the level first, then let the base class finish the setup
        # with the directory and any remaining keyword arguments.
        self.compress_level = compress_level
        super(JSONDisk, self).__init__(directory, **kwargs)