Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def ping(self):
if self.error == "connection":
raise aioredis.ConnectionClosedError("fake")
elif self.error == "redis":
raise aioredis.RedisError("fake")
elif self.error == "malformed":
return b"PING"
else:
return b"PONG"
async def prepare(self):
"""
Called by the backend to prepare SocketShark (i.e. initialize Redis
connection and the receiver class)
"""
redis_receiver = Receiver(loop=asyncio.get_event_loop())
redis_settings = self.config['REDIS']
try:
self.redis = await aioredis.create_redis((
redis_settings['host'], redis_settings['port']))
except (OSError, aioredis.RedisError):
self.log.exception('could not connect to redis')
raise
# Some features (e.g. pinging) don't work on old Redis versions.
info = await self.redis.info('server')
version_info = info['server']['redis_version'].split('.')
major, minor = int(version_info[0]), int(version_info[1])
if not (major > 3 or major == 3 and minor >= 2):
msg = 'Redis version must be at least 3.2'
self.log.exception(msg, version_info=version_info)
raise Exception(msg)
self._redis_connection_handler_task = asyncio.ensure_future(
self._redis_connection_handler())
self.service_receiver = ServiceReceiver(self, redis_receiver)
def _wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except RedisError as e:
raise KeyValueStoreError(e)
async def pool_factory(*args, **kwargs):
client = await aioredis.sentinel.create_sentinel_pool(*args, ssl=settings.ssl, **kwargs)
return client.master_for(settings.sentinel_master)
else:
pool_factory = functools.partial(
aioredis.create_pool, create_connection_timeout=settings.conn_timeout, ssl=settings.ssl
)
addr = settings.host, settings.port
try:
pool = await pool_factory(addr, db=settings.database, password=settings.password, encoding='utf8')
pool = ArqRedis(pool, job_serializer=job_serializer, job_deserializer=job_deserializer)
except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
if retry < settings.conn_retries:
logger.warning(
'redis connection error %s %s %s, %d retries remaining...',
addr,
e.__class__.__name__,
e,
settings.conn_retries - retry,
)
await asyncio.sleep(settings.conn_retry_delay)
else:
raise
else:
if retry > 0:
logger.info('redis connection successful')
return pool
async def push_stats(self):
"""Push current stats to Redis."""
snapshot = self._make_stats()
try:
serialized = json.dumps(snapshot)
await self._call_redis(
aioredis.Redis.set, state.MANAGER_LISTENER_STATS, serialized
)
await self._call_redis(
aioredis.Redis.expire, state.MANAGER_LISTENER_STATS, 3600
)
except TypeError:
logger.error(
__("Listener can't serialize statistics:\n\n{}", traceback.format_exc())
)
except aioredis.RedisError:
logger.error(
__(
"Listener can't store updated statistics:\n\n{}",
traceback.format_exc(),
)
app.config['REDIS'] = {'address': 'redis://:password@localhost:6379/0'}
redis = SanicRedis(app)
dockerflow = Dockerflow(app, redis=redis)
"""
import aioredis
errors = []
try:
with await redis.conn as r:
result = await r.ping()
except aioredis.ConnectionClosedError as e:
msg = "Could not connect to redis: {!s}".format(e)
errors.append(Error(msg, id=health.ERROR_CANNOT_CONNECT_REDIS))
except aioredis.RedisError as e:
errors.append(
Error('Redis error: "{!s}"'.format(e), id=health.ERROR_REDIS_EXCEPTION)
)
else:
if result != b"PONG":
errors.append(Error("Redis ping failed", id=health.ERROR_REDIS_PING_FAILED))
return errors
async def wrapper(*args, **kwargs):
try:
return await coro(*args, **kwargs)
except RedisError as exc:
# Any connection and protocol related issues but also invalid
# Redis-command formatting (not likely).
log.exception(exc)
except SocketError:
log.error("Connection with Redis has been lost. Retrying...")
return wrapper