# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_cache_types(self):
    """Each Cache alias must map to its concrete backend class."""
    alias_to_backend = {
        Cache.MEMORY: SimpleMemoryCache,
        Cache.REDIS: RedisCache,
        Cache.MEMCACHED: MemcachedCache,
    }
    for alias, backend in alias_to_backend.items():
        assert alias == backend
import asyncio
from collections import namedtuple

from aiocache import Cache
from aiocache.serializers import PickleSerializer

# Small value object used to demonstrate caching arbitrary Python objects.
MyObject = namedtuple("MyObject", ["x", "y"])

# Redis-backed cache; PickleSerializer lets any picklable object round-trip.
# NOTE(review): assumes a Redis server is reachable on the default host/port.
cache = Cache(Cache.REDIS, serializer=PickleSerializer(), namespace="main")
async def complex_object():
    """Round-trip a namedtuple through the cache and check its fields."""
    stored = MyObject(x=1, y=2)
    await cache.set("key", stored)

    fetched = await cache.get("key")
    assert fetched.x == 1
    assert fetched.y == 2
def test_python_object():
    """Run the pickle round-trip example and clean up afterwards.

    Improvements over the original:
    - uses ``asyncio.run()`` instead of the deprecated
      ``asyncio.get_event_loop()`` / ``run_until_complete()`` pattern;
    - runs the whole scenario in one event loop so the cache's
      connections stay bound to a single loop;
    - cleanup runs in ``finally`` so the key is deleted and the pool is
      closed even when an assertion inside ``complex_object()`` fails.
    """
    async def _scenario():
        try:
            await complex_object()
        finally:
            await cache.delete("key")
            await cache.close()

    asyncio.run(_scenario())
# dumps() below returns raw bytes, so the serializer advertises no text
# encoding to the cache backend.
DEFAULT_ENCODING = None
def dumps(self, value):
    """Compress *value* (a str) with zlib and return the raw bytes."""
    print("I've received:\n{}".format(value))
    payload = zlib.compress(value.encode())
    print("But I'm storing:\n{}".format(payload))
    return payload
def loads(self, value):
    """Decompress raw bytes from the cache back into a str."""
    print("I've retrieved:\n{}".format(value))
    text = zlib.decompress(value).decode()
    print("But I'm returning:\n{}".format(text))
    return text
# Redis cache that transparently compresses values via the custom serializer.
# NOTE(review): assumes CompressionSerializer is defined above and a Redis
# server is reachable on the default host/port.
cache = Cache(Cache.REDIS, serializer=CompressionSerializer(), namespace="main")
async def serializer():
    """Store a long string, then show the cached raw bytes are smaller."""
    lorem = (
        "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt"
        "ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation"
        "ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in"
        "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur"
        "sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit"
        "anim id est laborum."
    )
    await cache.set("key", lorem)
    print("-----------------------------------")

    # Read back through the serializer (decompressed) and raw (compressed).
    plain = await cache.get("key")
    raw_bytes = await cache.raw("get", "main:key")
    assert len(raw_bytes) < len(plain.encode())
# Marshmallow schema fragment; the matching `x = fields.Number()` declaration
# presumably precedes this chunk — not visible here.
y = fields.Number()

@post_load
def build_object(self, data, **kwargs):
    """Re-create a MyType instance from the deserialized dict."""
    return MyType(data['x'], data['y'])
def dumps(value):
    """Serialize *value* to a JSON string via the marshmallow schema."""
    schema = MyTypeSchema()
    return schema.dumps(value)
def loads(value):
    """Deserialize a JSON string back into a MyType via the schema."""
    schema = MyTypeSchema()
    return schema.loads(value)
# Plain Redis cache; (de)serialization is supplied per-call via dumps_fn/loads_fn.
cache = Cache(Cache.REDIS, namespace="main")
async def serializer_function():
    """Cache a MyType using explicit per-call dumps/loads functions."""
    await cache.set("key", MyType(1, 2), dumps_fn=dumps)

    rebuilt = await cache.get("key", loads_fn=loads)
    assert rebuilt.x == 1
    assert rebuilt.y == 2

    # Without loads_fn the default serializer decodes the cached JSON text.
    assert await cache.get("key") == json.loads('{"y": 2.0, "x": 1.0}')
    assert json.loads(await cache.raw("get", "main:key")) == {"y": 2.0, "x": 1.0}
def test_serializer_function():
    """Run the per-call serializer example to completion.

    Improvements over the original:
    - ``asyncio.run()`` replaces the deprecated ``get_event_loop()`` /
      ``run_until_complete()`` pattern;
    - the original left "key" in Redis and never closed the cache; the
      ``finally`` block now cleans up (matching the sibling tests) even
      when an assertion inside ``serializer_function()`` fails.
    """
    async def _scenario():
        try:
            await serializer_function()
        finally:
            await cache.delete("key")
            await cache.close()

    asyncio.run(_scenario())
def test_alias():
    """Exercise the default alias and the alternate alias configs.

    Rewritten on ``asyncio.run()`` (``asyncio.get_event_loop()`` is
    deprecated) with the whole scenario in a single event loop, and with
    ``finally`` so connections are released even when a step fails.
    """
    async def _scenario():
        await default_cache()
        await alt_cache()

        # Ad-hoc cache used only to remove the key the examples created.
        remover = Cache(Cache.REDIS)
        try:
            await remover.delete("key")
        finally:
            await remover.close()
            await caches.get('default').close()

    asyncio.run(_scenario())
