def test_multifile_serialization(preset_gcs_builder, make_counter):
    # preset_gcs_builder, make_counter, and df_from_csv_str are fixtures/helpers
    # defined elsewhere in the project's test suite.
    call_counter = make_counter()
    builder = preset_gcs_builder

    dask_df = dd.from_pandas(
        df_from_csv_str(
            """
color,number
red,1
blue,2
green,3
"""
        ),
        npartitions=1,
    )

    @builder
    @bn.protocol.dask
    @call_counter
    def df():
        return dask_df
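# df_from_csv_str is a helper from that project's test suite; the stand-in below
# is an assumption (not the project's implementation) showing a minimal,
# self-contained version of the same from_pandas call.
import io
import pandas as pd
import dask.dataframe as dd

def df_from_csv_str(csv_text):
    # Parse an inline CSV string into a pandas DataFrame.
    return pd.read_csv(io.StringIO(csv_text.strip()))

dask_df = dd.from_pandas(
    df_from_csv_str("color,number\nred,1\nblue,2\ngreen,3\n"),
    npartitions=1,
)
print(dask_df.compute())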
# Excerpt from a fletcher test; the top of the DataFrame construction is
# truncated here, so only the tail of the column dict is shown.
df = pd.DataFrame(
    {
        # ... earlier columns omitted in this excerpt ...
        "duration[ms]": fletcher_array(
            pa.array(
                [None, datetime.timedelta(milliseconds=8)], type=pa.duration("ms")
            )
        ),
        "duration[us]": fletcher_array(
            pa.array(
                [None, datetime.timedelta(microseconds=7)], type=pa.duration("us")
            )
        ),
        # FIXME: assert_extension_array_equal casts to numpy object and thus cannot handle nanoseconds
        # "duration[ns]": fletcher_array(pa.array([None, datetime.timedelta(microseconds=7)], type=pa.duration("ns"))),
        "list[string]": fletcher_array(
            pa.array([None, [None, "🤔"]], type=pa.list_(pa.string()))
        ),
    }
)
ddf = dd.from_pandas(df, npartitions=2)

# The non-empty meta frame should mirror the original dtypes, and a round trip
# through dask should reproduce the original frame.
meta_nonempty = ddf._meta_nonempty
pdt.assert_frame_equal(meta_nonempty, df)

result = ddf.compute()
pdt.assert_frame_equal(result, df)
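# A minimal, self-contained sketch of the same check without the fletcher
# dependency: dask keeps a small "meta" frame whose dtypes should match the
# source frame, and compute() should round-trip the data.
import dask.dataframe as dd
import pandas as pd
import pandas.testing as pdt

pdf = pd.DataFrame({"x": pd.array([1, None], dtype="Int64"), "y": ["a", "b"]})
ddf = dd.from_pandas(pdf, npartitions=2)

assert list(ddf._meta_nonempty.dtypes) == list(pdf.dtypes)  # dtypes survive in the meta
pdt.assert_frame_equal(ddf.compute(), pdf)                   # round trip reproduces the data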
dd.from_pandas(pd.DataFrame({"A": range(100)}), 25),
],
],
)
def test_matching_blocks_raises(arrays):
with pytest.raises(ValueError):
check_matching_blocks(*arrays)
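# A self-contained sketch of what this test exercises (assuming
# check_matching_blocks is importable from dask_ml.utils): two dask frames
# built with different partition counts have different divisions, so the
# check should raise ValueError.
import pandas as pd
import dask.dataframe as dd
from dask_ml.utils import check_matching_blocks

a = dd.from_pandas(pd.DataFrame({"A": range(100)}), npartitions=10)
b = dd.from_pandas(pd.DataFrame({"A": range(100)}), npartitions=25)

try:
    check_matching_blocks(a, b)
except ValueError:
    print("divisions do not match, as expected")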
def test_impute_most_frequent():
    # https://github.com/dask/dask-ml/issues/385
    data = dd.from_pandas(pd.DataFrame([1, 1, 1, 1, np.nan, np.nan]), 2)
    model = dask_ml.impute.SimpleImputer(strategy="most_frequent")
    result = model.fit_transform(data)
    expected = dd.from_pandas(pd.DataFrame({0: [1.0] * 6}), 2)
    dd.utils.assert_eq(result, expected)
    assert model.statistics_[0] == 1.0
def to_dask(self, nworkers):
    # Split each pandas frame into one partition per worker and persist the
    # collections so later computations reuse the already-distributed data.
    X_train_dask = ddf.from_pandas(self.X_train, npartitions=nworkers)
    X_test_dask = ddf.from_pandas(self.X_test, npartitions=nworkers)
    y_train_dask = ddf.from_pandas(self.y_train.to_frame(), npartitions=nworkers)
    y_test_dask = ddf.from_pandas(self.y_test.to_frame(), npartitions=nworkers)
    X_train_dask, X_test_dask, y_train_dask, y_test_dask = dask.persist(
        X_train_dask, X_test_dask, y_train_dask, y_test_dask
    )
    return Data(X_train_dask, X_test_dask, y_train_dask, y_test_dask)
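# A standalone sketch of the same pattern (the frame and column names here are
# illustrative, not from the original project): partition train data per worker
# and persist it so repeated computations do not re-serialize the pandas input.
import dask
import dask.dataframe as dd
import pandas as pd

X = pd.DataFrame({"f0": range(1000), "f1": range(1000)})
y = pd.Series(range(1000), name="target")

nworkers = 4
X_dask = dd.from_pandas(X, npartitions=nworkers)
y_dask = dd.from_pandas(y.to_frame(), npartitions=nworkers)

# persist() materializes the partitions and returns new collections backed by
# the in-memory results.
X_dask, y_dask = dask.persist(X_dask, y_dask)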
def _dask_apply(self, func, axis=0, raw=None, result_type=None, *args, **kwds):
    # Infer the output structure ("meta") from a small pandas sample first.
    sample = self._obj.iloc[: self._npartitions * 2, :]
    with suppress_stdout_stderr():
        meta = sample.apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds)
    try:
        with suppress_stdout_stderr():
            # check that the dask apply matches the pandas apply
            tmp_df = (
                dd.from_pandas(sample, npartitions=self._npartitions)
                .apply(func, *args, axis=axis, raw=raw, result_type=result_type, meta=meta, **kwds)
                .compute(scheduler=self._scheduler)
            )
            self._validate_apply(
                tmp_df.equals(meta), error_message="Dask apply sample does not match pandas apply sample."
            )
        if self._progress_bar:
            with TQDMDaskProgressBar(desc=self._progress_bar_desc or "Dask Apply"):
                return (
                    dd.from_pandas(self._obj, npartitions=self._npartitions)
                    .apply(func, *args, axis=axis, raw=raw, result_type=result_type, meta=meta, **kwds)
                    .compute(scheduler=self._scheduler)
                )
        else:
            return (
                dd.from_pandas(self._obj, npartitions=self._npartitions)
                .apply(func, *args, axis=axis, raw=raw, result_type=result_type, meta=meta, **kwds)
                .compute(scheduler=self._scheduler)
            )
    # (excerpt truncated: the original continues with an ``except`` clause that
    # falls back to a plain pandas apply when the dask path fails)
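# A self-contained sketch of the pattern above using plain dask/pandas (no
# swifter internals; row_sum and the frame are illustrative): infer ``meta``
# from a pandas sample, sanity-check the dask apply against pandas on that
# sample, then run the full dask apply.
import dask.dataframe as dd
import pandas as pd

def row_sum(row):
    return row["a"] + row["b"]

obj = pd.DataFrame({"a": range(10), "b": range(10)})
npartitions = 2

sample = obj.iloc[: npartitions * 2, :]
meta = sample.apply(row_sum, axis=1)

check = (
    dd.from_pandas(sample, npartitions=npartitions)
    .apply(row_sum, axis=1, meta=meta)
    .compute()
)
assert check.equals(meta), "dask apply does not match pandas apply on the sample"

result = (
    dd.from_pandas(obj, npartitions=npartitions)
    .apply(row_sum, axis=1, meta=meta)
    .compute()
)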
def _data_to_source(df, path, **kwargs):
    import dask.dataframe as dd

    if not is_dataframe_like(df):
        raise NotImplementedError
    try:
        from intake_parquet import ParquetSource
    except ImportError:
        raise ImportError("Please install intake-parquet to use persistence"
                          " on dataframe container sources.")
    if not hasattr(df, 'npartitions'):
        # plain pandas input: wrap it in a single-partition dask frame
        df = dd.from_pandas(df, npartitions=1)
    df.to_parquet(path, **kwargs)
    source = ParquetSource(path, meta={})
    return source
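# A minimal sketch of the core persistence step above, leaving intake-parquet
# out (the output path is illustrative; writing parquet requires an engine such
# as pyarrow or fastparquet): a pandas frame is wrapped in a one-partition dask
# frame so the same ``to_parquet`` call works for both input types.
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"color": ["red", "blue"], "number": [1, 2]})
if not hasattr(df, "npartitions"):          # pandas frames lack this attribute
    df = dd.from_pandas(df, npartitions=1)  # now a dask frame with one partition
df.to_parquet("/tmp/example_parquet")       # writes a directory of parquet files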
def dask_apply(df, npartitions, myfunc, *args, **kwargs):
    # NOTE: ``.compute(get=get)`` uses the legacy dask scheduler argument; in
    # current dask releases this is spelled ``.compute(scheduler=...)``.
    if type(df) == pd.DataFrame:
        # Infer per-column output dtypes from a one-row pandas apply and use
        # them as the dask ``meta``.
        kwargs.pop('meta')
        tmp = df.iloc[:1, :].apply(myfunc, args=args, **kwargs)
        meta = {c: tmp[c].dtype for c in tmp.columns}
        return dd.from_pandas(df, npartitions=npartitions).apply(myfunc, *args, axis=1, **kwargs, meta=meta).compute(get=get)
    else:
        meta = kwargs.pop('meta')
        return dd.from_pandas(df, npartitions=npartitions).map_partitions(myfunc, *args, **kwargs, meta=meta).compute(get=get)
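# Hypothetical usage of a helper like the one above, rewritten against current
# dask (``scheduler=`` instead of the legacy ``get=``); the function and column
# names are illustrative.
import dask.dataframe as dd
import pandas as pd

def describe_row(row):
    return pd.Series({"total": row["a"] + row["b"], "label": f"row-{row.name}"})

df = pd.DataFrame({"a": range(6), "b": range(6)})

# Infer per-column dtypes from a one-row pandas apply, as the helper does.
tmp = df.iloc[:1, :].apply(describe_row, axis=1)
meta = {c: tmp[c].dtype for c in tmp.columns}

result = (
    dd.from_pandas(df, npartitions=3)
    .apply(describe_row, axis=1, meta=meta)
    .compute(scheduler="threads")
)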
# Excerpt from a job body that fans per-domain work out over dask when enough
# cores are available (copy_pdb_h5, setup_dask, process_domain, map_job_rv, and
# RealtimeLogger are helpers from the surrounding project).
sdoms_file = copy_pdb_h5(job, pdbFileStoreID)

sdoms = pd.read_hdf(str(sdoms_file), "merged")  # , where="sfam_id == {}".format(sfam_id))

# skip_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "keep.csv")
# if os.path.isfile(skip_file):
#     skip = pd.read_csv(skip_file)
#     sdoms = sdoms[sdoms["sdi"].isin(skip["sdi"])]

sdoms = sdoms[sdoms["sfam_id"] == float(sfam_id)]["sdi"].drop_duplicates().dropna()
# sdoms = sdoms[:1]

if cores > 2:
    # Only makes sense for SLURM or other bare-metal clusters
    setup_dask(cores)
    d_sdoms = dd.from_pandas(sdoms, npartitions=cores)
    RealtimeLogger.info("Running sfam dask {}".format(sdoms))
    processed_domains = d_sdoms.apply(lambda row: process_domain(job, row.sdi,
        sdoms_file), axis=1).compute()
else:
    processed_domains = job.addChildJobFn(map_job_rv, process_domain, sdoms,
        pdbFileStoreID, preemptable=True).rv()

return processed_domains