Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import eventkit as ev
ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip()
->
[(2, 2, 2), (2, 3, 5), (2, 4, 9)]
The events in the fork can be combined by one of the join
methods of ``Fork``.
Args:
targets: One or more events that have no source yet,
or ``Event`` constructors that need no arguments.
"""
fork = Fork()
for t in targets:
t = Event.create(t)
t.set_source(self)
fork.append(t)
return fork
def _set_sources(self, *sources):
    """Subscribe to every source that is not yet done.

    Each argument is passed through ``Event.create``. ``self._values`` is
    seeded with the current ``value()`` of *every* source (done or not),
    while only the not-done sources are stored in ``self._sources`` and
    connected.

    If every source is already done (or none were given), this operator
    is marked done via ``self.set_done()`` and nothing is connected.

    Args:
        sources: Objects accepted by ``Event.create``.
    """
    sources = [Event.create(s) for s in sources]
    # Remember each live source together with its position in ``sources``.
    live = [(i, s) for i, s in enumerate(sources) if not s.done()]
    self._sources = deque(s for _, s in live)
    if not self._sources:
        self.set_done()
        return
    self._values = [s.value() for s in sources]
    # Bind the position in ``sources`` (not the position among the live
    # sources): ``_values`` has one slot per original source, so the two
    # numberings differ as soon as an earlier source is already done.
    # NOTE(review): assumes ``_on_source_i(i, ...)`` uses ``i`` to index
    # ``self._values`` -- the handler is not visible here; confirm.
    for i, source in live:
        cb = functools.partial(self._on_source_i, i)
        source.connect(cb, self.on_source_error, self.on_source_done)
        # Keep the callback per source; presumably used to disconnect later.
        self._source2cbs[source].append(cb)
def _set_sources(self, *sources):
    """Register each given source on this operator.

    Every argument is converted with ``Event.create`` and the result is
    handed to ``self.add_source``, in the order given.

    Args:
        sources: Objects accepted by ``Event.create``.
    """
    for candidate in sources:
        self.add_source(Event.create(candidate))
def _set_sources(self, *sources):
    """Store the sources and subscribe to each one with an indexed callback.

    All arguments are converted with ``Event.create`` and kept in
    ``self._sources``. If any of them is already done, this operator is
    marked done via ``self.set_done()`` and nothing is connected.
    Otherwise ``self._results`` gets one empty ``deque`` per source and
    each source is connected to ``self._on_source_i`` with its index bound.

    Args:
        sources: Objects accepted by ``Event.create``.
    """
    self._sources = deque(Event.create(item) for item in sources)

    # Bail out as soon as one source turns out to be finished already.
    for event in self._sources:
        if event.done():
            self.set_done()
            return

    self._results = [deque() for _ in self._sources]

    for index, event in enumerate(self._sources):
        handler = functools.partial(self._on_source_i, index)
        event.connect(handler, self.on_source_error, self.on_source_done)
        # Keep the callback per source; presumably used to disconnect later.
        self._source2cbs[event].append(handler)