Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_stalled_mongo_db_cache():
    """Waiting on an entry of a freshly cleared Mongo cache must raise.

    A Mongo core is bound to a trivial cached function and its cache is
    cleared; ``wait_on_entry_calc`` is then expected to raise
    ``RecalculationNeeded`` instead of returning a value.
    """

    @cachier(mongetter=_test_mongetter)
    def _stalled_func():
        return 1

    mongo_core = _MongoCore(_test_mongetter, None, False)
    mongo_core.set_func(_stalled_func)
    mongo_core.clear_cache()

    with pytest.raises(RecalculationNeeded):
        mongo_core.wait_on_entry_calc(key=None)
if entry is not None: # pylint: disable=R0101
_print('Entry found.')
if entry.get('value', None) is not None:
_print('Cached result found.')
if stale_after:
now = datetime.datetime.now()
if now - entry['time'] > stale_after:
_print('But it is stale... :(')
if entry['being_calculated']:
if next_time:
_print('Returning stale.')
return entry['value'] # return stale val
_print('Already calc. Waiting on change.')
try:
return core.wait_on_entry_calc(key)
except RecalculationNeeded:
return _calc_entry(core, key, func, args, kwds)
if next_time:
_print('Async calc and return stale')
try:
core.mark_entry_being_calculated(key)
_get_executor().submit(
_function_thread, core, key, func,
args, kwds)
finally:
core.mark_entry_not_calculated(key)
return entry['value']
_print('Calling decorated function and waiting')
return _calc_entry(core, key, func, args, kwds)
_print('And it is fresh!')
return entry['value']
if entry['being_calculated']:
def wait_on_entry_calc(self, key):
    """Poll the entry for ``key`` until it is no longer being calculated.

    Each iteration first sleeps for ``MONGO_SLEEP_DURATION_IN_SEC`` seconds
    and then looks the entry up again through ``self.get_entry_by_key``.

    Returns:
        The entry's ``'value'`` once its ``'being_calculated'`` flag is
        falsy.

    Raises:
        RecalculationNeeded: if a lookup yields no entry.
    """
    lookup_key = key
    while True:
        time.sleep(MONGO_SLEEP_DURATION_IN_SEC)
        # The lookup hands back the key as well; reuse it for the next poll.
        lookup_key, record = self.get_entry_by_key(lookup_key)
        if record is None:
            raise RecalculationNeeded()
        if record['being_calculated']:
            continue
        return record['value']