def test_init(self):
    serializer = PickleSerializer()
    assert isinstance(serializer, BaseSerializer)
    assert serializer.DEFAULT_ENCODING is None
    assert serializer.encoding is None

async def test_multi_set_multi_get_types(self, cache, obj):
    cache.serializer = PickleSerializer()
    assert await cache.multi_set([(pytest.KEY, obj)]) is True
    assert await cache.multi_get([pytest.KEY]) == [pickle.loads(pickle.dumps(obj))]

def test_dumps(self):
    assert PickleSerializer().dumps("hi") == b"\x80\x03X\x02\x00\x00\x00hiq\x00."
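The expected byte string above is what pickle protocol 3 emits; an interpreter using a different default protocol produces different bytes, so an exact-bytes assertion is brittle. A protocol-independent check round-trips the value through loads instead (a minimal sketch using only PickleSerializer's public dumps/loads pair):

from aiocache.serializers import PickleSerializer

serializer = PickleSerializer()

# Compare restored values, not raw bytes: the pickle wire format
# varies with the protocol, the round-tripped value does not.
data = serializer.dumps("hi")
assert isinstance(data, bytes)
assert serializer.loads(data) == "hi"
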
def test_get_cache_with_default_config(self):
    settings.set_cache(
        "aiocache.RedisCache", endpoint="http://...", port=6379)
    cache = get_cache(
        namespace="default", serializer=PickleSerializer(),
        plugins=BasePlugin(), port=123)

    assert isinstance(cache, RedisCache)
    assert cache.endpoint == "http://..."
    assert cache.port == 123
    assert cache.namespace == "default"
    assert isinstance(cache.serializer, PickleSerializer)
    assert isinstance(cache.plugins, BasePlugin)

@cached(
    ttl=10, cache=Cache.REDIS, key="key", serializer=PickleSerializer(),
    port=6379, namespace="main")
async def cached_call():
    return Result("content", 200)

@cached(ttl=1000, cache=RedisCache, key="rss_json", serializer=PickleSerializer(), port=6379, namespace="main")
async def get_rss():
    print("Sleeping 1 second on the first call to show the difference the cache makes...")
    await asyncio.sleep(1)
    url = "http://blog.howie6879.cn/atom.xml"
    feed = parse(url)
    articles = feed['entries']
    data = []
    for article in articles:
        data.append({"title": article["title_detail"]["value"], "link": article["link"]})
    return data
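A small driver makes the caching visible: the first call pays the one-second sleep plus the feed fetch, while the second is served straight from the "rss_json" key in Redis. A sketch, assuming parse above comes from feedparser and a local Redis is listening on port 6379:

import asyncio
import time

async def main():
    start = time.monotonic()
    await get_rss()   # slow path: sleeps, fetches and parses the feed
    first = time.monotonic() - start

    start = time.monotonic()
    await get_rss()   # fast path: result comes back from Redis
    second = time.monotonic() - start

    print(f"first call: {first:.2f}s, cached call: {second:.2f}s")

asyncio.run(main())
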
import asyncio

from collections import namedtuple

from aiocache import cached, RedisCache
from aiocache.serializers import PickleSerializer

Result = namedtuple('Result', "content, status")

RedisCache.set_defaults(
    namespace="main",
    db=1,
    pool_min_size=3,
    serializer=PickleSerializer())


@cached(cache=RedisCache, ttl=10, key="key")
async def decorator():
    return Result("content", 200)


async def global_cache():
    cache = RedisCache()
    obj = await cache.get("key")

    assert obj.content == "content"
    assert obj.status == 200
    assert cache.db == 1
    assert cache.pool_min_size == 3
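To run the example end to end, invoke the decorated call once so the entry exists before global_cache reads it back (a minimal driver, assuming a Redis instance matching the defaults above):

async def main():
    await decorator()      # first call stores Result("content", 200) under "key"
    await global_cache()   # reads it back and checks the configured defaults

asyncio.run(main())
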
@cached(ttl=259200, key_from_attr='novels_name', serializer=PickleSerializer(), namespace="novels_name")
async def start(novels_name):
    """
    Start spider
    :return:
    """
    return await SoNovels.start(novels_name)

@cached(ttl=300, key_from_attr='url', serializer=PickleSerializer(), namespace="main")
async def cache_owllook_novels_content(url, netloc):
    headers = {
        'user-agent': await get_random_user_agent()
    }
    html = await target_fetch(headers=headers, url=url)
    if not html:
        html = get_html_by_requests(url=url, headers=headers)
    if html:
        soup = BeautifulSoup(html, 'html5lib')
        selector = RULES[netloc].content_selector
        if selector.get('id', None):
            content = soup.find_all(id=selector['id'])
        elif selector.get('class', None):
            content = soup.find_all(class_=selector['class'])
        else:
            content = soup.find_all(selector.get('tag'))
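The selector lookup above implies that each RULES entry exposes a content_selector dict keyed by 'id', 'class', or 'tag'. A hypothetical entry showing that shape (illustrative site and selector, not owllook's real rule table):

from collections import namedtuple

Rule = namedtuple('Rule', 'content_selector')

# Hypothetical rule: pull chapter text out of <div id="content"> on one site.
RULES = {
    'www.example.com': Rule(content_selector={'id': 'content'}),
}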