do_rename = delayed(rename)


def fix_name(df):
    # Undo rename(df) and apply the proper prefix based on the column name
    newdf = gd.DataFrame()
    for k in df.columns:
        if magic_token in k:
            _, name = k.split(magic_token, 1)
            newk = "_".join([prefix[name], name])
        else:
            newk = k
        newdf[newk] = df[k]
    return newdf


do_fix_name = delayed(fix_name)
def drop_prefix(df):
    newdf = gd.DataFrame()
    for k in df.columns:
        if magic_token in k:
            _, name = k.split(magic_token, 1)
            newk = magic_token + name
        else:
            newk = k
        newdf[newk] = df[k]
    return newdf


@delayed
def do_local_groupby(df, method):
    return drop_prefix(chunk(df.groupby(by=by, method=method)))
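
The helpers above are wrapped with delayed so they can be chained onto other lazy results without touching any data. A minimal sketch of the same wrapping pattern, using plain pandas and a toy renaming function in place of the gd, magic_token and prefix objects these snippets assume:

import pandas as pd
from dask import delayed

def add_prefix(df, pfx):
    # Toy stand-in for fix_name: prepend pfx to every column name.
    return df.rename(columns={c: f"{pfx}_{c}" for c in df.columns})

do_add_prefix = delayed(add_prefix)              # same wrapping pattern as do_fix_name

part = delayed(pd.DataFrame)({"x": [1, 2], "y": [3, 4]})
fixed = do_add_prefix(part, "left")              # still lazy: only a task graph so far
print(fixed.compute().columns.tolist())          # ['left_x', 'left_y']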
def split(cv, X, y=None):
    # Avoid repeated hashing by preconverting to ``Delayed`` objects
    dX = delayed(X, pure=True)
    dy = delayed(y, pure=True)
    for train, test in cv.split(X, y):
        X_train = _safe_indexing(dX, train)
        X_test = _safe_indexing(dX, test)
        if y is not None:
            y_train = _safe_indexing(dy, train)
            y_test = _safe_indexing(dy, test)
        else:
            y_train = y_test = None
        yield X_train, y_train, X_test, y_test
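
The pure=True wrap is what saves the repeated hashing mentioned in the comment: each fold's indexing task references the single pre-built delayed node rather than re-tokenizing the full array, and identical pure wraps collapse to one key. A small sketch of that behaviour:

import numpy as np
from dask import delayed

X = np.arange(1_000_000)

dX = delayed(X, pure=True)
# pure=True derives the key from the data, so wrapping the same array
# again produces the same node instead of a new one.
assert dX.key == delayed(X, pure=True).key

# Per-fold slicing then happens lazily against the wrapped object.
fold = dX[:500_000]
print(fold.compute().shape)   # (500000,)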
def check_cv(cv=3, y=None, classifier=False):
    """Dask-aware version of ``sklearn.model_selection.check_cv``.

    Same as the scikit-learn version, but works if ``y`` is a dask object.
    """
    if cv is None:
        cv = 3

    # If ``cv`` is not an integer, the scikit-learn implementation doesn't
    # touch the ``y`` object, so passing a dask object through is fine
    if not is_dask_collection(y) or not isinstance(cv, numbers.Integral):
        return model_selection.check_cv(cv, y, classifier)

    if classifier:
        # ``y`` is a dask object. We need to compute the target type
        target_type = delayed(type_of_target, pure=True)(y).compute()
        if target_type in ("binary", "multiclass"):
            return StratifiedKFold(cv)
    return KFold(cv)
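
The key step is delayed(type_of_target, pure=True)(y).compute(): because y is a dask collection, dask materializes it when the task runs, so scikit-learn's type_of_target sees a concrete array and only that small result comes back. A self-contained sketch of just that step:

import numpy as np
import dask.array as da
from dask import delayed
from sklearn.utils.multiclass import type_of_target
from sklearn.model_selection import KFold, StratifiedKFold

y = da.from_array(np.array([0, 1, 1, 0, 1, 0]), chunks=3)

# The dask array is computed when the delayed call executes, so
# type_of_target receives a plain ndarray.
target_type = delayed(type_of_target, pure=True)(y).compute()

cv = StratifiedKFold(3) if target_type in ("binary", "multiclass") else KFold(3)
print(target_type, cv)   # binary StratifiedKFold(n_splits=3, ...)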
    df : pd.DataFrame
        The DataFrame for which to generate the summary.
    pairdensities : bool, optional
        Whether to compute the pairdensity estimation between all pairs of
        numerical columns. For most datasets, this is the most expensive
        computation. Default is True.

    Returns
    -------
    dict
        The generated data summary.
    """
    # Create a series for each column in the DataFrame.
    columns = df.columns
    df = delayed(df)
    cols = {k: delayed(df.get)(k) for k in columns}

    # Create the delayed reports using Dask.
    row_c = delayed(metrics.row_count)(df)
    cprops = {k: delayed(metrics.column_properties)(cols[k]) for k in columns}
    joined_cprops = _join_dask_results(list(cprops.values()))

    freqs = {
        k: delayed(metrics.frequencies)(cols[k], cprops[k]) for k in columns
    }
    joined_freqs = _join_dask_results(list(freqs.values()))

    csumms = {
        k: delayed(metrics.column_summary)(cols[k], cprops[k]) for k in columns
    }
    joined_csumms = _join_dask_results(list(csumms.values()))

    out = {k: delayed(metrics.outliers)(cols[k], csumms[k]) for k in columns}
    joined_outliers = _join_dask_results(list(out.values()))

    corr = delayed(metrics.correlation)(df, joined_cprops)

    pdens_results = []
    if pairdensities:
        for col1, col2 in itertools.combinations(columns, 2):
    Parameters
    ----------
    output : delayed node output
    item : str
        item to take from the output
    name : str
        delayed key name (default: item)

    Returns
    -------
    delayed object yielding the item
    """
    name = name or item
    new_key_name = get_key_id(output.key) + '/' + str(name)
    new_key = reset_key_id(output.key, new_key_name)
    return delayed(operator.getitem)(output, item, dask_key_name=new_key)
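
This helper builds a deterministic key and then emits a tiny operator.getitem task, so one field of a delayed node's output can be addressed without computing anything else up front. The same pattern in isolation, with an illustrative key name:

import operator
from dask import delayed

@delayed
def summarize(xs):
    return {"mean": sum(xs) / len(xs), "n": len(xs)}

out = summarize([1, 2, 3])

# A getitem task with an explicit key; only the graph needed for this
# item runs when it is computed.
mean_node = delayed(operator.getitem)(out, "mean", dask_key_name="summarize/mean")
print(mean_node.key)        # summarize/mean
print(mean_node.compute())  # 2.0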
    sl : slice
    input_dict : dict
    with_values : dict {'node_name': np.array}

    Returns
    -------
    out : dask.delayed object
        object.key is (self.name, sl.start, n)
    """
    with_values = with_values or {}
    dask_key_name = make_key(self.name, sl)
    if self.name in with_values:
        # Use the precomputed data supplied in with_values instead of
        # running the operation.
        output = to_output(input_dict, data=with_values[self.name])
        return delayed(output, name=dask_key_name)
    else:
        dinput = delayed(input_dict, pure=True)
        return delayed(self.operation)(dinput,
                                       dask_key_name=dask_key_name)
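
Both branches pin an explicit key: delayed(obj, name=...) wraps a value that already exists, while dask_key_name=... names the task that will produce the result. A minimal sketch with made-up key names:

from dask import delayed

# Precomputed branch: wrap a literal result under a chosen key.
cached = delayed({"node": "a", "data": [1, 2, 3]}, name="node-a-0-3")
assert cached.key == "node-a-0-3"

# Computed branch: name the task that produces the result.
def operation(d):
    return {"node": d["node"], "data": [v * 2 for v in d["data"]]}

dinput = delayed({"node": "a", "data": [1, 2, 3]}, pure=True)
node = delayed(operation)(dinput, dask_key_name="node-a-0-3-doubled")
print(node.compute()["data"])   # [2, 4, 6]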
def process_quarter_gpu(client, col_names_path, acq_data_path, year=2000, quarter=1, perf_file=""):
    dask_client = client
    ml_arrays = run_dask_task(delayed(run_gpu_workflow),
                              col_path=col_names_path,
                              acq_path=acq_data_path,
                              quarter=quarter,
                              year=year,
                              perf_file=perf_file)
    return dask_client.compute(ml_arrays,
                               optimize_graph=False,
                               fifo_timeout="0ms")
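
client.compute submits the delayed graph to the cluster and returns futures immediately; optimize_graph=False ships the graph as built, and fifo_timeout="0ms" gives every submission its own FIFO priority slot instead of grouping calls made close together. A CPU-only sketch with a hypothetical stand-in for run_gpu_workflow:

from dask import delayed
from dask.distributed import Client

def run_workflow(acq_path, quarter=1, year=2000):
    # Hypothetical stand-in for run_gpu_workflow.
    return f"{acq_path}/{year}Q{quarter}"

if __name__ == "__main__":
    client = Client(processes=False)   # small in-process cluster for illustration
    task = delayed(run_workflow)("acq", quarter=1, year=2000)
    future = client.compute(task, optimize_graph=False, fifo_timeout="0ms")
    print(future.result())             # acq/2000Q1
    client.close()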
        npartitions = max(100, df.npartitions)
    else:
        if npartitions is None:
            npartitions = df.npartitions
        repartition = False

    if not isinstance(index, Series):
        index2 = df[index]
    else:
        index2 = index

    if divisions is None:
        if repartition:
            index2, df = base.optimize(index2, df)
            parts = df.to_delayed(optimize_graph=False)
            sizes = [delayed(sizeof)(part) for part in parts]
        else:
            (index2,) = base.optimize(index2)
            sizes = []

        divisions = index2._repartition_quantiles(npartitions, upsample=upsample)
        iparts = index2.to_delayed(optimize_graph=False)
        mins = [ipart.min() for ipart in iparts]
        maxes = [ipart.max() for ipart in iparts]
        sizes, mins, maxes = base.optimize(sizes, mins, maxes)
        divisions, sizes, mins, maxes = base.compute(
            divisions, sizes, mins, maxes, optimize_graph=False
        )
        divisions = divisions.tolist()

        empty_dataframe_detected = pd.isnull(divisions).all()
        if repartition or empty_dataframe_detected:
def to_keys(dsk, *args):
    for x in args:
        if x is None:
            yield None
        elif isinstance(x, da.Array):
            x = delayed(x)
            dsk.update(x.dask)
            yield x.key
        elif isinstance(x, Delayed):
            dsk.update(x.dask)
            yield x.key
        else:
            assert not is_dask_collection(x)
            key = "array-" + tokenize(x)
            dsk[key] = x
            yield key
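
to_keys normalizes mixed inputs into graph keys: dask arrays and Delayed objects merge their graphs into dsk and yield their existing keys, plain values are stored under a tokenized "array-..." key, and None passes through untouched. A usage sketch, assuming the function above is in scope together with the names it relies on:

import numpy as np
import dask.array as da
from dask.base import is_dask_collection, tokenize
from dask.delayed import Delayed, delayed

dsk = {}
X = da.ones((4, 2), chunks=(2, 2))
y = np.array([0, 1, 0, 1])

x_key, y_key, none_key = to_keys(dsk, X, y, None)
# dsk now holds X's task graph plus y under an "array-<token>" key;
# none_key is simply None.
print(x_key, y_key, none_key)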