Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_cache_with_default_plugins_kwargs(self):
settings.set_cache(
"aiocache.RedisCache", endpoint="http://...", port=6379)
cache = get_cache(
namespace="default", serializer=PickleSerializer(),
plugins=BasePlugin(), port=123)
assert isinstance(cache, RedisCache)
assert cache.endpoint == "http://..."
assert cache.port == 123
assert cache.namespace == "default"
assert isinstance(cache.serializer, PickleSerializer)
assert isinstance(cache.plugins, BasePlugin)
def test_get_cache_overrides(self):
cache = get_cache(
cache=RedisCache, namespace="default", serializer=PickleSerializer(),
plugins=BasePlugin(), endpoint="http://...", port=123)
assert isinstance(cache, RedisCache)
assert cache.endpoint == "http://..."
assert cache.port == 123
assert cache.namespace == "default"
assert isinstance(cache.serializer, PickleSerializer)
assert isinstance(cache.plugins, BasePlugin)
async def test_interface_methods(self):
for method in API.CMDS:
assert await getattr(BasePlugin, "pre_{}".format(method.__name__))(MagicMock()) is None
assert await getattr(BasePlugin, "post_{}".format(method.__name__))(MagicMock()) is None
def test_get_cache_with_default_config(self):
settings.set_cache(
"aiocache.RedisCache", endpoint="http://...", port=6379)
cache = get_cache(
namespace="default", serializer=PickleSerializer(),
plugins=BasePlugin(), port=123)
assert isinstance(cache, RedisCache)
assert cache.endpoint == "http://..."
assert cache.port == 123
assert cache.namespace == "default"
assert isinstance(cache.serializer, PickleSerializer)
assert isinstance(cache.plugins, BasePlugin)
def test_get_cache_overrides(self):
cache = get_cache(
cache=RedisCache, namespace="default", serializer=PickleSerializer(),
plugins=BasePlugin(), endpoint="http://...", port=123)
assert isinstance(cache, RedisCache)
assert cache.endpoint == "http://..."
assert cache.port == 123
assert cache.namespace == "default"
assert isinstance(cache.serializer, PickleSerializer)
assert isinstance(cache.plugins, BasePlugin)
for hook in hooks:
setattr(cls, hook, func)
async def do_nothing(self, *args, **kwargs):
pass
BasePlugin.add_hook(
BasePlugin.do_nothing, ["pre_{}".format(method.__name__) for method in API.CMDS]
)
BasePlugin.add_hook(
BasePlugin.do_nothing, ["post_{}".format(method.__name__) for method in API.CMDS]
)
class TimingPlugin(BasePlugin):
"""
Calculates average, min and max times each command takes. The data is saved
in the cache class as a dict attribute called ``profiling``. For example, to
access the average time of the operation get, you can do ``cache.profiling['get_avg']``
"""
@classmethod
def save_time(cls, method):
async def do_save_time(self, client, *args, took=0, **kwargs):
if not hasattr(client, "profiling"):
client.profiling = {}
previous_total = client.profiling.get("{}_total".format(method), 0)
previous_avg = client.profiling.get("{}_avg".format(method), 0)
previous_max = client.profiling.get("{}_max".format(method), 0)
previous_min = client.profiling.get("{}_min".format(method))
import asyncio
import random
import logging
from aiocache import Cache
from aiocache.plugins import HitMissRatioPlugin, TimingPlugin, BasePlugin
logger = logging.getLogger(__name__)
class MyCustomPlugin(BasePlugin):
async def pre_set(self, *args, **kwargs):
logger.info("I'm the pre_set hook being called with %s %s" % (args, kwargs))
async def post_set(self, *args, **kwargs):
logger.info("I'm the post_set hook being called with %s %s" % (args, kwargs))
cache = Cache(
plugins=[HitMissRatioPlugin(), TimingPlugin(), MyCustomPlugin()],
namespace="main")
async def run():
await cache.set("a", "1")
await cache.set("b", "2")
"""
from aiocache.base import API
class BasePlugin:
@classmethod
def add_hook(cls, func, hooks):
for hook in hooks:
setattr(cls, hook, func)
async def do_nothing(self, *args, **kwargs):
pass
BasePlugin.add_hook(
BasePlugin.do_nothing, ["pre_{}".format(method.__name__) for method in API.CMDS]
)
BasePlugin.add_hook(
BasePlugin.do_nothing, ["post_{}".format(method.__name__) for method in API.CMDS]
)
class TimingPlugin(BasePlugin):
"""
Calculates average, min and max times each command takes. The data is saved
in the cache class as a dict attribute called ``profiling``. For example, to
access the average time of the operation get, you can do ``cache.profiling['get_avg']``
"""
@classmethod
def save_time(cls, method):
import asyncio
from collections import deque
from aiocache import RedisCache
from aiocache.plugins import BasePlugin
class LRUPlugin(BasePlugin):
"""
Implements a Least Recently Used policy with max_keys. The policy does the following:
- When a key is retrieved get, keys are moved to the beginning of the queue
- When a key is added (set), keys are added to the beginning of the queue. If
the queue is full, it will remove as many keys as needed to make space for the new
ones.
IMPORTANT!
- The queue is implemented using a Python deque so it is NOT persistent!
- Careful when working on distributed systems, you may run into incosistencies if this
policy is run from different instances that point to the same endpoint and namespace.
- To have a full LRU, you should also implement the add, multi_set and multi_get methods.
"""
def __init__(self, max_keys=None):
super().__init__()
if max_keys is not None:
assert max_keys >= 1, "Number of keys must be 1 or bigger"