Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unsubscribe(self, channels, subscribers, mtype):
if not channels:
channels = []
for (channel, subs) in subscribers.items():
if self in subs:
channels.append(channel)
for channel in channels:
subs = subscribers.get(channel, set())
if self in subs:
subs.remove(self)
if not subs:
del subscribers[channel]
self._pubsub -= 1
msg = [mtype, channel, self._pubsub]
self.responses.put(msg)
return NoResponse()
def _subscribe(self, channels, subscribers, mtype):
for channel in channels:
subs = subscribers[channel]
if self not in subs:
subs.add(self)
self._pubsub += 1
msg = [mtype, channel, self._pubsub]
self.responses.put(msg)
return NoResponse()
def valid_response_type(value, nested=False):
if isinstance(value, NoResponse) and not nested:
return True
if value is not None and not isinstance(value, (bytes, SimpleString, redis.ResponseError,
int, long, list)):
return False
if isinstance(value, list):
if any(not valid_response_type(item, True) for item in value):
return False
return True