# NOTE(review): boilerplate header left over from a code-snippet aggregator
# (Snyk advertisement); it is not part of the original library source.
# NOTE(review): this snippet is truncated and its indentation was lost during
# extraction — the statements below belong inside `closure` but sit at column 0.
# Names such as `nb_workers`, `in_notebook_lab`, and `display_progress_bar` are
# closed over from an enclosing scope not visible here. Presumably from
# pandarallel's Series implementation — verify against the upstream source.
@parallel(plasma_client)
def closure(data, func, **kwargs):
# Process pool plus a managed queue, presumably used by workers to report
# per-chunk progress back to the parent process — TODO confirm.
pool = ProcessingPool(nb_workers)
manager = Manager()
queue = manager.Queue()
# Select the progress-bar implementation matching the runtime environment
# (Jupyter notebook/lab vs. plain console).
ProgressBars = (ProgressBarsNotebookLab if in_notebook_lab
else ProgressBarsConsole)
# Partition the data into one range per worker. NOTE: the comprehension's
# loop variable `chunk` shadows the `chunk()` helper called on the line above.
chunks = chunk(data.size, nb_workers)
maxs = [chunk.stop - chunk.start for chunk in chunks]
# Per-worker progress counters and completion flags.
values = [0] * nb_workers
finished = [False] * nb_workers
if display_progress_bar:
progress_bar = ProgressBars(maxs)
# NOTE(review): truncated snippet with flattened indentation — the body below
# belongs inside `closure`, and the `worker_args` list literal is cut off
# before it closes. Presumably pandarallel's RollingGroupby implementation;
# verify against the upstream source.
@parallel(plasma_client)
def closure(rolling_groupby, func, *args, **kwargs):
# Materialize the (key, row-index) pairs for each group, then split them
# into one slice per worker.
groups = list(rolling_groupby._groupby.groups.items())
chunks = chunk(len(groups), nb_workers)
# Publish the underlying object and the group list to the Plasma shared
# object store so worker processes can read them without pickling per call.
object_id = plasma_client.put(rolling_groupby.obj)
groups_id = plasma_client.put(groups)
# Snapshot the rolling window's configuration attributes so they can be
# re-applied inside each worker — exact attribute set depends on
# `_attributes`, which is defined elsewhere; TODO confirm.
attribute2value = {
attribute: getattr(rolling_groupby, attribute)
for attribute in rolling_groupby._attributes
}
# Per-worker argument tuples (list truncated in this snippet).
worker_args = [
(
plasma_store_name,
object_id,
groups_id,
# NOTE(review): truncated snippet with flattened indentation — the statements
# below belong inside `closure`. Presumably pandarallel's Rolling (windowed
# Series) implementation; verify against the upstream source.
@parallel(plasma_client)
def closure(rolling, func, *args, **kwargs):
# Process pool plus a managed queue, presumably used for worker→parent
# progress reporting — TODO confirm.
pool = ProcessingPool(nb_workers)
manager = Manager()
queue = manager.Queue()
# Notebook-aware vs. console progress-bar selection.
ProgressBars = (
ProgressBarsNotebookLab if in_notebook_lab else ProgressBarsConsole
)
series = rolling.obj
window = rolling.window
# Chunking takes the window size here — presumably so adjacent chunks
# overlap enough for the rolling window to be computed at chunk edges;
# TODO confirm against the `chunk()` helper's definition.
chunks = chunk(len(series), nb_workers, window)
# NOTE: the comprehension's loop variable `chunk` shadows the `chunk()`
# helper called above.
maxs = [chunk.stop - chunk.start for chunk in chunks]
# Per-worker progress counters and completion flags.
values = [0] * nb_workers
finished = [False] * nb_workers
# NOTE(review): truncated snippet with flattened indentation — the statements
# below belong inside `closure`. Presumably pandarallel's DataFrame.apply
# implementation; verify against the upstream source.
@parallel(plasma_client)
def closure(df, func, *args, **kwargs):
# Process pool plus a managed queue, presumably for progress reporting.
pool = ProcessingPool(nb_workers)
manager = Manager()
queue = manager.Queue()
# Notebook-aware vs. console progress-bar selection.
ProgressBars = (
ProgressBarsNotebookLab if in_notebook_lab else ProgressBarsConsole
)
# Normalize the pandas `axis` keyword to its numeric form:
# "index" -> 0, "columns" -> 1 (matching pandas' own aliases).
axis = kwargs.get("axis", 0)
if axis == "index":
axis = 0
elif axis == "columns":
axis = 1
# The axis the frame will be split along for parallel dispatch —
# the opposite of the axis `func` is applied over.
opposite_axis = 1 - axis
# NOTE(review): snippet with flattened indentation, cut off after the final
# comment line — the statements below belong inside `closure`. Presumably
# pandarallel's DataFrameGroupBy.apply implementation; verify against the
# upstream source.
@parallel(plasma_client)
def closure(df_grouped, func, *args, **kwargs):
# Materialize (key, row-index) pairs per group and split into one slice
# per worker.
groups = list(df_grouped.groups.items())
chunks = chunk(len(groups), nb_workers)
# Publish the underlying frame and the group list to the Plasma shared
# object store so workers read them without per-task pickling.
object_id = plasma_client.put(df_grouped.obj)
groups_id = plasma_client.put(groups)
# One argument tuple per chunk, each carrying the store name, shared
# object ids, the chunk slice, and the user function with its arguments.
workers_args = [
(plasma_store_name, object_id, groups_id, chunk, func, args, kwargs)
for chunk in chunks
]
# Fan the chunks out across the pool; each worker returns its partial
# result. `DataFrameGroupBy.worker` is defined elsewhere in the file.
with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(DataFrameGroupBy.worker, workers_args)
# Branch on the grouper's shape — a 1-D grouper means a single "by" key;
# the handling is truncated here.
if len(df_grouped.grouper.shape) == 1:
# One element in "by" argument