Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_stalled_mongo_db_cache():
@cachier(mongetter=_test_mongetter)
def _stalled_func():
return 1
core = _MongoCore(_test_mongetter, None, False)
core.set_func(_stalled_func)
core.clear_cache()
with pytest.raises(RecalculationNeeded):
core.wait_on_entry_calc(key=None)
pickle_reload (optional) : bool
If set to True, in-memory cache will be reloaded on each cache read,
enabling different threads to share cache. Should be set to False for
faster reads in single-thread programs. Defaults to True.
mongetter (optional) : callable
A callable that takes no arguments and returns a pymongo.Collection
object with writing permissions. If unset a local pickle cache is used
instead.
"""
# print('Inside the wrapper maker')
# print('mongetter={}'.format(mongetter))
# print('stale_after={}'.format(stale_after))
# print('next_time={}'.format(next_time))
if mongetter:
core = _MongoCore(mongetter, stale_after, next_time)
else:
core = _PickleCore( # pylint: disable=R0204
stale_after, next_time, pickle_reload)
def _cachier_decorator(func):
core.set_func(func)
@wraps(func)
def func_wrapper(*args, **kwds): # pylint: disable=C0111,R0911
# print('Inside general wrapper for {}.'.format(func.__name__))
ignore_cache = kwds.pop('ignore_cache', False)
overwrite_cache = kwds.pop('overwrite_cache', False)
verbose_cache = kwds.pop('verbose_cache', False)
_print = lambda x: None
if verbose_cache:
_print = print
def get_entry_by_key(self, key):
res = self.mongo_collection.find_one({
'func': _MongoCore._get_func_str(self.func),
'key': key
})
if res:
try:
entry = {
'value': pickle.loads(res['value']),
'time': res.get('time', None),
'stale': res.get('stale', False),
'being_calculated': res.get('being_calculated', False)
}
except KeyError:
entry = {
'value': None,
'time': res.get('time', None),
'stale': res.get('stale', False),
'being_calculated': res.get('being_calculated', False)