Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _threaded_map(self, q_out, func, qs_in, **kwargs):
    """ Feed items from several inputs through ``self.submit`` onto ``q_out``

    On every round one item is taken from each entry of ``qs_in``, the
    items are passed as positional arguments to
    ``self.submit(func, *items, **kwargs)`` and whatever ``submit`` returns
    is put on ``q_out``.

    The way items are taken is decided once, from the first input only:
    ``pyQueue.get`` when ``isqueue`` accepts it, ``next`` when it is an
    ``Iterator``.  The remaining inputs are assumed to be of the same
    kind -- NOTE(review): confirm the caller guarantees this.

    The loop only ends when taking an item raises ``StopIteration``; that
    exception object is itself put on ``q_out`` before returning, so the
    consumer can see that the stream is finished.

    Raises
    ------
    NotImplementedError
        If the first input is neither a queue nor an ``Iterator``.
    """
    first = qs_in[0]
    if isqueue(first):
        fetch = pyQueue.get
    elif isinstance(first, Iterator):
        fetch = next
    else:
        raise NotImplementedError()

    while True:
        try:
            # Deliberately a list comprehension: a StopIteration raised by
            # ``fetch`` must propagate to the ``except`` below instead of
            # being swallowed by a lazy wrapper such as ``map``.
            items = [fetch(source) for source in qs_in]
        except StopIteration as stop:
            # Forward the end-of-stream marker to the consumer and finish.
            q_out.put(stop)
            return
        q_out.put(self.submit(func, *items, **kwargs))
>>> e = Executor('127.0.0.1:8787') # doctest: +SKIP
>>> x = e.submit(add, 1, 2) # doctest: +SKIP
>>> e.gather(x) # doctest: +SKIP
3
>>> e.gather([x, [x], x]) # support lists and dicts # doctest: +SKIP
[3, [3], 3]
>>> seq = e.gather(iter([x, x])) # support iterators # doctest: +SKIP
>>> next(seq) # doctest: +SKIP
3
See Also
--------
Executor.scatter: Send data out to cluster
"""
if isqueue(futures):
qout = pyQueue(maxsize=maxsize)
t = Thread(target=self._threaded_gather, args=(futures, qout),
kwargs={'errors': errors})
t.daemon = True
t.start()
return qout
elif isinstance(futures, Iterator):
return (self.gather(f, errors=errors) for f in futures)
else:
return sync(self.loop, self._gather, futures, errors=errors)
>>> c = Client('127.0.0.1:8787') # doctest: +SKIP
>>> x = c.submit(add, 1, 2) # doctest: +SKIP
>>> c.gather(x) # doctest: +SKIP
3
>>> c.gather([x, [x], x]) # support lists and dicts # doctest: +SKIP
[3, [3], 3]
>>> seq = c.gather(iter([x, x])) # support iterators # doctest: +SKIP
>>> next(seq) # doctest: +SKIP
3
See Also
--------
Client.scatter: Send data out to cluster
"""
if isqueue(futures):
qout = pyQueue(maxsize=maxsize)
t = threading.Thread(target=self._threaded_gather,
name="Threaded gather()",
args=(futures, qout),
kwargs={'errors': errors, 'direct': direct})
t.daemon = True
t.start()
return qout
elif isinstance(futures, Iterator):
return (self.gather(f, errors=errors, direct=direct)
for f in futures)
else:
if hasattr(thread_state, 'execution_state'): # within worker task
local_worker = thread_state.execution_state['worker']
else:
local_worker = None
"""
if timeout == no_default:
timeout = self._timeout
if isqueue(data) or isinstance(data, Iterator):
logger.debug("Starting thread for streaming data")
qout = pyQueue(maxsize=maxsize)
t = threading.Thread(target=self._threaded_scatter,
name="Threaded scatter()",
args=(data, qout),
kwargs={'workers': workers,
'broadcast': broadcast})
t.daemon = True
t.start()
if isqueue(data):
return qout
else:
return queue_to_iterator(qout)
else:
if hasattr(thread_state, 'execution_state'): # within worker task
local_worker = thread_state.execution_state['worker']
else:
local_worker = None
return self.sync(self._scatter, data, workers=workers,
broadcast=broadcast, direct=direct,
local_worker=local_worker, timeout=timeout,
asynchronous=asynchronous, hash=hash)
See Also
--------
Executor.gather: Gather data back to local process
"""
if isqueue(data) or isinstance(data, Iterator):
logger.debug("Starting thread for streaming data")
qout = pyQueue(maxsize=maxsize)
t = Thread(target=self._threaded_scatter,
args=(data, qout),
kwargs={'workers': workers, 'broadcast': broadcast})
t.daemon = True
t.start()
if isqueue(data):
return qout
else:
return queue_to_iterator(qout)
else:
return sync(self.loop, self._scatter, data, workers=workers,
broadcast=broadcast)
@gen.coroutine
See also
--------
Executor.submit: Submit a single function
"""
if not callable(func):
raise TypeError("First input to map must be a callable function")
if (all(map(isqueue, iterables)) or
all(isinstance(i, Iterator) for i in iterables)):
maxsize = kwargs.pop('maxsize', 0)
q_out = pyQueue(maxsize=maxsize)
t = Thread(target=self._threaded_map, args=(q_out, func, iterables),
kwargs=kwargs)
t.daemon = True
t.start()
if isqueue(iterables[0]):
return q_out
else:
return queue_to_iterator(q_out)
pure = kwargs.pop('pure', True)
workers = kwargs.pop('workers', None)
allow_other_workers = kwargs.pop('allow_other_workers', False)
if allow_other_workers and workers is None:
raise ValueError("Only use allow_other_workers= if using workers=")
iterables = list(zip(*zip(*iterables)))
if pure:
keys = [funcname(func) + '-' + tokenize(func, kwargs, *args)
for args in zip(*iterables)]
else: