Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_new_connection(self, address):
return create_connection(address,
db=self._db,
password=self._password,
ssl=self._ssl,
encoding=self._encoding,
parser=self._parser_class,
timeout=self._create_connection_timeout,
connection_cls=self._connection_cls,
)
with (yield from self._lock):
if self._conn is None or self._conn.closed:
if self._sentinel_service.is_master:
service = self._sentinel_service
addr = yield from service.get_master_address()
conn = yield from create_connection(
addr, **self._conn_kwargs)
self._conn = conn
else:
service = self._sentinel_service
slaves = yield from service.get_slaves()
num = service.num_slaves()
for idx in range(num):
slave = slaves[idx]
try:
conn = yield from create_connection(
slave, **self._conn_kwargs)
self._conn = conn
return self._conn
except SlaveNotFoundError:
raise
except RedisError:
pass
addr = yield from service.get_master_address()
try:
conn = yield from create_connection(
addr, **self._conn_kwargs)
self._conn = conn
return self._conn
except RedisError:
raise SlaveNotFoundError
return self._conn
slaves = yield from service.get_slaves()
num = service.num_slaves()
for idx in range(num):
slave = slaves[idx]
try:
conn = yield from create_connection(
slave, **self._conn_kwargs)
self._conn = conn
return self._conn
except SlaveNotFoundError:
raise
except RedisError:
pass
addr = yield from service.get_master_address()
try:
conn = yield from create_connection(
addr, **self._conn_kwargs)
self._conn = conn
return self._conn
except RedisError:
raise SlaveNotFoundError
return self._conn