How to use cachier - 10 common examples

To help you get started, we’ve selected a few cachier examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
@cachier(next_time=False)
def _takes_5_seconds(arg_1, arg_2):
    """Some function."""
    sleep(5)
    return 'arg_1:{}, arg_2:{}'.format(arg_1, arg_2)
github shaypal5 / cachier / tests / test_mongo_core.py View on Github external
@cachier(mongetter=_bad_mongetter)
def _func_w_bad_mongo(arg_1, arg_2):
    """Some function."""
    return random() + arg_1 + arg_2
github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
@cachier(stale_after=timedelta(seconds=1), next_time=True)
def _error_throwing_func(arg1):
    if not hasattr(_error_throwing_func, 'count'):
        _error_throwing_func.count = 0
    _error_throwing_func.count += 1
    if _error_throwing_func.count > 1:
        raise ValueError("Tiny Rick!")
    return 7
github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
@cachier(stale_after=DELTA, next_time=True)
def _stale_after_next_time(arg_1, arg_2):
    """Some function."""
    return random()
github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
@cachier(stale_after=DELTA, next_time=False)
def _stale_after_seconds(arg_1, arg_2):
    """Some function."""
    return random()
github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
@cachier(stale_after=timedelta(seconds=1), next_time=True)
def _being_calc_next_time(arg_1, arg_2):
    """Some function."""
    sleep(1)
    return random() + arg_1 + arg_2
github shaypal5 / cachier / tests / test_mongo_core.py View on Github external
def test_stalled_mongo_db_cache():
    @cachier(mongetter=_test_mongetter)
    def _stalled_func():
        return 1
    core = _MongoCore(_test_mongetter, None, False)
    core.set_func(_stalled_func)
    core.clear_cache()
    with pytest.raises(RecalculationNeeded):
        core.wait_on_entry_calc(key=None)
github shaypal5 / cachier / tests / test_mongo_core.py View on Github external
def test_mongo_index_creation():
    """Basic Mongo core functionality."""
    collection = _test_mongetter()
    _test_mongo_caching.clear_cache()
    val1 = _test_mongo_caching(1, 2)
    val2 = _test_mongo_caching(1, 2)
    assert val1 == val2
    assert _MongoCore._INDEX_NAME in collection.index_information()
github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
res1 = res_queue.get()
    assert isinstance(res1, float)
    res2 = res_queue.get()
    assert (res2 is None) or isinstance(res2, KeyError)


@cachier()
def _delete_cache(arg_1, arg_2):
    """Some function."""
    sleep(1)
    return random() + arg_1 + arg_2


# _DEL_CACHE_FNAME = '.__main__._delete_cache'
_DEL_CACHE_FNAME = '.tests.test_pickle_core._delete_cache'
_DEL_CACHE_FPATH = os.path.join(EXPANDED_CACHIER_DIR, _DEL_CACHE_FNAME)


def _calls_delete_cache(res_queue, del_cache):
    try:
        # print('in')
        res = _delete_cache(0.13, 0.02)
        # print('out with {}'.format(res))
        if del_cache:
            # print('deleteing!')
            os.remove(_DEL_CACHE_FPATH)
            # print(os.path.isfile(_DEL_CACHE_FPATH))
        res_queue.put(res)
    except Exception as exc:
        # print('found')
        res_queue.put(exc)
github shaypal5 / cachier / tests / test_pickle_core.py View on Github external
for sleeptime in sleeptimes:
        if _helper_bad_cache_file(sleeptime):
            return
    assert False


@cachier()
def _delete_cache(arg_1, arg_2):
    """Some function."""
    sleep(1)
    return random() + arg_1 + arg_2


# _DEL_CACHE_FNAME = '.__main__._delete_cache'
_DEL_CACHE_FNAME = '.tests.test_pickle_core._delete_cache'
_DEL_CACHE_FPATH = os.path.join(EXPANDED_CACHIER_DIR, _DEL_CACHE_FNAME)


def _calls_delete_cache(res_queue, del_cache):
    try:
        # print('in')
        res = _delete_cache(0.13, 0.02)
        # print('out with {}'.format(res))
        if del_cache:
            # print('deleteing!')
            os.remove(_DEL_CACHE_FPATH)
            # print(os.path.isfile(_DEL_CACHE_FPATH))
        res_queue.put(res)
    except Exception as exc:
        # print('found')
        res_queue.put(exc)