Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
s = self._sources.popleft()
cb = self._source2cb.pop(s, None)
if cb:
s.disconnect(cb, done=self.on_source_done)
self.emit(*args)
def on_source_done(self, source):
    """
    Handle ``source`` reporting that it is done.

    The callback stored for ``source`` in ``_source2cb`` is removed and
    disconnected, every occurrence of ``source`` is dropped from
    ``_sources``, and this operator is marked done once no sources
    remain and no parent is set.

    NOTE(review): ``_source2cb.pop(source)`` has no default, so this
    presumably raises ``KeyError`` for an unknown source -- confirm
    against the mapping type used by the enclosing class.
    """
    handler = self._source2cb.pop(source)
    source.disconnect(handler, done=self.on_source_done)

    pending = self._sources
    while source in pending:
        pending.remove(source)

    # Stay alive while sources remain or a parent is still attached.
    if pending or self._parent is not None:
        return
    self.set_done()
class Chain(AddableJoinOp):
    """
    Join operator that connects its first source directly and buffers
    the emissions of every later source in a per-source queue.
    """

    __slots__ = ('_qq', '_source2cbs')

    def __init__(self, *sources):
        AddableJoinOp.__init__(self)
        # One queue of buffered argument tuples per waiting source.
        self._qq = deque()
        # map from source to callbacks
        self._source2cbs = defaultdict(list)
        self._set_sources(*sources)

    def add_source(self, source):
        """
        Add ``source`` to the chain.

        The first source is connected directly via ``_connect_from``.
        Any later source gets its own queue and a callback that appends
        each emitted argument tuple to that queue.

        NOTE(review): done-notification wiring for buffered sources is
        not visible in this part of the file -- confirm it is handled
        where the queues are drained.
        """
        if not self._sources:
            self._connect_from(source)
        else:
            q = deque()

            def cb(*args):
                q.append(args)

            self._qq.append(q)
            # Attach the buffering callback; the previous ``-=`` removed a
            # callback that was never attached, so nothing was buffered.
            source += cb
            # Remember the callback so it can be detached later.
            self._source2cbs[source].append(cb)
        # Track the source, as the other add_source implementations do;
        # otherwise ``_sources`` stays empty and the buffering branch
        # above can never be reached.
        self._sources.append(source)
def on_source_s(self, source, *args):
    """
    Re-emit a value coming from ``source``, making it the active source.

    If ``source`` is not the current ``_active_source``, the previous
    active source is first handed to ``_remove_source`` and ``source``
    takes its place. The arguments are then emitted unchanged.
    """
    previous = self._active_source
    if previous is not source:
        self._remove_source(previous)
        self._active_source = source
    self.emit(*args)
def on_source_done(self, source):
    """
    Handle ``source`` reporting that it is done.

    The source is passed to ``_remove_source``. Once no sources remain
    and no parent is set, the active source is cleared and this
    operator is marked done.
    """
    self._remove_source(source)
    # Stay alive while sources remain or a parent is still attached.
    if self._sources or self._parent is not None:
        return
    self._active_source = None
    self.set_done()
class Concat(AddableJoinOp):
    """
    Join operator that registers each distinct source once and routes
    its emissions through a per-source callback.
    """

    __slots__ = ('_source2cb',)

    def __init__(self, *sources):
        AddableJoinOp.__init__(self)
        # map from source to callback
        self._source2cb = {}
        self._set_sources(*sources)

    def add_source(self, source):
        """
        Register ``source`` unless it is already among the sources.

        A callback binding ``source`` as the first argument of
        ``_on_source_s`` is created on first use and cached in
        ``_source2cb``. It is then connected to the source, with
        ``on_source_done`` as the done handler.
        """
        if source not in self._sources:
            self._sources.append(source)
            handler = self._source2cb.get(source)
            if not handler:
                handler = functools.partial(self._on_source_s, source)
                self._source2cb[source] = handler
            source.connect(handler, done=self.on_source_done)
parent.done_event += self._on_parent_done
def on_source_done(self, source):
    """
    Handle ``source`` reporting that it is done.

    The source is disconnected via ``_disconnect_from`` and removed
    from ``_sources``. This operator is marked done once no sources
    remain and no parent is set.
    """
    self._disconnect_from(source)
    self._sources.remove(source)
    # Stay alive while sources remain or a parent is still attached.
    if self._sources or self._parent is not None:
        return
    self.set_done()
def _on_parent_done(self, parent):
    """
    React to the parent being done.

    Applies ``parent -= self._on_parent_done`` (presumably removing
    this handler as a listener), clears ``_parent``, and marks this
    operator done when no sources are left.

    NOTE(review): elsewhere in this file the handler is attached to
    ``parent.done_event`` but removed from ``parent`` here -- confirm
    that this asymmetry is intended.
    """
    parent -= self._on_parent_done
    self._parent = None
    if self._sources:
        return
    self.set_done()
class Merge(AddableJoinOp):
    """Join operator that connects each source as soon as it is added."""

    __slots__ = ()

    def add_source(self, source):
        """Record ``source`` and hand it to ``_connect_from``."""
        tracked = self._sources
        tracked.append(source)
        self._connect_from(source)
class Switch(AddableJoinOp):
    """
    Join operator that keeps a per-source callback map and a reference
    to the currently active source.
    """

    __slots__ = ('_source2cb', '_active_source')

    def __init__(self, *sources):
        AddableJoinOp.__init__(self)
        # No source is active until one is assigned.
        self._active_source = None
        # map from source to callback
        self._source2cb = {}
        self._set_sources(*sources)
def _on_parent_done(self, parent):
    """
    React to the parent being done.

    Applies ``parent -= self._on_parent_done`` (presumably removing
    this handler as a listener), forgets the parent by setting
    ``_parent`` to ``None``, and marks this operator done if it has no
    sources left.
    """
    parent -= self._on_parent_done
    self._parent = None
    has_sources = bool(self._sources)
    if not has_sources:
        self.set_done()
class Merge(AddableJoinOp):
    """Join operator whose sources are connected immediately when added."""

    __slots__ = ()

    def add_source(self, source):
        """
        Append ``source`` to ``_sources`` and then connect it through
        ``_connect_from``.
        """
        self._sources.append(source)
        connect = self._connect_from
        connect(source)
class Switch(AddableJoinOp):
    """
    Join operator that keeps a per-source callback map and a reference
    to the currently active source.
    """

    __slots__ = ('_source2cb', '_active_source')

    def __init__(self, *sources):
        AddableJoinOp.__init__(self)
        # No source is active until one is assigned.
        self._active_source = None
        # map from source to callback
        self._source2cb = {}
        self._set_sources(*sources)

    def add_source(self, source):
        """
        Add ``source`` and connect its per-source callback.

        The source is always appended to ``_sources``. The callback,
        which binds ``source`` as the first argument of
        ``on_source_s``, is created on first use and cached in
        ``_source2cb``. It is then connected with ``on_source_done``
        as the done handler.
        """
        self._sources.append(source)
        callbacks = self._source2cb
        handler = callbacks.get(source)
        if not handler:
            handler = functools.partial(self.on_source_s, source)
            callbacks[source] = handler
        source.connect(handler, done=self.on_source_done)