Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
with TQDMDaskProgressBar(desc=self._progress_bar_desc or "Dask Apply"):
return (
dd.from_pandas(self._obj, npartitions=self._npartitions)
.map_partitions(func, *args, meta=meta, **kwds)
.compute(scheduler=self._scheduler)
)
else:
return (
dd.from_pandas(self._obj, npartitions=self._npartitions)
.map_partitions(func, *args, meta=meta, **kwds)
.compute(scheduler=self._scheduler)
)
except ERRORS_TO_HANDLE:
# if map partitions doesn't match pandas apply, we can use dask apply, but it will be a bit slower
if self._progress_bar:
with TQDMDaskProgressBar(desc=self._progress_bar_desc or "Dask Apply"):
return (
dd.from_pandas(self._obj, npartitions=self._npartitions)
.apply(lambda x: func(x, *args, **kwds), convert_dtype=convert_dtype, meta=meta)
.compute(scheduler=self._scheduler)
)
else:
return (
dd.from_pandas(self._obj, npartitions=self._npartitions)
.apply(lambda x: func(x, *args, **kwds), convert_dtype=convert_dtype, meta=meta)
.compute(scheduler=self._scheduler)
)
sample = self._obj.iloc[: self._npartitions * 2, :]
with suppress_stdout_stderr():
meta = sample.applymap(func)
try:
with suppress_stdout_stderr():
# check that the dask apply matches the pandas apply
tmp_df = (
dd.from_pandas(sample, npartitions=self._npartitions)
.applymap(func, meta=meta)
.compute(scheduler=self._scheduler)
)
self._validate_apply(
tmp_df.equals(meta), error_message="Dask applymap sample does not match pandas applymap sample."
)
if self._progress_bar:
with TQDMDaskProgressBar(desc=self._progress_bar_desc or "Dask Applymap"):
return (
dd.from_pandas(self._obj, npartitions=self._npartitions)
.applymap(func, meta=meta)
.compute(scheduler=self._scheduler)
)
else:
return (
dd.from_pandas(self._obj, npartitions=self._npartitions)
.applymap(func, meta=meta)
.compute(scheduler=self._scheduler)
)
except ERRORS_TO_HANDLE:
# if dask apply doesn't match pandas apply, fallback to pandas
if self._progress_bar:
tqdm.pandas(desc=self._progress_bar_desc or "Pandas Apply")
applymap_func = self._obj.progress_applymap
def __init__(self, start=None, start_state=None, pretask=None, posttask=None, finish=None, **kwargs):
    """Register the callback hooks with the parent class and keep extra options.

    Args:
        start, start_state, pretask, posttask, finish: hook callables passed
            unchanged to the parent class initializer (presumably dask's
            ``Callback`` — the base class is not visible here).
        **kwargs: any remaining keyword arguments; stored as-is on
            ``self.tqdm_args`` (presumably forwarded to ``tqdm`` by other
            methods of this class — not visible here).
    """
    hooks = {
        "start": start,
        "start_state": start_state,
        "pretask": pretask,
        "posttask": posttask,
        "finish": finish,
    }
    super().__init__(**hooks)
    # Extra keyword arguments are kept untouched for later use.
    self.tqdm_args = kwargs
    # Task-state labels; how they are consumed is not visible in this block.
    self.states = ["ready", "waiting", "running", "finished"]
def _dask_apply(self, func, *args, **kwds):
    """Run a rolling ``apply`` through dask, falling back to pandas on failure.

    Steps:
    1. Apply ``func`` with dask to a comparison sample (``self._comparison_pd``).
    2. Apply it with pandas to the same sample and hand the equality result
       to ``self._validate_apply``. That helper is not visible here;
       presumably it raises one of ``ERRORS_TO_HANDLE`` on a mismatch.
    3. If that check passes, compute the full result through
       ``self._obj_dd``, optionally under a ``TQDMDaskProgressBar``.

    Any error in ``ERRORS_TO_HANDLE`` during these steps switches to the
    pandas object ``self._obj_pd``. That path uses ``progress_apply`` when a
    progress bar is requested and plain ``apply`` otherwise.

    Args:
        func: callable applied to each rolling window.
        *args, **kwds: extra arguments forwarded to every ``apply`` call.

    Returns:
        The computed dask result, or the pandas result on fallback.
    """
    try:
        with suppress_stdout_stderr():
            # dask's rolling() receives the rolling kwargs minus "on" and "closed".
            # NOTE(review): presumably dask does not accept those — confirm.
            dask_rolling_kwds = {key: val for key, val in self._rolling_kwds.items() if key not in ["on", "closed"]}
            dask_sample = dd.from_pandas(self._comparison_pd, npartitions=self._npartitions)
            dask_sample_result = (
                dask_sample.rolling(**dask_rolling_kwds).apply(func, *args, **kwds).compute(scheduler=self._scheduler)
            )
            # pandas reference result uses the full, unfiltered rolling kwargs.
            pandas_sample_result = self._comparison_pd.rolling(**self._rolling_kwds).apply(func, *args, **kwds)
            self._validate_apply(
                dask_sample_result.equals(pandas_sample_result),
                error_message="Dask rolling apply sample does not match pandas rolling apply sample.",
            )

        def _compute_with_dask():
            return self._obj_dd.apply(func, *args, **kwds).compute(scheduler=self._scheduler)

        if not self._progress_bar:
            return _compute_with_dask()
        with TQDMDaskProgressBar(desc=self._progress_bar_desc or "Dask Apply"):
            return _compute_with_dask()
    except ERRORS_TO_HANDLE:
        # Fallback: plain pandas rolling apply.
        if not self._progress_bar:
            return self._obj_pd.apply(func, *args, **kwds)
        tqdm.pandas(desc=self._progress_bar_desc or "Pandas Apply")
        return self._obj_pd.progress_apply(func, *args, **kwds)