Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
tup = tuple(q.popleft() for q in self._results)
self._num_ready = sum(bool(q) for q in self._results)
self.emit(*tup)
def on_source_done(self, source):
self._sources.remove(source)
if not self._sources:
for source, cbs in self._source2cbs.items():
for cb in cbs:
source.disconnect(
cb, self.on_source_error, self.on_source_done)
self._source2cbs = None
self.set_done()
class Ziplatest(JoinOp):
__slots__ = ('_values', '_is_primed', '_source2cbs')
def __init__(self, *sources, partial=True):
JoinOp.__init__(self)
self._is_primed = partial
self._source2cbs = defaultdict(list) # map from source to callbacks
if sources:
self._set_sources(*sources)
def _set_sources(self, *sources):
sources = [Event.create(s) for s in sources]
self._sources = deque(s for s in sources if not s.done())
if not self._sources:
self.set_done()
return
self._values = [s.value() for s in sources]
source = self._sources[0]
q = self._qq.popleft()
for args in q:
self.emit(*args)
for cb in self._source2cbs.pop(source, []):
source -= cb
if source.done():
self._sources.popleft()
continue
self._connect_from(source)
return
if not self._sources and self._parent is None:
self.set_done()
class Zip(JoinOp):
__slots__ = ('_results', '_source2cbs', '_num_ready')
def __init__(self, *sources):
JoinOp.__init__(self)
self._num_ready = 0 # number of sources with a pending result
self._source2cbs = defaultdict(list) # map from source to callbacks
if sources:
self._set_sources(*sources)
def _set_sources(self, *sources):
self._sources = deque(Event.create(s) for s in sources)
if any(s.done() for s in self._sources):
self.set_done()
return
self._results = [deque() for _ in self._sources]
for i, source in enumerate(self._sources):
def chain(self) -> "Chain":
return self.join(Chain())
class JoinOp(Op):
"""
Base class for join operators that combine the emits
from multiple source events.
"""
__slots__ = ('_sources',)
def _set_sources(self, sources):
raise NotImplementedError
class AddableJoinOp(JoinOp):
"""
Base class for join operators where new sources, produced by a
parent higher-order event, can be added dynamically.
"""
__slots__ = ('_parent',)
def __init__(self, *sources: Event):
JoinOp.__init__(self)
self._sources = deque()
self._parent = None
self._set_sources(*sources)
def _set_sources(self, *sources):
for source in sources:
source = Event.create(source)
self.add_source(source)