Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _drop_closed(self):
diff = len(self._pool)
super()._drop_closed()
diff -= len(self._pool)
if diff:
# closed connections were in pool:
# * reset address;
# * notify sentinel pool
sentinel_logger.debug(
"Dropped %d closed connnection(s); must rediscover", diff)
self._sentinel._need_rediscover(self._service)
async def _connect_sentinel(self, address, timeout, pools):
"""Try to connect to specified Sentinel returning either
connections pool or exception.
"""
try:
with async_timeout(timeout):
pool = await create_pool(
address, minsize=1, maxsize=2,
parser=self._parser_class,
)
pools.append(pool)
return pool
except asyncio.TimeoutError as err:
sentinel_logger.debug(
"Failed to connect to Sentinel(%r) within %ss timeout",
address, timeout)
return err
except Exception as err:
sentinel_logger.debug(
"Error connecting to Sentinel(%r): %r", address, err)
return err
async def _create_new_connection(self, address):
if address is _NON_DISCOVERED:
# Perform service discovery.
# Returns Connection or raises error if no service can be found.
await self._do_clear() # make `clear` blocking
if self._is_master:
conn = await self._sentinel.discover_master(
self._service, timeout=self._sentinel.discover_timeout)
else:
conn = await self._sentinel.discover_slave(
self._service, timeout=self._sentinel.discover_timeout)
self._address = conn.address
sentinel_logger.debug("Discoverred new address %r for %s",
conn.address, self._service)
return conn
return await super()._create_new_connection(address)
"""
try:
with async_timeout(timeout):
pool = await create_pool(
address, minsize=1, maxsize=2,
parser=self._parser_class,
)
pools.append(pool)
return pool
except asyncio.TimeoutError as err:
sentinel_logger.debug(
"Failed to connect to Sentinel(%r) within %ss timeout",
address, timeout)
return err
except Exception as err:
sentinel_logger.debug(
"Error connecting to Sentinel(%r): %r", address, err)
return err
def release(self, conn):
was_closed = conn.closed
super().release(conn)
# if connection was closed while used and not by release()
if was_closed:
sentinel_logger.debug(
"Released closed connection; must rediscover")
self._sentinel._need_rediscover(self._service)
def _need_rediscover(self, service):
sentinel_logger.debug("Must redisover service %s", service)
pool = self._masters.get(service)
if pool:
pool.need_rediscover()
pool = self._slaves.get(service)
if pool:
pool.need_rediscover()
contextlib.ExitStack() as stack:
conn = await pool._create_new_connection(address)
stack.callback(conn.close)
await self._verify_service_role(conn, 'master')
stack.pop_all()
return conn
except asyncio.CancelledError:
# we must correctly handle CancelledError(s):
# application may be stopped or function can be cancelled
# by outer timeout, so we must stop the look up.
raise
except asyncio.TimeoutError:
continue
except DiscoverError as err:
sentinel_logger.debug("DiscoverError(%r, %s): %r",
sentinel, service, err)
await asyncio.sleep(idle_timeout)
continue
except RedisError as err:
raise MasterReplyError("Service {} error".format(service), err)
except Exception:
# TODO: clear (drop) connections to schedule reconnect
await asyncio.sleep(idle_timeout)
continue
# Otherwise
raise MasterNotFoundError("No master found for {}".format(service))