Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_constant():
import diskcache.core
assert repr(diskcache.core.ENOVAL) == 'ENOVAL'
def test_first_last(cache):
assert cache.first() == ENOVAL
assert cache.last() == ENOVAL
cache.check()
assert cache.first(default=1) == 1
assert cache.last(default=1) == 1
cache.check()
start = 0
end = 100
for value in range(start, end + 1):
cache.set(value, value)
cache.check()
def wrapper(*args, **kwargs):
"Wrapper function to cache function result."
key = (args, kwargs)
try:
result, expire_time, delta = cache.get(
key, default=ENOVAL, expire_time=True, tag=True
)
if result is ENOVAL:
raise KeyError
now = time.time()
ttl = expire_time - now
if (-delta * math.log(random.random())) < ttl:
return result
except KeyError:
pass
now = time.time()
result = func(*args, **kwargs)
def wrapper(*args, **kwargs):
"Wrapper for callable to cache arguments and return values."
key = wrapper.__cache_key__(*args, **kwargs)
pair, expire_time = cache.get(
key, default=ENOVAL, expire_time=True, retry=True,
)
if pair is not ENOVAL:
result, delta = pair
now = time.time()
ttl = expire_time - now
if (-delta * beta * math.log(random.random())) < ttl:
return result # Cache hit.
# Check whether a thread has started for early recomputation.
thread_key = key + (ENOVAL,)
thread_added = cache.add(
thread_key, None, expire=delta, retry=True,
)
def read(self, key):
"""Return file handle corresponding to `key` from cache.
:param key: key for item
:return: file open for reading in binary mode
:raises KeyError: if key is not found
"""
handle = self.get(key, default=ENOVAL, read=True, retry=True)
if handle is ENOVAL:
raise KeyError(key)
return handle
If deque is empty then raise IndexError.
>>> deque = Deque()
>>> deque.peekleft()
Traceback (most recent call last):
...
IndexError: peek from an empty deque
>>> deque += 'abc'
>>> deque.peekleft()
'a'
:return: value at front of deque
:raises IndexError: if deque is empty
"""
default = None, ENOVAL
_, value = self._cache.peek(default=default, side='front', retry=True)
if value is ENOVAL:
raise IndexError('peek from an empty deque')
return value
def reset(self, key, value=ENOVAL):
"""Reset `key` and `value` item from Settings table.
If `value` is not given, it is reloaded from the Settings
table. Otherwise, the Settings table is updated.
Settings attributes on cache objects are lazy-loaded and
read-only. Use `reset` to update the value.
Settings with the ``sqlite_`` prefix correspond to SQLite
pragmas. Updating the value will execute the corresponding PRAGMA
statement.
:param str key: Settings key for item
:param value: value for item (optional)
:return: updated value for item
def wrapper(*args, **kwargs):
"Wrapper function to cache function result."
key = (args, kwargs)
try:
result, expire_time, delta = cache.get(
key, default=ENOVAL, expire_time=True, tag=True
)
if result is ENOVAL:
raise KeyError
now = time.time()
ttl = expire_time - now
if (-delta * math.log(random.random())) < ttl:
return result
except KeyError:
pass
now = time.time()
result = func(*args, **kwargs)
delta = time.time() - now
cache.set(key, result, expire=expire, tag=delta)
def wrapper(*args, **kwargs):
"Wrapper for callable to cache arguments and return values."
key = wrapper.__cache_key__(*args, **kwargs)
result = self.get(key, ENOVAL, version, retry=True)
if result is ENOVAL:
result = func(*args, **kwargs)
valid_timeout = (
timeout is None
or timeout == DEFAULT_TIMEOUT
or timeout > 0
)
if valid_timeout:
self.set(
key, result, timeout, version, tag=tag, retry=True,
)
return result
key = wrapper.__cache_key__(*args, **kwargs)
pair, expire_time = cache.get(
key, default=ENOVAL, expire_time=True, retry=True,
)
if pair is not ENOVAL:
result, delta = pair
now = time.time()
ttl = expire_time - now
if (-delta * beta * math.log(random.random())) < ttl:
return result # Cache hit.
# Check whether a thread has started for early recomputation.
thread_key = key + (ENOVAL,)
thread_added = cache.add(
thread_key, None, expire=delta, retry=True,
)
if thread_added:
# Start thread for early recomputation.
def recompute():
with cache:
pair = timer(*args, **kwargs)
cache.set(
key, pair, expire=expire, tag=tag, retry=True,
)
thread = threading.Thread(target=recompute)
thread.daemon = True
thread.start()