Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._writer = None
self._reader = None
while self._waiters:
waiter, *spam = self._waiters.popleft()
logger.debug("Cancelling waiter %r", (waiter, spam))
if exc is None:
waiter.cancel()
else:
waiter.set_exception(exc)
while self._pubsub_channels:
_, ch = self._pubsub_channels.popitem()
logger.debug("Closing pubsub channel %r", ch)
ch.close()
while self._pubsub_patterns:
_, ch = self._pubsub_patterns.popitem()
logger.debug("Closing pubsub pattern %r", ch)
ch.close()
def _do_close(self, exc):
    """Tear down the connection and release everything waiting on it.

    Does nothing if the connection is already marked closed. Otherwise
    marks it closed, closes the writer's transport, cancels the reader
    task, drops the reader/writer references, completes every pending
    waiter and closes all registered pubsub channels and patterns.

    ``exc`` selects how pending waiters are completed: when it is None
    they are cancelled, otherwise ``exc`` is set on them as their
    exception.
    """
    if self._closed:
        return
    # Flip the state flags first so a re-entrant call returns early.
    self._closed = True
    self._closing = False
    self._writer.transport.close()
    self._reader_task.cancel()
    self._reader_task = None
    self._writer = None
    self._reader = None
    while self._waiters:
        waiter, *spam = self._waiters.popleft()
        logger.debug("Cancelling waiter %r", (waiter, spam))
        if waiter.done():
            # A waiter may already be finished (e.g. cancelled by its
            # caller). set_exception() on a done future raises
            # InvalidStateError, which would abort the teardown and
            # leave the remaining waiters and pubsub channels dangling.
            continue
        if exc is None:
            waiter.cancel()
        else:
            waiter.set_exception(exc)
    while self._pubsub_channels:
        _, ch = self._pubsub_channels.popitem()
        logger.debug("Closing pubsub channel %r", ch)
        ch.close()
    while self._pubsub_patterns:
        _, ch = self._pubsub_patterns.popitem()
        logger.debug("Closing pubsub pattern %r", ch)
        ch.close()
SSL argument is passed through to asyncio.create_connection.
By default SSL/TLS is not used.
Encoding argument can be used to decode byte-replies to strings.
By default no decoding is done.
Return value is RedisConnection instance.
This function is a coroutine.
"""
assert isinstance(address, (tuple, list, str)), "tuple or str expected"
if isinstance(address, (list, tuple)):
host, port = address
logger.debug("Creating tcp connection to %r", address)
reader, writer = yield from asyncio.open_connection(
host, port, ssl=ssl, loop=loop)
sock = writer.transport.get_extra_info('socket')
if sock is not None:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
address = sock.getpeername()
address = tuple(address[:2])
else:
logger.debug("Creating unix connection to %r", address)
reader, writer = yield from asyncio.open_unix_connection(
address, ssl=ssl, loop=loop)
sock = writer.transport.get_extra_info('socket')
if sock is not None:
address = sock.getpeername()
conn = RedisConnection(reader, writer, encoding=encoding,
address=address, loop=loop)