>>> x = c.submit(add, 1, 2) # doctest: +SKIP
>>> c.gather(x) # doctest: +SKIP
3
>>> c.gather([x, [x], x]) # support lists and dicts # doctest: +SKIP
[3, [3], 3]
>>> seq = c.gather(iter([x, x])) # support iterators # doctest: +SKIP
>>> next(seq) # doctest: +SKIP
3
See Also
--------
Client.scatter: Send data out to cluster
"""
if isqueue(futures):
qout = pyQueue(maxsize=maxsize)
t = threading.Thread(target=self._threaded_gather,
name="Threaded gather()",
args=(futures, qout),
kwargs={'errors': errors, 'direct': direct})
t.daemon = True
t.start()
return qout
elif isinstance(futures, Iterator):
return (self.gather(f, errors=errors, direct=direct)
for f in futures)
else:
if hasattr(thread_state, 'execution_state'): # within worker task
local_worker = thread_state.execution_state['worker']
else:
local_worker = None
return self.sync(self._gather, futures, errors=errors,
                 direct=direct, local_worker=local_worker)
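
A minimal usage sketch (illustration only, not part of the docstring above): gather also accepts a Queue of futures and then returns a Queue of results filled by a background thread, while an iterator of futures yields results lazily.  ``c`` and ``inc`` are assumed names for a connected client and a one-argument function.

>>> from queue import Queue  # doctest: +SKIP
>>> q_in = Queue()  # doctest: +SKIP
>>> q_in.put(c.submit(inc, 1))  # doctest: +SKIP
>>> q_out = c.gather(q_in)  # Queue in, Queue out  # doctest: +SKIP
>>> q_out.get()  # doctest: +SKIP
2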
def _threaded_map(self, q_out, func, qs_in, **kwargs):
""" Internal function for mapping Queue """
if isqueue(qs_in[0]):
get = pyQueue.get
elif isinstance(qs_in[0], Iterator):
get = next
else:
raise NotImplementedError()
while True:
try:
args = [get(q) for q in qs_in]
except StopIteration as e:
q_out.put(e)
break
f = self.submit(func, *args, **kwargs)
q_out.put(f)
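
_threaded_map runs in a daemon thread behind ``map``: it pulls one item from each input Queue or iterator, submits ``func`` on those arguments, and pushes the resulting future onto ``q_out`` until one of the inputs raises ``StopIteration``.  A rough sketch of the public behaviour it backs, assuming a connected client ``c`` and a one-argument function ``inc``:

>>> from queue import Queue  # doctest: +SKIP
>>> q = Queue()  # doctest: +SKIP
>>> q.put(1)  # doctest: +SKIP
>>> futures_q = c.map(inc, q)  # Queue of futures  # doctest: +SKIP
>>> c.gather(futures_q.get())  # doctest: +SKIP
2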
>>> x = e.submit(add, 1, 2) # doctest: +SKIP
>>> e.gather(x) # doctest: +SKIP
3
>>> e.gather([x, [x], x]) # support lists and dicts # doctest: +SKIP
[3, [3], 3]
>>> seq = e.gather(iter([x, x])) # support iterators # doctest: +SKIP
>>> next(seq) # doctest: +SKIP
3
See Also
--------
Executor.scatter: Send data out to cluster
"""
if isqueue(futures):
qout = pyQueue(maxsize=maxsize)
t = Thread(target=self._threaded_gather, args=(futures, qout),
kwargs={'errors': errors})
t.daemon = True
t.start()
return qout
elif isinstance(futures, Iterator):
return (self.gather(f, errors=errors) for f in futures)
else:
return sync(self.loop, self._gather, futures, errors=errors)
Returns
-------
List, iterator, or Queue of futures, depending on the type of the
inputs.
See also
--------
Executor.submit: Submit a single function
"""
if not callable(func):
raise TypeError("First input to map must be a callable function")
if (all(map(isqueue, iterables)) or
all(isinstance(i, Iterator) for i in iterables)):
maxsize = kwargs.pop('maxsize', 0)
q_out = pyQueue(maxsize=maxsize)
t = Thread(target=self._threaded_map, args=(q_out, func, iterables),
kwargs=kwargs)
t.daemon = True
t.start()
if isqueue(iterables[0]):
return q_out
else:
return queue_to_iterator(q_out)
pure = kwargs.pop('pure', True)
workers = kwargs.pop('workers', None)
allow_other_workers = kwargs.pop('allow_other_workers', False)
if allow_other_workers and workers is None:
raise ValueError("Only use allow_other_workers= if using workers=")
This returns an iterator that yields the input future objects in the order
in which they complete. Calling ``next`` on the iterator will block until
the next future completes, irrespective of order.
This function does not return futures in the order in which they are input.
"""
fs = list(fs)
if len(set(f.executor for f in fs)) == 1:
loop = first(fs).executor.loop
else:
# TODO: Groupby executor, spawn many _as_completed coroutines
raise NotImplementedError(
"as_completed on many event loops not yet supported")
queue = pyQueue()
coroutine = lambda: _as_completed(fs, queue)
loop.add_callback(coroutine)
for i in range(len(fs)):
yield queue.get()
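
A usage sketch for the generator form above, assuming a connected client ``c`` and a function ``inc``: futures come back in completion order, which generally differs from submission order.

>>> futures = c.map(inc, range(3))  # doctest: +SKIP
>>> for future in as_completed(futures):  # doctest: +SKIP
...     print(future.result())  # doctest: +SKIP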
def __init__(self, futures=None, loop=None, with_results=False):
if futures is None:
futures = []
self.futures = defaultdict(lambda: 0)
self.queue = pyQueue()
self.lock = threading.Lock()
self.loop = loop or default_client().loop
self.condition = Condition()
self.thread_condition = threading.Condition()
self.with_results = with_results
if futures:
self.update(futures)
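
A usage sketch for the class-based form, assuming the rest of the class iterates like ``dask.distributed.as_completed``: with ``with_results=True`` iteration yields ``(future, result)`` pairs, and more futures can be added while iterating via ``update``.

>>> ac = as_completed(c.map(inc, range(3)), with_results=True)  # doctest: +SKIP
>>> for future, result in ac:  # doctest: +SKIP
...     print(result)  # doctest: +SKIP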