Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Excerpt of a larger function: its header (which binds kwargs, dtype, args,
# out, token, func, out_ind) is not visible here.
# NOTE(review): indentation was stripped from this excerpt, so nested bodies
# below are not indented the way they must be in the real source.
# Optional per-index chunk adjustments; popped so they are not forwarded to
# tokenize() or top() below.
adjust_chunks = kwargs.pop('adjust_chunks', None)
# Read with .get (not .pop): new_axes stays in kwargs and is therefore also
# forwarded to tokenize() and top() below.
new_axes = kwargs.get('new_axes', {})
if dtype is None:
raise ValueError("Must specify dtype of output array")
# Unify chunking across the inputs; chunkss is looked up by index label
# (see the new_axes and out_ind uses below).
chunkss, arrays = unify_chunks(*args)
# Each new axis gets a single chunk of the given size.
for k, v in new_axes.items():
chunkss[k] = (v,)
# Pair each unified array with its index; presumably args alternates
# array, index, array, index, ... -- confirm against callers.
arginds = list(zip(arrays, args[1::2]))
numblocks = dict([(a.name, a.numblocks) for a, _ in arginds])
# Flattened [name, index, name, index, ...] argument list passed to top().
argindsstr = list(concat([(a.name, ind) for a, ind in arginds]))
# Finish up the name
# With no explicit name, build one from `token` (or the function name with
# leading/trailing underscores stripped) plus tokenize() of the inputs.
if not out:
out = '%s-%s' % (token or funcname(func).strip('_'),
tokenize(func, out_ind, argindsstr, dtype, **kwargs))
dsk = top(func, out, out_ind, *argindsstr, numblocks=numblocks, **kwargs)
# (dsks is not used within the visible part of this excerpt.)
dsks = [a.dask for a, _ in arginds]
# Output chunks start from the unified chunks of each output index.
chunks = [chunkss[i] for i in out_ind]
# adjust_chunks[ind] may be a callable (applied to every chunk size), an int
# (replaces every chunk size), or a tuple/list (replaces the chunks outright).
if adjust_chunks:
for i, ind in enumerate(out_ind):
if ind in adjust_chunks:
if callable(adjust_chunks[ind]):
chunks[i] = tuple(map(adjust_chunks[ind], chunks[i]))
elif isinstance(adjust_chunks[ind], int):
chunks[i] = tuple(adjust_chunks[ind] for _ in chunks[i])
elif isinstance(adjust_chunks[ind], (tuple, list)):
chunks[i] = tuple(adjust_chunks[ind])
# NOTE(review): the body of this final else branch (any other
# adjust_chunks value) is cut off in this excerpt.
else:
# Excerpt beginning partway through an if/elif chain that validates
# split_every; the enclosing function header and the earlier branches are
# not visible here. Indentation was stripped from this excerpt.
split_every = npartitions
# NOTE(review): `split_every < 2` is evaluated before the isinstance check,
# so a non-numeric value can raise TypeError from the comparison instead of
# this ValueError.
elif split_every < 2 or not isinstance(split_every, int):
raise ValueError("split_every must be an integer >= 2")
# Shared token appended to every stage name (chunk / combine / agg) below.
token_key = tokenize(
token or (chunk, aggregate),
meta,
args,
chunk_kwargs,
aggregate_kwargs,
combine_kwargs,
split_every,
)
# Chunk
# Stage 1: one `chunk` task per input partition, keyed (a, 0, i).
a = "{0}-chunk-{1}".format(token or funcname(chunk), token_key)
# Fast path: a single frame argument and no chunk kwargs -- call chunk
# directly on each partition key of that frame.
if len(args) == 1 and isinstance(args[0], _Frame) and not chunk_kwargs:
dsk = {(a, 0, i): (chunk, key) for i, key in enumerate(args[0].__dask_keys__())}
else:
# General case: for partition i, every _Frame argument is replaced by its
# (name, i) key; other arguments are passed through unchanged.
dsk = {
(a, 0, i): (
apply,
chunk,
[(x._name, i) if isinstance(x, _Frame) else x for x in args],
chunk_kwargs,
)
for i in range(args[0].npartitions)
}
# Combine
# NOTE(review): the value assigned to `b` on the next line is overwritten by
# `b = a` before it is ever used, and `prefix` (read inside the loop) is not
# assigned anywhere in this excerpt. It looks like this line was meant to
# define `prefix` -- confirm against the full source.
b = "{0}-combine-{1}".format(token or funcname(combine), token_key)
k = npartitions
b = a
depth = 0
# Tree reduction: while more than split_every inputs remain, concatenate
# them in groups of up to split_every and apply `combine` to each group.
# NOTE(review): the chunk stage above writes keys (a, 0, i), but this loop
# and the aggregate step below read (a, i); while `a` still names the chunk
# stage those keys do not match. Confirm the intended key layout.
while k > split_every:
b = prefix + str(depth)
for part_i, inds in enumerate(partition_all(split_every, range(k))):
conc = (_concat, [(a, i) for i in inds])
if combine_kwargs:
dsk[(b, part_i)] = (apply, combine, [conc], combine_kwargs)
else:
dsk[(b, part_i)] = (combine, conc)
k = part_i + 1
a = b
depth += 1
# Aggregate
# Final stage: concatenate the k remaining results and apply `aggregate`,
# producing the single output key (b, 0).
b = '{0}-agg-{1}'.format(token or funcname(aggregate), token_key)
conc = (_concat, [(a, i) for i in range(k)])
if aggregate_kwargs:
dsk[(b, 0)] = (apply, aggregate, [conc], aggregate_kwargs)
else:
dsk[(b, 0)] = (aggregate, conc)
# When meta was not supplied, infer it via _emulate: run chunk on args, then
# aggregate on the concatenated chunk result.
if meta is no_default:
meta_chunk = _emulate(apply, chunk, args, chunk_kwargs)
meta = _emulate(apply, aggregate, [_concat([meta_chunk])],
aggregate_kwargs)
meta = make_meta(meta)
# Merge the graphs of all frame arguments into the output graph.
for arg in args:
if isinstance(arg, _Frame):
dsk.update(arg.dask)
# Presumably [None, None] declares unknown divisions for the single output
# partition (b, 0) -- confirm against new_dd_object.
return new_dd_object(dsk, b, meta, [None, None])
# Excerpt of a function body (header not visible; indentation stripped).
# Normalizes an aggregation `spec` into a list of
# (result_column, func, input_column) triples.
res = []
if isinstance(spec, dict):
for input_column, subspec in spec.items():
# Nested dict {input_column: {result_column: func}}: the result column
# is named (input_column, result_column).
if isinstance(subspec, dict):
res.extend(
((input_column, result_column), func, input_column)
for result_column, func in subspec.items()
)
else:
# A single func or a list of funcs: each result column is named
# (input_column, funcname(func)).
if not isinstance(subspec, list):
subspec = [subspec]
res.extend(
((input_column, funcname(func)), func, input_column)
for func in subspec
)
else:
raise ValueError("unsupported agg spec of type {}".format(type(spec)))
# If no value in spec is a list/tuple/dict (one plain func per column), the
# result columns collapse to the plain input column names.
compounds = (list, tuple, dict)
use_flat_columns = not any(
isinstance(subspec, compounds) for subspec in spec.values()
)
if use_flat_columns:
res = [(input_col, func, input_col) for (_, func, input_col) in res]
return res
>>> from operator import add
>>> task_label((add, 1, 2))
'add'
>>> task_label((add, (add, 1, 2), 3))
'add(...)'
"""
# Excerpt: the header and the opening of the docstring are not visible; the
# lines above are that docstring's doctest tail. Indentation was stripped.
# Pick the function to label the task by; for (apply, f, ...) tasks use f.
func = task[0]
if func is apply:
func = task[1]
# A callable exposing `funcs`: label by its first function, and return
# "<name>(...)" immediately when it holds more than one.
if hasattr(func, "funcs"):
if len(func.funcs) > 1:
return "{0}(...)".format(funcname(func.funcs[0]))
else:
head = funcname(func.funcs[0])
else:
head = funcname(func)
# Append "(...)" when has_sub_tasks() reports a nested task among the
# remaining task elements (task[1:]).
if any(has_sub_tasks(i) for i in task[1:]):
return "{0}(...)".format(head)
else:
return head
# Excerpt of a function body: the header and everything after the collect
# step are not visible here; indentation was stripped.
# Builds a partd-backed, on-disk grouping graph in four pieces (dsk1..dsk4).
import partd
# Single-element key naming the shared partd store for this operation.
p = ("partd-" + token,)
# Use the configured temporary_directory when set: a task that calls
# partd.File with dir=dirname via apply. Otherwise partd.File is called with
# no arguments.
dirname = config.get("temporary_directory", None)
if dirname:
file = (apply, partd.File, (), {"dir": dirname})
else:
file = (partd.File,)
# Wrap the file store in partd.Snappy when the partd module provides it; if
# accessing partd.Snappy raises AttributeError, wrap only in partd.Python.
try:
dsk1 = {p: (partd.Python, (partd.Snappy, file))}
except AttributeError:
dsk1 = {p: (partd.Python, file)}
# Partition data on disk
# One `partition` task per input partition of b, each given the grouper,
# the input partition key, npartitions, the store key p and blocksize.
name = "groupby-part-{0}-{1}".format(funcname(grouper), token)
dsk2 = dict(
((name, i), (partition, grouper, (b.name, i), npartitions, p, blocksize))
for i in range(b.npartitions)
)
# Barrier
# tuple(dsk2) is the tuple of dsk2's keys, so the barrier task takes every
# partition task as an argument (and so depends on all of them).
barrier_token = "groupby-barrier-" + token
dsk3 = {barrier_token: (chunk.barrier,) + tuple(dsk2)}
# Collect groups
# One `collect` task per output partition, each referencing the barrier key.
name = "groupby-collect-" + token
dsk4 = dict(
((name, i), (collect, grouper, i, p, barrier_token)) for i in range(npartitions)
)