Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_inheritance(self):
assert isinstance(RedisCache(), BaseCache)
def test_str_timeout(self):
cache = BaseCache(timeout="1.5")
assert cache.timeout == 1.5
async def test_post_get(self, plugin):
client = MagicMock(spec=BaseCache)
await plugin.post_get(client, pytest.KEY)
assert client.hit_miss_ratio["hits"] == 0
assert client.hit_miss_ratio["total"] == 1
assert client.hit_miss_ratio["hit_ratio"] == 0
await plugin.post_get(client, pytest.KEY, ret="value")
assert client.hit_miss_ratio["hits"] == 1
assert client.hit_miss_ratio["total"] == 2
assert client.hit_miss_ratio["hit_ratio"] == 0.5
def test_str_ttl(self):
cache = BaseCache(ttl="1.5")
assert cache.ttl == 1.5
async def test_post_multi_get(self, plugin):
client = MagicMock(spec=BaseCache)
await plugin.post_multi_get(client, [pytest.KEY, pytest.KEY_1], ret=[None, None])
assert client.hit_miss_ratio["hits"] == 0
assert client.hit_miss_ratio["total"] == 2
assert client.hit_miss_ratio["hit_ratio"] == 0
await plugin.post_multi_get(client, [pytest.KEY, pytest.KEY_1], ret=["value", "random"])
assert client.hit_miss_ratio["hits"] == 2
assert client.hit_miss_ratio["total"] == 4
assert client.hit_miss_ratio["hit_ratio"] == 0.5
return 0
class SimpleMemoryCache2(SimpleMemoryBackend2, BaseCache):
def __init__(self, serializer=None, **kwargs):
super().__init__(**kwargs)
self.serializer = serializer or NullSerializer()
class SimpleMemoryCache3(SimpleMemoryBackend3, BaseCache):
def __init__(self, serializer=None, **kwargs):
super().__init__(**kwargs)
self.serializer = serializer or NullSerializer()
class SimpleMemoryCache4(SimpleMemoryBackend4, BaseCache):
def __init__(self, serializer=None, **kwargs):
super().__init__(**kwargs)
self.serializer = serializer or NullSerializer()
def test_inheritance(self):
assert isinstance(MemcachedCache(), BaseCache)
def test_inheritance(self):
assert isinstance(SimpleMemoryCache(), BaseCache)
value = await getattr(self.client, command)(*args, **kwargs)
if command in ["get", "multi_get"]:
if encoding is not None and value is not None:
return value.decode(encoding)
return value
async def _redlock_release(self, key, _):
# Not ideal, should check the value coincides first but this would introduce
# race conditions
return await self._delete(key)
async def _close(self, *args, _conn=None, **kwargs):
await self.client.close()
class MemcachedCache(MemcachedBackend, BaseCache):
"""
Memcached cache implementation with the following components as defaults:
- serializer: :class:`aiocache.serializers.JsonSerializer`
- plugins: []
Config options are:
:param serializer: obj derived from :class:`aiocache.serializers.BaseSerializer`.
:param plugins: list of :class:`aiocache.plugins.BasePlugin` derived classes.
:param namespace: string to use as default prefix for the key used in all operations of
the backend. Default is None
:param timeout: int or float in seconds specifying maximum timeout for the operations to last.
By default its 5.
:param endpoint: str with the endpoint to connect to. Default is 127.0.0.1.
:param port: int with the port to connect to. Default is 11211.
:param pool_size: int size for memcached connections pool. Default is 2.
SimpleMemoryBackend._cache.pop(key)
return 1
return 0
@classmethod
def __delete(cls, key):
if cls._cache.pop(key, None):
handle = cls._handlers.pop(key, None)
if handle:
handle.cancel()
return 1
return 0
class SimpleMemoryCache(SimpleMemoryBackend, BaseCache):
"""
Memory cache implementation with the following components as defaults:
- serializer: :class:`aiocache.serializers.JsonSerializer`
- plugins: None
Config options are:
:param serializer: obj derived from :class:`aiocache.serializers.BaseSerializer`.
:param plugins: list of :class:`aiocache.plugins.BasePlugin` derived classes.
:param namespace: string to use as default prefix for the key used in all operations of
the backend. Default is None.
:param timeout: int or float in seconds specifying maximum timeout for the operations to last.
By default its 5.
"""
NAME = "memory"