def _gather(self, futures, errors='raise', direct=None, local_worker=None):
    futures2, keys = unpack_remotedata(futures, byte_keys=True)
    keys = [tokey(key) for key in keys]
    bad_data = dict()

    if direct is None:
        try:
            w = get_worker()
        except Exception:
            direct = False
        else:
            if w.scheduler.address == self.scheduler.address:
                direct = True

    @gen.coroutine
    def wait(k):
        """ Want to stop the All(...) early if we find an error """
        st = self.futures[k]
        yield st.wait()
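
# --- Usage sketch (not part of the source above): _gather() is the coroutine
# behind the public Client.gather() call in dask.distributed.  A minimal,
# self-contained example, assuming a local cluster can be started in-process:
from dask.distributed import Client

def inc(x):
    return x + 1

client = Client()                       # starts a local scheduler and workers
futures = client.map(inc, range(4))     # four remote tasks
results = client.gather(futures)        # blocks until every result arrives
assert results == [1, 2, 3, 4]
client.close()
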
def _replicate(self, futures, n=None, workers=None, branching_factor=2):
    futures = self.futures_of(futures)
    yield _wait(futures)
    keys = {tokey(f.key) for f in futures}
    yield self.scheduler.replicate(keys=list(keys), n=n, workers=workers,
                                   branching_factor=branching_factor)
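
# --- Usage sketch (not part of the source above): _replicate() backs
# Client.replicate(), which copies the data behind futures onto additional
# workers for resilience and faster access.  With n left unset it targets
# every worker in the cluster:
from dask.distributed import Client

client = Client()
future = client.submit(sum, [1, 2, 3])
client.replicate([future])              # copy the result onto all workers
client.close()
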
def _rebalance(self, futures=None, workers=None):
    yield _wait(futures)
    keys = list({tokey(f.key) for f in self.futures_of(futures)})
    result = yield self.scheduler.rebalance(keys=keys, workers=workers)
    assert result['status'] == 'OK'
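
# --- Usage sketch (not part of the source above): _rebalance() backs
# Client.rebalance(), which moves keys between workers to even out memory use:
from dask.distributed import Client

client = Client()
futures = client.map(pow, range(100), range(100))
client.rebalance(futures)               # spread the stored results around
client.close()
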
if values:
    dsk = dask.optimize.inline(dsk, keys=values)

d = {k: unpack_remotedata(v) for k, v in dsk.items()}
extra_keys = set.union(*[v[1] for v in d.values()]) if d else set()
dsk2 = str_graph({k: v[0] for k, v in d.items()}, extra_keys)
dsk3 = {k: v for k, v in dsk2.items() if k is not v}

if restrictions:
    restrictions = keymap(tokey, restrictions)
    restrictions = valmap(list, restrictions)

if loose_restrictions is not None:
    loose_restrictions = list(map(tokey, loose_restrictions))

dependencies = {tokey(k): set(map(tokey, v[1])) for k, v in d.items()}

for s in dependencies.values():
    for v in s:
        if v not in self.futures:
            raise CancelledError(v)

for k, v in dsk3.items():
    dependencies[k] |= set(_deps(dsk3, v))

self._send_to_scheduler({'op': 'update-graph',
                         'tasks': valmap(dumps_task, dsk3),
                         'dependencies': valmap(list, dependencies),
                         'keys': list(flatkeys),
                         'restrictions': restrictions or {},
                         'loose_restrictions': loose_restrictions,
                         'client': self.id,
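
# --- Usage sketch (not part of the source above): the 'update-graph' message
# built above is how a dask task graph reaches the scheduler; public entry
# points such as Client.get(), Client.compute() and Client.persist() go
# through this path.  A tiny hand-written graph makes that concrete:
from operator import add
from dask.distributed import Client

client = Client()
dsk = {'x': 1, 'y': 2, 'z': (add, 'x', 'y')}    # plain dask task graph
assert client.get(dsk, 'z') == 3                # ship the graph, gather 'z'
client.close()
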
def release(self, _in_destructor=False):
    # NOTE: this method can be called from different threads
    # (see e.g. Client.get() or Future.__del__())
    if not self._cleared and self.client.generation == self._generation:
        self._cleared = True
        self.client.loop.add_callback(self.client._dec_ref, tokey(self.key))
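
# --- Usage sketch (not part of the source above): release() drops this
# Future's reference to its key; once no client-side references remain, the
# scheduler is free to forget the stored result.
from dask.distributed import Client

client = Client()
future = client.submit(max, [3, 1, 2])
assert future.result() == 3
future.release()                        # explicitly drop this handle
client.close()
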
def type(self):
    try:
        return self.executor.futures[tokey(self.key)]['type']
    except KeyError:
        return None
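
# --- Usage sketch (not part of the source above): this lookup is exposed as
# the Future.type property, which reports the result's Python type (or None
# while it is not yet known) without transferring the data itself:
from dask.distributed import Client

client = Client()
future = client.submit(dict, a=1)
future.result()                         # wait for the task to finish
print(future.type)                      # typically <class 'dict'> once known
client.close()
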
def release(self):
    if not self._cleared and self.executor.generation == self._generation:
        self._cleared = True
        self.executor._dec_ref(tokey(self.key))
def _scatter(self, data, workers=None, broadcast=False, direct=None,
             local_worker=None, timeout=no_default, hash=True):
    if timeout == no_default:
        timeout = self._timeout
    if isinstance(workers, six.string_types + (Number,)):
        workers = [workers]
    if isinstance(data, dict) and not all(isinstance(k, (bytes, unicode))
                                          for k in data):
        d = yield self._scatter(keymap(tokey, data), workers, broadcast)
        raise gen.Return({k: d[tokey(k)] for k in data})

    if isinstance(data, type(range(0))):
        data = list(data)

    input_type = type(data)
    names = False
    unpack = False
    if isinstance(data, Iterator):
        data = list(data)
    if isinstance(data, (set, frozenset)):
        data = list(data)
    if not isinstance(data, (dict, list, tuple, set, frozenset)):
        unpack = True
        data = [data]
    if isinstance(data, (list, tuple)):
        if hash:
            names = [type(x).__name__ + '-' + tokenize(x) for x in data]
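
# --- Usage sketch (not part of the source above): _scatter() backs
# Client.scatter(), which moves local data into cluster memory and returns
# futures pointing at it, so a large input is shipped only once:
from dask.distributed import Client

client = Client()
[data] = client.scatter([list(range(1000))])    # one future for the list
total = client.submit(sum, data).result()       # workers reuse the copy
assert total == sum(range(1000))
client.close()
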
def __init__(self, key, client=None, inform=True, state=None):
    self.key = key
    self._cleared = False
    tkey = tokey(key)
    self.client = client or _get_global_client()
    self.client._inc_ref(tkey)
    self._generation = self.client.generation

    if tkey in self.client.futures:
        self._state = self.client.futures[tkey]
    else:
        self._state = self.client.futures[tkey] = FutureState()

    if inform:
        self.client._send_to_scheduler({'op': 'client-desires-keys',
                                        'keys': [tokey(key)],
                                        'client': self.client.id})

    if state is not None:
        try:
            handler = self.client._state_handlers[state]
        except KeyError:
            pass
        else:
            handler(key=key)
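
# --- Usage sketch (not part of the source above): Future.__init__ registers
# interest in a key with the client ('client-desires-keys'), so two Future
# objects built around the same key share a single FutureState:
from dask.distributed import Client, Future

client = Client()
f1 = client.submit(abs, -7)
f2 = Future(f1.key, client=client)      # a second handle onto the same key
assert f2.result() == 7
client.close()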