Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_storage(loop):
    """Round-trip a value through ``RedisStorage`` and check its bookkeeping.

    Stores a dict under key ``'g'``, reads it back, checks the reported
    length and key listing, then sets the key to ``None`` and checks that
    the storage reports a falsy length.
    """
    settings = MergeDict(
        name='1',
        prefix=str(uuid.uuid4()),
        format='json',
    )
    redis_pool = await aioredis.create_pool(('localhost', 6379), loop=loop)
    settings['app.redis_pool'] = redis_pool

    # The same mapping is passed as both the storage config and its context.
    storage = RedisStorage(settings, context=settings, loop=loop)
    await storage.init()

    await storage.set('g', {'f': 3})
    fetched = await storage.get('g')
    assert {'f': 3} == fetched
    size = await storage.length()
    assert 1 == size
    keys = await storage.list()
    assert ['g'] == keys

    await storage.set('g', None)
    remaining = await storage.length()
    assert not remaining
def aioredis_pool(event_loop):
    """Return an aioredis pool for 127.0.0.1:6379 created on *event_loop*.

    The pool is capped at a single connection (``maxsize=1``) and is built
    synchronously by running the ``create_pool`` call to completion.
    """
    run_to_completion = event_loop.run_until_complete
    redis_address = ("127.0.0.1", 6379)
    return run_to_completion(aioredis.create_pool(redis_address, maxsize=1))
async def test_redis_from_create_pool(redis_params):
    """Check that passing a ``create_pool`` result to ``create_app`` warns.

    ``create_app`` is expected to emit a ``DeprecationWarning`` when its
    ``redis`` argument is a pool built from *redis_params*.
    """
    async def handler(request):
        return None

    pool = await aioredis.create_pool(**redis_params)
    deprecation_check = pytest.warns(DeprecationWarning)
    with deprecation_check:
        create_app(handler=handler, redis=pool)
# NOTE(review): this is the interior of a coroutine whose `def` line is not
# visible in this excerpt; `settings`, `retry`, `job_serializer` and
# `job_deserializer` are presumably its parameters -- confirm against the header.
# Fall back to a default RedisSettings() when no (truthy) settings were given.
settings = settings or RedisSettings()
# Reject the combination "host is a plain str" + "sentinel enabled": in
# sentinel mode the host value is used as the address list (see `addr` below).
assert not (
type(settings.host) is str and settings.sentinel
), "str provided for 'host' but 'sentinel' is true; list of sentinels expected"
if settings.sentinel:
addr = settings.host
# Sentinel path: build the sentinel pool, then return the client for the
# master named by settings.sentinel_master.
async def pool_factory(*args, **kwargs):
client = await aioredis.sentinel.create_sentinel_pool(*args, ssl=settings.ssl, **kwargs)
return client.master_for(settings.sentinel_master)
else:
# Direct path: aioredis.create_pool with the connect timeout and ssl pre-bound.
pool_factory = functools.partial(
aioredis.create_pool, create_connection_timeout=settings.conn_timeout, ssl=settings.ssl
)
addr = settings.host, settings.port
try:
pool = await pool_factory(addr, db=settings.database, password=settings.password, encoding='utf8')
# Wrap the raw pool in ArqRedis together with the job (de)serializers.
pool = ArqRedis(pool, job_serializer=job_serializer, job_deserializer=job_deserializer)
except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
# While retries remain, log the failure with the number of attempts left.
# NOTE(review): the rest of this handler (what happens after the warning and
# when retries are exhausted) lies outside this excerpt.
if retry < settings.conn_retries:
logger.warning(
'redis connection error %s %s %s, %d retries remaining...',
addr,
e.__class__.__name__,
e,
settings.conn_retries - retry,
)
def __init__(self, server, loop):
    """Store the result of ``aioredis.create_pool`` for *server* on ``self.pool``.

    :param server: address handed straight through to ``aioredis.create_pool``.
    :param loop: event loop forwarded as the ``loop`` keyword argument.
    """
    # NOTE(review): the return value is stored without being awaited; if
    # create_pool is a coroutine function, self.pool holds a coroutine that
    # a caller must still await -- confirm against the users of self.pool.
    pool_options = {'minsize': 5, 'maxsize': 15, 'loop': loop}
    self.pool = aioredis.create_pool(server, **pool_options)
