Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def dummy_ping(*args, **kwargs):
    """Stand-in for ``original_ping`` that only delegates on the first call.

    Each call bumps the ``dummy_ping.n_pings`` counter stored on the function
    object.  While the counter is below 2 the call is forwarded unchanged to
    ``original_ping``; from then on a fresh future obtained from
    ``create_future`` on the current event loop is returned instead.  This
    function never sets a result on that future.
    """
    dummy_ping.n_pings += 1
    if dummy_ping.n_pings >= 2:
        # Second and later calls: hand back a new future rather than pinging.
        return create_future(asyncio.get_event_loop())
    return original_ping(*args, **kwargs)


# Call counter kept as a function attribute; must exist before the first call.
dummy_ping.n_pings = 0
# NOTE(review): fragment -- the enclosing `def` line and the condition that
# guards this raise are not part of this excerpt.
raise ConnectionClosedError("Connection closed or corrupted")
# Reject a None entry anywhere in the supplied channels.
if None in set(channels):
raise TypeError("args must not contain None")
# At least one channel/pattern must be given.
if not len(channels):
raise TypeError("No channels/patterns supplied")
# A command name of length 10 or 12 is treated as a pattern command
# (presumably PSUBSCRIBE / PUNSUBSCRIBE -- TODO confirm against the set of
# accepted pub/sub commands).
is_pattern = len(command) in (10, 12)
mkchannel = partial(Channel, is_pattern=is_pattern, loop=self._loop)
# Wrap every argument that is not already a Channel instance.
channels = [ch if isinstance(ch, Channel) else mkchannel(ch)
for ch in channels]
# Pre-built Channel objects must carry the same pattern flag as the command.
if not all(ch.is_pattern == is_pattern for ch in channels):
raise ValueError("Not all channels {} match command {}"
.format(channels, command))
cmd = encode_command(command, *(ch.name for ch in channels))
# One future per channel; each is queued as a (future, None, callback) waiter
# whose callback is `_update_pubsub` bound to that channel.
res = []
for ch in channels:
fut = create_future(loop=self._loop)
res.append(fut)
cb = partial(self._update_pubsub, ch=ch)
self._waiters.append((fut, None, cb))
# All waiters are queued before the single encoded command is written.
self._writer.write(cmd)
# NOTE(review): the `loop` argument of asyncio.gather was deprecated in
# Python 3.8 and removed in 3.10; this call raises TypeError there.
return asyncio.gather(*res, loop=self._loop)
# NOTE(review): fragment -- the enclosing `def` line and the condition that
# guards this deprecation path are not part of this excerpt.
logger.warning("Deprecated. Use `execute_pubsub` method directly")
return self.execute_pubsub(command, *args)
# SELECT, MULTI, EXEC and DISCARD (str or bytes spelling) each get a dedicated
# callback that is queued alongside the reply future; every other command
# gets None.
if command in ('SELECT', b'SELECT'):
cb = partial(self._set_db, args=args)
elif command in ('MULTI', b'MULTI'):
cb = self._start_transaction
elif command in ('EXEC', b'EXEC'):
cb = partial(self._end_transaction, discard=False)
elif command in ('DISCARD', b'DISCARD'):
cb = partial(self._end_transaction, discard=True)
else:
cb = None
# Fall back to the connection-level encoding when the caller left `encoding`
# at _NOTSET (presumably the "not supplied" sentinel default -- confirm in the
# signature).
if encoding is _NOTSET:
encoding = self._encoding
fut = create_future(loop=self._loop)
# The encoded command is written first, then the (future, encoding, callback)
# waiter is appended to the queue.
self._writer.write(encode_command(command, *args))
self._waiters.append((fut, encoding, cb))
return fut
def __init__(self, reader, writer, *, address, encoding=None, loop=None):
    """Store the stream pair and initialise the connection's bookkeeping.

    Args:
        reader: Stream reader object, kept as ``self._reader``.
        writer: Stream writer object, kept as ``self._writer``.
        address: Peer address, kept as ``self._address``.
        encoding: Default encoding kept as ``self._encoding``; ``None`` by
            default.
        loop: Event loop to use; when ``None`` the loop returned by
            ``asyncio.get_event_loop()`` is used.

    Constructing the object also schedules ``self._read_data()`` through
    ``async_task`` and creates ``self._close_waiter``, whose ``set_result``
    is registered as a done-callback on that reader task.
    """
    self._reader, self._writer = reader, writer
    self._address = address
    self._loop = asyncio.get_event_loop() if loop is None else loop

    # Queue of pending waiters and the reply parser.
    self._waiters = deque()
    self._parser = hiredis.Reader(
        protocolError=ProtocolError, replyError=ReplyError)

    # Background reader task; scheduled here on the chosen loop.
    reader_task = async_task(self._read_data(), loop=self._loop)
    self._reader_task = reader_task

    self._db = 0
    self._closing = self._closed = False

    # The close waiter receives its result via the reader task's
    # done-callback.
    close_waiter = create_future(loop=self._loop)
    self._close_waiter = close_waiter
    reader_task.add_done_callback(close_waiter.set_result)

    # Transaction state.  XXX: _transaction_error looks unused.
    self._in_transaction = self._transaction_error = None

    # Pub/sub state: mode counter plus channel and pattern registries.
    self._in_pubsub = 0
    self._pubsub_channels = coerced_keys_dict()
    self._pubsub_patterns = coerced_keys_dict()

    self._encoding = encoding