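# NOTE: this excerpt assumes the third-party imports below; helpers referenced but not
# shown here (_SwifterObject, N_REPEATS, suppress_stdout_stderr, Resampler, and the
# `timed` sample measurement) are defined elsewhere in the library.
import pandas as pd
import dask.dataframe as dd
from tqdm.auto import tqdm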
est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._obj.shape[0]
# if the estimated pandas apply time exceeds the dask threshold and dask processing is allowed for this data, use dask
if (est_apply_duration > self._dask_threshold) and allow_dask_processing:
return self._dask_applymap(func)
else: # use pandas
if self._progress_bar:
tqdm.pandas(desc=self._progress_bar_desc or "Pandas Apply")
applymap_func = self._obj.progress_applymap
else:
applymap_func = self._obj.applymap
return applymap_func(func)
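# Illustrative usage sketch (not part of the library source): driving the applymap
# dispatch above from user code. The DataFrame and the workload are hypothetical; a
# cheap element-wise function keeps the estimated runtime under dask_threshold, so
# the plain pandas (optionally tqdm-wrapped) path is taken.
def _example_applymap_dispatch():
    df = pd.DataFrame({"a": range(10_000), "b": range(10_000)})
    return df.swifter.applymap(lambda x: x ** 2)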
class Transformation(_SwifterObject):
def __init__(
self,
pandas_obj,
npartitions=None,
dask_threshold=1,
scheduler="processes",
progress_bar=True,
progress_bar_desc=None,
allow_dask_on_strings=False,
):
super(Transformation, self).__init__(
pandas_obj, npartitions, dask_threshold, scheduler, progress_bar, progress_bar_desc, allow_dask_on_strings
)
self._sample_pd = pandas_obj.iloc[: self._SAMPLE_SIZE]
self._obj_pd = pandas_obj
self._obj_dd = dd.from_pandas(pandas_obj, npartitions=npartitions)
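# Minimal sketch (assumed names, not swifter API): the constructor above keeps both a
# small pandas sample (for timing) and a lazily partitioned dask mirror of the same
# object, so later transformations can pick either backend without re-partitioning.
def _example_prebuilt_backends(pandas_obj, npartitions=2, sample_size=1000):
    sample_pd = pandas_obj.iloc[:sample_size]                     # pandas sample for timing
    obj_dd = dd.from_pandas(pandas_obj, npartitions=npartitions)  # lazy dask copy
    return sample_pd, obj_dd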
sample_proc_est = timed / N_REPEATS
est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._obj.shape[0]
# if the estimated pandas apply time exceeds the dask threshold and dask processing is allowed for this data, use dask
if (est_apply_duration > self._dask_threshold) and allow_dask_processing:
return self._dask_apply(func, convert_dtype, *args, **kwds)
else: # use pandas
if self._progress_bar:
tqdm.pandas(desc=self._progress_bar_desc or "Pandas Apply")
return self._obj.progress_apply(func, convert_dtype=convert_dtype, args=args, **kwds)
else:
return self._obj.apply(func, convert_dtype=convert_dtype, args=args, **kwds)
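# Illustrative usage sketch (not part of the library source): the Series apply dispatch
# above, seen from the caller's side. Data and function are stand-ins; a cheap
# per-element function keeps the estimate under the threshold, so tqdm's progress_apply
# (or plain apply) runs on pandas rather than dask.
def _example_series_apply_dispatch():
    s = pd.Series(range(10_000))
    return s.swifter.apply(lambda x: x + 1)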
@pd.api.extensions.register_dataframe_accessor("swifter")
class DataFrameAccessor(_SwifterObject):
def _wrapped_apply(self, func, axis=0, raw=None, result_type=None, args=(), **kwds):
def wrapped():
with suppress_stdout_stderr():
self._obj.iloc[: self._SAMPLE_SIZE, :].apply(
func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds
)
return wrapped
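# Sketch (assumed constants and names, not swifter's exact code): a zero-argument
# closure like the one returned by _wrapped_apply can be timed with timeit and the
# result scaled from the sample to the full frame, matching the
# `sample_proc_est = timed / N_REPEATS` arithmetic seen earlier in this excerpt.
def _example_time_wrapped_sample(wrapped, sample_size, nrows, n_repeats=3):
    import timeit

    timed = timeit.timeit(wrapped, number=n_repeats)   # total wall time for n_repeats runs
    sample_proc_est = timed / n_repeats                # time for a single sample run
    return sample_proc_est / sample_size * nrows       # extrapolated full-frame duration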
def _dask_apply(self, func, axis=0, raw=None, result_type=None, *args, **kwds):
sample = self._obj.iloc[: self._npartitions * 2, :]
with suppress_stdout_stderr():
meta = sample.apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds)
try:
with suppress_stdout_stderr():
# check that the dask apply matches the pandas apply
"on": on,
"level": level,
}
return Resampler(
self._obj,
self._npartitions,
self._dask_threshold,
self._scheduler,
self._progress_bar,
self._progress_bar_desc,
**kwds
)
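# Standalone sketch (hypothetical data and workload, not swifter's exact code): the
# pattern behind _dask_apply above -- run the function on a tiny pandas sample to get
# `meta` (the expected output schema), then pass that meta to dask so it can skip its
# own schema inference.
def _example_meta_from_sample():
    def row_sum(row):
        return row["a"] + row["b"]

    df = pd.DataFrame({"a": range(100), "b": range(100)})
    meta = df.iloc[:4].apply(row_sum, axis=1)          # pandas sample result as schema hint
    ddf = dd.from_pandas(df, npartitions=2)
    return ddf.apply(row_sum, axis=1, meta=meta).compute()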
@pd.api.extensions.register_series_accessor("swifter")
class SeriesAccessor(_SwifterObject):
def _wrapped_apply(self, func, convert_dtype=True, args=(), **kwds):
def wrapped():
with suppress_stdout_stderr():
self._obj.iloc[: self._SAMPLE_SIZE].apply(func, convert_dtype=convert_dtype, args=args, **kwds)
return wrapped
def _dask_apply(self, func, convert_dtype, *args, **kwds):
sample = self._obj.iloc[: self._npartitions * 2]
with suppress_stdout_stderr():
meta = sample.apply(func, convert_dtype=convert_dtype, args=args, **kwds)
try:
# check that the dask map partitions matches the pandas apply
with suppress_stdout_stderr():
tmp_df = (
dd.from_pandas(sample, npartitions=self._npartitions)