# NOTE(review): the opening of this signature is outside this excerpt; every
# parameter listed after the bare `*` below is keyword-only.
*,
loop: asyncio.BaseEventLoop = None,
expiration: int = 300,
key_pattern: Tuple[AvailableKeys] = DEFAULT_KEY_PATTERN,
encrypt_key=True,
):
"""
:param loop:
:type loop:
"""
# NOTE(review): `config` is not among the parameters visible here -- presumably
# it is declared in the part of the signature outside this excerpt.
# NOTE(review): both BaseCache.__init__ (here) and super().__init__ (below) are
# invoked, with different expiration sources (config.expiration vs. the
# `expiration` argument) -- confirm the double initialisation is intended.
BaseCache.__init__(self, config.expiration)
# Use the caller's loop when one is given, otherwise the current event loop.
_loop = loop or asyncio.get_event_loop()
# Blocks until the pool exists. run_until_complete raises RuntimeError when the
# loop is already running, so this constructor cannot be used inside a coroutine.
self._redis_pool = _loop.run_until_complete(
aioredis.create_pool(
(config.host, config.port), db=config.db, password=config.password
)
)
self.key_prefix = config.key_prefix
super().__init__(
expiration=expiration, key_pattern=key_pattern, encrypt_key=encrypt_key
)
@asyncio.coroutine
def pubsub():
    """Open a Redis pool and subscribe to ``channel:1`` on a pooled connection.

    Generator-based coroutine. Prints the raw reply to the ``subscribe``
    command. The nested ``reader`` helper is defined but not invoked here.
    """
    redis_pool = yield from aioredis.create_pool(
        ('localhost', 6379), minsize=5, maxsize=10)

    @asyncio.coroutine
    def reader(channel):
        # Print each message from the channel; stop when wait_message()
        # reports nothing more or when the STOPWORD message arrives.
        while True:
            has_message = yield from channel.wait_message()
            if not has_message:
                break
            text = yield from channel.get(encoding='utf-8')
            # ... process message ...
            print("message in {}: {}".format(channel.name, text))
            if text == STOPWORD:
                return

    acquired = yield from redis_pool
    with acquired as conn:
        reply = yield from conn.execute_pubsub('subscribe', 'channel:1')
        print('raw result:', reply)
def go():
    """Set and read back ``my-key`` through a pooled low-level connection.

    Generator-based coroutine body: creates a pool to localhost:6379, runs
    ``set`` then ``get`` on one acquired connection, prints the raw value,
    and closes the pool.

    The pool is closed in a ``finally`` clause so that its connections are
    released even when acquiring the connection or a command raises; the
    original exception still propagates to the caller.
    """
    pool = yield from aioredis.create_pool(
        ('localhost', 6379),
        minsize=5, maxsize=10)
    try:
        with (yield from pool) as conn:  # low-level redis connection
            yield from conn.execute('set', 'my-key', 'value')
            val = yield from conn.execute('get', 'my-key')
        print('raw value:', val)
    finally:
        # Always shut the pool down, on success and on failure alike.
        pool.close()
        yield from pool.wait_closed()  # closing all open connections
async def connect(self):
"""Connect to the database.
This method will connect to a Redis database. By default it will
connect to Redis on localhost on port 6379
"""
try:
self.client = await aioredis.create_pool(
address=(self.host, int(self.port)),
db=self.database,
password=self.password,
parser=parser.PyReader,
)
_LOGGER.info(
_("Connected to Redis database %s from %s on port %s."),
self.database,
self.host,
self.port,
)
except OSError:
_LOGGER.warning(
_("Unable to connect to Redis database on address: %s port: %s."),
self.host,