Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def closure(df_grouped, func, *args, **kwargs):
groups = list(df_grouped.groups.items())
chunks = _chunk(len(groups), nb_workers)
object_id = plasma_client.put(df_grouped.obj)
groups_id = plasma_client.put(groups)
with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(_DataFrameGroupBy.worker,
plasma_store_name, object_id,
groups_id, chunk, func, *args, **kwargs)
for chunk in chunks
]
result = _pd.DataFrame(list(itertools.chain.from_iterable([
plasma_client.get(future.result())
for future in futures
])),
index=_pd.Series(list(df_grouped.grouper),
name=df_grouped.keys)
).squeeze()
return result
return closure