Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@API.timeout
async def dummy(self):
await asyncio.sleep(0.005)
return True
@API.aiocache_enabled()
async def dummy(*args, **kwargs):
return True
@API.plugins
async def dummy(self, *args, **kwargs):
return True
def test_register(self):
@API.register
def dummy():
pass
assert dummy in API.CMDS
API.unregister(dummy)
class BasePlugin:
@classmethod
def add_hook(cls, func, hooks):
for hook in hooks:
setattr(cls, hook, func)
async def do_nothing(self, *args, **kwargs):
pass
BasePlugin.add_hook(
BasePlugin.do_nothing, ["pre_{}".format(method.__name__) for method in API.CMDS]
)
BasePlugin.add_hook(
BasePlugin.do_nothing, ["post_{}".format(method.__name__) for method in API.CMDS]
)
class TimingPlugin(BasePlugin):
"""
Calculates average, min and max times each command takes. The data is saved
in the cache class as a dict attribute called ``profiling``. For example, to
access the average time of the operation get, you can do ``cache.profiling['get_avg']``
"""
@classmethod
def save_time(cls, method):
async def do_save_time(self, client, *args, took=0, **kwargs):
if not hasattr(client, "profiling"):
client.profiling = {}
from aiocache.base import API
class BasePlugin:
@classmethod
def add_hook(cls, func, hooks):
for hook in hooks:
setattr(cls, hook, func)
async def do_nothing(self, *args, **kwargs):
pass
BasePlugin.add_hook(
BasePlugin.do_nothing, ["pre_{}".format(method.__name__) for method in API.CMDS]
)
BasePlugin.add_hook(
BasePlugin.do_nothing, ["post_{}".format(method.__name__) for method in API.CMDS]
)
class TimingPlugin(BasePlugin):
"""
Calculates average, min and max times each command takes. The data is saved
in the cache class as a dict attribute called ``profiling``. For example, to
access the average time of the operation get, you can do ``cache.profiling['get_avg']``
"""
@classmethod
def save_time(cls, method):
async def do_save_time(self, client, *args, took=0, **kwargs):
@API.register
@API.aiocache_enabled(fake_return=1)
@API.timeout
@API.plugins
async def increment(self, key, delta=1, namespace=None, _conn=None):
"""
Increments value stored in key by delta (can be negative). If key doesn't
exist, it creates the key with delta as value.
:param key: str key to check
:param delta: int amount to increment/decrement
:param namespace: str alternative namespace to use
:param timeout: int or float in seconds specifying maximum timeout
for the operations to last
:returns: Value of the key once incremented. -1 if key is not found.
:raises: :class:`asyncio.TimeoutError` if it lasts more than self.timeout
:raises: :class:`TypeError` if value is not incrementable
@API.aiocache_enabled(fake_return=[])
@API.timeout
@API.plugins
async def multi_get(self, keys, loads_fn=None, namespace=None, _conn=None):
"""
Get multiple values from the cache, values not found are Nones.
:param keys: list of str
:param loads_fn: callable alternative to use as loads function
:param namespace: str alternative namespace to use
:param timeout: int or float in seconds specifying maximum timeout
for the operations to last
:returns: list of objs
:raises: :class:`asyncio.TimeoutError` if it lasts more than self.timeout
"""
start = time.monotonic()
loads = loads_fn or self._serializer.loads
def register(cls, func):
API.CMDS.add(func)
return func
@API.timeout
@API.plugins
async def raw(self, command, *args, _conn=None, **kwargs):
"""
Send the raw command to the underlying client. Note that by using this CMD you
will lose compatibility with other backends.
Due to limitations with aiomcache client, args have to be provided as bytes.
For rest of backends, str.
:param command: str with the command.
:param timeout: int or float in seconds specifying maximum timeout
for the operations to last
:returns: whatever the underlying client returns
:raises: :class:`asyncio.TimeoutError` if it lasts more than self.timeout
"""
start = time.monotonic()