Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
timed = timeit.timeit(wrapped, number=N_REPEATS)
sample_proc_est = timed / N_REPEATS
est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._nrows
# No `allow_dask_processing` variable here, because we don't know the dtypes of the transformation
if est_apply_duration > self._dask_threshold:
return self._dask_apply(func, *args, **kwds)
else: # use pandas
if self._progress_bar and hasattr(self._obj_pd, "progress_apply"):
tqdm.pandas(desc=self._progress_bar_desc or "Pandas Apply")
return self._obj_pd.progress_apply(func, *args, **kwds)
else:
return self._obj_pd.apply(func, *args, **kwds)
class Rolling(Transformation):
    """Transformation wrapper for rolling-window applies.

    Any keyword arguments beyond the shared swifter options are captured
    verbatim and kept on the instance as the rolling-window configuration.
    """

    def __init__(
        self,
        pandas_obj,
        npartitions=None,
        dask_threshold=1,
        scheduler="processes",
        progress_bar=True,
        progress_bar_desc=None,
        allow_dask_on_strings=False,
        **kwds
    ):
        """Initialise shared state, then record the window settings.

        Args:
            pandas_obj: pandas object the rolling apply will operate on.
            npartitions, dask_threshold, scheduler, progress_bar,
            progress_bar_desc, allow_dask_on_strings: forwarded positionally,
                unchanged, to the parent initializer.
            **kwds: extra options, stored as ``self._rolling_kwds``.
        """
        super().__init__(
            pandas_obj,
            npartitions,
            dask_threshold,
            scheduler,
            progress_bar,
            progress_bar_desc,
            allow_dask_on_strings,
        )
        # Private shallow copy of the extra options.
        self._rolling_kwds = dict(kwds)
        # Leading slice of the wrapped pandas object, twice the partition count
        # long. `_obj_pd` and `_npartitions` are populated by the parent
        # initializer. NOTE(review): presumably used elsewhere to cross-check
        # dask vs pandas results on a small sample; confirm.
        head_rows = self._npartitions * 2
        self._comparison_pd = self._obj_pd.iloc[:head_rows]
def __init__(
    self,
    pandas_obj,
    npartitions=None,
    dask_threshold=1,
    scheduler="processes",
    progress_bar=True,
    progress_bar_desc=None,
    allow_dask_on_strings=False,
):
    """Set up the pandas and dask handles used by a swifter transformation.

    Stores the full pandas object, a leading sample of it, a dask
    collection built from the same data, and the row count.

    Args:
        pandas_obj: pandas object to transform. It is stored as-is and also
            converted with ``dd.from_pandas``.
        npartitions: number of dask partitions. When ``None``, the value
            resolved by the parent initializer (``self._npartitions``) is
            used for the dask conversion instead of passing ``None`` through.
        dask_threshold, scheduler, progress_bar, progress_bar_desc,
        allow_dask_on_strings: forwarded unchanged to the parent initializer.
    """
    super(Transformation, self).__init__(
        pandas_obj, npartitions, dask_threshold, scheduler, progress_bar, progress_bar_desc, allow_dask_on_strings
    )
    # First `_SAMPLE_SIZE` rows (the size comes from the parent initializer).
    self._sample_pd = pandas_obj.iloc[: self._SAMPLE_SIZE]
    self._obj_pd = pandas_obj
    # Passing npartitions=None straight to dask either raises (older dask:
    # "Exactly one of npartitions and chunksize must be specified") or
    # silently yields a single partition (newer dask). Subclasses treat
    # `self._npartitions` as the numeric partition count once the parent
    # initializer has run, so fall back to it. Explicit values are unchanged.
    dask_partitions = npartitions if npartitions is not None else self._npartitions
    self._obj_dd = dd.from_pandas(pandas_obj, npartitions=dask_partitions)
    self._nrows = pandas_obj.shape[0]
error_message="Dask rolling apply sample does not match pandas rolling apply sample.",
)
if self._progress_bar:
with TQDMDaskProgressBar(desc=self._progress_bar_desc or "Dask Apply"):
return self._obj_dd.apply(func, *args, **kwds).compute(scheduler=self._scheduler)
else:
return self._obj_dd.apply(func, *args, **kwds).compute(scheduler=self._scheduler)
except ERRORS_TO_HANDLE:
if self._progress_bar:
tqdm.pandas(desc=self._progress_bar_desc or "Pandas Apply")
return self._obj_pd.progress_apply(func, *args, **kwds)
else:
return self._obj_pd.apply(func, *args, **kwds)
# Transformation wrapper whose extra **kwds are stored as `_resampler_kwds`.
# NOTE(review): presumably these are the pandas `.resample()` arguments. This
# view may end before the initializer does; confirm against the full file.
class Resampler(Transformation):
def __init__(
self,
pandas_obj,
npartitions=None,
dask_threshold=1,
scheduler="processes",
progress_bar=True,
progress_bar_desc=None,
allow_dask_on_strings=False,
# Any options beyond the shared swifter ones are captured here.
**kwds
):
# Shared setup is done by the parent initializer. `self._obj_pd` and
# `self._npartitions`, read below, must be populated by it.
super(Resampler, self).__init__(
pandas_obj, npartitions, dask_threshold, scheduler, progress_bar, progress_bar_desc, allow_dask_on_strings
)
# Store a shallow copy of the extra keyword arguments.
self._resampler_kwds = kwds.copy()
# Leading slice of `_npartitions * 2` rows. NOTE(review): presumably used to
# cross-check dask vs pandas results on a small sample; confirm.
self._comparison_pd = self._obj_pd.iloc[: self._npartitions * 2]