Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@cachier(next_time=False)
def _takes_5_seconds(arg_1, arg_2):
"""Some function."""
sleep(5)
return 'arg_1:{}, arg_2:{}'.format(arg_1, arg_2)
@cachier(mongetter=_bad_mongetter)
def _func_w_bad_mongo(arg_1, arg_2):
"""Some function."""
return random() + arg_1 + arg_2
@cachier(stale_after=timedelta(seconds=1), next_time=True)
def _error_throwing_func(arg1):
if not hasattr(_error_throwing_func, 'count'):
_error_throwing_func.count = 0
_error_throwing_func.count += 1
if _error_throwing_func.count > 1:
raise ValueError("Tiny Rick!")
return 7
@cachier(stale_after=DELTA, next_time=True)
def _stale_after_next_time(arg_1, arg_2):
"""Some function."""
return random()
@cachier(stale_after=DELTA, next_time=False)
def _stale_after_seconds(arg_1, arg_2):
"""Some function."""
return random()
@cachier(stale_after=timedelta(seconds=1), next_time=True)
def _being_calc_next_time(arg_1, arg_2):
"""Some function."""
sleep(1)
return random() + arg_1 + arg_2
def test_stalled_mongo_db_cache():
@cachier(mongetter=_test_mongetter)
def _stalled_func():
return 1
core = _MongoCore(_test_mongetter, None, False)
core.set_func(_stalled_func)
core.clear_cache()
with pytest.raises(RecalculationNeeded):
core.wait_on_entry_calc(key=None)
def test_mongo_index_creation():
"""Basic Mongo core functionality."""
collection = _test_mongetter()
_test_mongo_caching.clear_cache()
val1 = _test_mongo_caching(1, 2)
val2 = _test_mongo_caching(1, 2)
assert val1 == val2
assert _MongoCore._INDEX_NAME in collection.index_information()
res1 = res_queue.get()
assert isinstance(res1, float)
res2 = res_queue.get()
assert (res2 is None) or isinstance(res2, KeyError)
@cachier()
def _delete_cache(arg_1, arg_2):
"""Some function."""
sleep(1)
return random() + arg_1 + arg_2
# _DEL_CACHE_FNAME = '.__main__._delete_cache'
_DEL_CACHE_FNAME = '.tests.test_pickle_core._delete_cache'
_DEL_CACHE_FPATH = os.path.join(EXPANDED_CACHIER_DIR, _DEL_CACHE_FNAME)
def _calls_delete_cache(res_queue, del_cache):
try:
# print('in')
res = _delete_cache(0.13, 0.02)
# print('out with {}'.format(res))
if del_cache:
# print('deleteing!')
os.remove(_DEL_CACHE_FPATH)
# print(os.path.isfile(_DEL_CACHE_FPATH))
res_queue.put(res)
except Exception as exc:
# print('found')
res_queue.put(exc)
for sleeptime in sleeptimes:
if _helper_bad_cache_file(sleeptime):
return
assert False
@cachier()
def _delete_cache(arg_1, arg_2):
"""Some function."""
sleep(1)
return random() + arg_1 + arg_2
# _DEL_CACHE_FNAME = '.__main__._delete_cache'
_DEL_CACHE_FNAME = '.tests.test_pickle_core._delete_cache'
_DEL_CACHE_FPATH = os.path.join(EXPANDED_CACHIER_DIR, _DEL_CACHE_FNAME)
def _calls_delete_cache(res_queue, del_cache):
try:
# print('in')
res = _delete_cache(0.13, 0.02)
# print('out with {}'.format(res))
if del_cache:
# print('deleteing!')
os.remove(_DEL_CACHE_FPATH)
# print(os.path.isfile(_DEL_CACHE_FPATH))
res_queue.put(res)
except Exception as exc:
# print('found')
res_queue.put(exc)