import asyncio

from aiocache import Cache

# Redis cache used by the examples below.
# NOTE(review): assumes a Redis server listening on 127.0.0.1:6379.
cache = Cache(Cache.REDIS, endpoint="127.0.0.1", port=6379, namespace="main")
async def redis():
    """Write one persistent and one expiring key, then verify both."""
    for key, extra in (("key", {}), ("expire_me", {"ttl": 10})):
        await cache.set(key, "value", **extra)

    for key in ("key", "expire_me"):
        assert await cache.get(key) == "value"

    # raw() talks to Redis directly, so the fully-namespaced key is used.
    assert await cache.raw("ttl", "main:expire_me") > 0
def test_redis():
    """Run the Redis example and always remove the keys it created.

    Improvements over the original:
    - ``asyncio.run()`` replaces the deprecated ``get_event_loop()`` /
      ``run_until_complete()`` pattern;
    - cleanup moved into ``finally`` so it runs even when an assertion
      inside ``redis()`` fails;
    - the cache is now closed (the original leaked the connection pool;
      the sibling tests all close it).
    """
    async def _scenario():
        try:
            await redis()
        finally:
            await cache.delete("key")
            await cache.delete("expire_me")
            await cache.close()

    asyncio.run(_scenario())
import asyncio
import logging
import random

from aiocache import Cache
from aiocache.lock import OptimisticLock, OptimisticLockError

# Module-level logger, per the stdlib convention.
logger = logging.getLogger(__name__)

# Redis cache targeted by the optimistic-locking example.
# NOTE(review): assumes a Redis server listening on 127.0.0.1:6379.
cache = Cache(Cache.REDIS, endpoint='127.0.0.1', port=6379, namespace='main')
async def expensive_function():
    """Simulate slow work (a random 0-2 s delay) before returning a value."""
    logger.warning('Expensive is being executed...')
    delay = random.uniform(0, 2)
    await asyncio.sleep(delay)
    return 'result'
async def my_view():
async with OptimisticLock(cache, 'key') as lock:
result = await expensive_function()
try:
await lock.cas(result)
except OptimisticLockError:
logger.warning(
'c': "X",
'd': "W"
}
@multi_cached("ids", cache=Cache.REDIS, namespace="main")
async def multi_cached_ids(ids=None):
    """Resolve every requested id from DICT; results are multi-cached."""
    return dict((key, DICT[key]) for key in ids)
@multi_cached("keys", cache=Cache.REDIS, namespace="main")
async def multi_cached_keys(keys=None):
    """Same lookup as multi_cached_ids, but keyed on the ``keys`` argument."""
    return dict((key, DICT[key]) for key in keys)
# Handle used by the test below to verify and clean the decorator-created keys.
cache = Cache(Cache.REDIS, endpoint="127.0.0.1", port=6379, namespace="main")
def test_multi_cached():
    """Populate the multi_cached examples and verify every key landed.

    Improvements over the original:
    - ``asyncio.run()`` replaces the deprecated ``get_event_loop()`` /
      ``run_until_complete()`` pattern;
    - the key "d" is now deleted (the original asserted it existed but
      never removed it, leaking it into Redis);
    - the cache is closed, matching the sibling tests;
    - cleanup runs in ``finally`` so it happens even when an assertion
      fails mid-way.
    """
    async def _scenario():
        try:
            await multi_cached_ids(ids=['a', 'b'])
            await multi_cached_ids(ids=['a', 'c'])
            await multi_cached_keys(keys=['d'])

            for key in ('a', 'b', 'c', 'd'):
                assert await cache.exists(key)
        finally:
            for key in ('a', 'b', 'c', 'd'):
                await cache.delete(key)
            await cache.close()

    asyncio.run(_scenario())
def test_cached():
    """Run the @cached example and confirm the entry was stored.

    Rewritten on ``asyncio.run()`` (``asyncio.get_event_loop()`` is
    deprecated), with the whole scenario in a single event loop and
    cleanup in ``finally`` so the key is removed and the pool is closed
    even when the assertion fails.
    """
    cache = Cache(Cache.REDIS, endpoint="127.0.0.1", port=6379, namespace="main")

    async def _scenario():
        try:
            await cached_call()
            assert await cache.exists("key") is True
        finally:
            await cache.delete("key")
            await cache.close()

    asyncio.run(_scenario())
async def alt_cache():
    """Build a cache from the 'redis_alt' alias config and verify it."""
    # caches.create(...) returns a brand-new instance on every call; the
    # shorthand caches.create('alt') — optionally with overrides such as
    # namespace="test" — would work just as well.
    alt = caches.create(**caches.get_alias_config('redis_alt'))

    await alt.set("key", "value")
    assert await alt.get("key") == "value"

    # The alias config should yield a Redis cache with a pickle serializer,
    # two plugins, and the expected connection settings.
    assert isinstance(alt, Cache.REDIS)
    assert isinstance(alt.serializer, PickleSerializer)
    assert len(alt.plugins) == 2
    assert alt.endpoint == "127.0.0.1"
    assert alt.timeout == 1
    assert alt.port == 6379

    await alt.close()