def to_numpy(self):
    """Converts the Modin DataFrame to a NumPy array.

    Returns:
        NumPy array of the QueryCompiler.
    """
    arr = self._modin_frame.to_numpy()
    ErrorMessage.catch_bugs_and_request_email(
        len(arr) != len(self.index) or len(arr[0]) != len(self.columns)
    )
    return arr
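# A minimal usage sketch, assuming a Modin DataFrame whose public to_numpy()
# routes through the query-compiler method above; the data is made up.
import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]})
arr = df.to_numpy()
assert arr.shape == (len(df.index), len(df.columns))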
@classmethod
def get_indices(cls, axis, partitions, index_func=None):
    """This gets the internal indices stored in the partitions.

    Note: These are the global indices of the object. This is mostly useful
        when you have deleted rows/columns internally, but do not know
        which ones were deleted.

    Args:
        axis: The axis to extract the labels over (0 - index, 1 - columns).
        index_func: The function to be used to extract the indices.
        old_blocks: An optional previous object that this object was
            created from. This is used to compute the correct offsets.

    Returns:
        A Pandas Index object.
    """
    client = _get_global_client()
    ErrorMessage.catch_bugs_and_request_email(not callable(index_func))
    func = cls.preprocess_func(index_func)
    if axis == 0:
        # We grab the first column of blocks and extract the indices
        new_idx = (
            [idx.apply(func).future for idx in partitions.T[0]]
            if len(partitions.T)
            else []
        )
    else:
        new_idx = (
            [idx.apply(func).future for idx in partitions[0]]
            if len(partitions)
            else []
        )
    new_idx = client.gather(new_idx)
    return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx
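# A minimal, self-contained sketch of the gather-then-append pattern above,
# using plain pandas (no Dask): per-partition indices are stitched into one
# global Index with Index.append.
import pandas

partition_indices = [pandas.RangeIndex(0, 2), pandas.RangeIndex(2, 5)]
full_index = partition_indices[0].append(partition_indices[1:])
assert list(full_index) == [0, 1, 2, 3, 4]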
@property
def _index_grouped(self):
    if self._index_grouped_cache is None:
        if self._is_multi_by or isinstance(self._by, pandas.Grouper):
            # Because we are doing a collect (to_pandas) here and then groupby,
            # we end up using the pandas implementation. Add the warning so the
            # user is aware.
            ErrorMessage.catch_bugs_and_request_email(self._axis == 1)
            ErrorMessage.default_to_pandas(
                "Groupby with multiple columns or Grouper object"
            )
            self._index_grouped_cache = {
                k: v.index
                for k, v in self._df._query_compiler.to_pandas().groupby(
                    by=self._by
                )
            }
        else:
            if self._axis == 0:
                self._index_grouped_cache = self._index.groupby(self._by)
            else:
                self._index_grouped_cache = self._columns.groupby(self._by)
    return self._index_grouped_cache
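# A small sketch of the pandas.Index.groupby call used on the fast path above:
# it maps each distinct "by" value to the Index labels that fall in that group.
import pandas

idx = pandas.Index(["r0", "r1", "r2"])
groups = idx.groupby(["a", "b", "a"])
# groups == {"a": Index(["r0", "r2"]), "b": Index(["r1"])}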
"""This gets the internal indices stored in the partitions.
Note: These are the global indices of the object. This is mostly useful
when you have deleted rows/columns internally, but do not know
which ones were deleted.
Args:
axis: This axis to extract the labels. (0 - index, 1 - columns).
index_func: The function to be used to extract the function.
old_blocks: An optional previous object that this object was
created from. This is used to compute the correct offsets.
Returns:
A Pandas Index object.
"""
ErrorMessage.catch_bugs_and_request_email(not callable(index_func))
func = cls.preprocess_func(index_func)
if axis == 0:
# We grab the first column of blocks and extract the indices
new_idx = (
[idx.apply(func).oid for idx in partitions.T[0]]
if len(partitions.T)
else []
)
else:
new_idx = (
[idx.apply(func).oid for idx in partitions[0]]
if len(partitions)
else []
)
new_idx = ray.get(new_idx)
return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx
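# A minimal, illustrative sketch (not Modin code) of the Ray pattern above:
# submit remote tasks, collect the object refs, then block on all of them at
# once with ray.get. extract_index is a made-up stand-in for the preprocessed
# index_func.
import pandas
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def extract_index(start, stop):
    return pandas.RangeIndex(start, stop)

refs = [extract_index.remote(0, 2), extract_index.remote(2, 5)]
parts = ray.get(refs)  # one round trip for all partitions
full_index = parts[0].append(parts[1:])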
@classmethod
def to_pandas(cls, partitions):
    """Convert this object into a Pandas DataFrame from the partitions.

    Returns:
        A Pandas DataFrame
    """
    retrieved_objects = [[obj.to_pandas() for obj in part] for part in partitions]
    if all(
        isinstance(part, pandas.Series) for row in retrieved_objects for part in row
    ):
        axis = 0
    elif all(
        isinstance(part, pandas.DataFrame)
        for row in retrieved_objects
        for part in row
    ):
        axis = 1
    else:
        ErrorMessage.catch_bugs_and_request_email(True)
    df_rows = [
        pandas.concat([part for part in row], axis=axis)
        for row in retrieved_objects
        if not all(part.empty for part in row)
    ]
    if len(df_rows) == 0:
        return pandas.DataFrame()
    else:
        return pandas.concat(df_rows)
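# A self-contained sketch of the two-level concat above: the column blocks in
# each row are glued along axis=1, then the resulting row strips are stacked
# along axis=0. Plain pandas, data made up for illustration.
import pandas

blocks = [
    [pandas.DataFrame({"a": [1]}), pandas.DataFrame({"b": [2]})],
    [pandas.DataFrame({"a": [3]}, index=[1]), pandas.DataFrame({"b": [4]}, index=[1])],
]
row_strips = [pandas.concat(row, axis=1) for row in blocks]
result = pandas.concat(row_strips)  # 2x2 frame with columns ["a", "b"]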
@classmethod
def get_indices(cls, axis, partitions, index_func=None):
    """This gets the internal indices stored in the partitions.

    Note: These are the global indices of the object. This is mostly useful
        when you have deleted rows/columns internally, but do not know
        which ones were deleted.

    Args:
        axis: The axis to extract the labels over (0 - index, 1 - columns).
        index_func: The function to be used to extract the indices.

    Returns:
        A Pandas Index object.
    """
    ErrorMessage.catch_bugs_and_request_email(not callable(index_func))
    func = cls.preprocess_func(index_func)
    if axis == 0:
        # We grab the first column of blocks and extract the indices
        new_idx = (
            [idx.apply(func).get() for idx in partitions.T[0]]
            if len(partitions.T)
            else []
        )
    else:
        new_idx = (
            [idx.apply(func).get() for idx in partitions[0]]
            if len(partitions)
            else []
        )
    return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx
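# A minimal sketch of a typical index_func passed to get_indices: it receives
# one partition's pandas object and returns that partition's labels, which
# the caller appends into a single Index. Plain pandas, illustrative only.
import pandas

def index_func(df):
    return df.index

parts = [
    pandas.DataFrame({"a": [1]}, index=["r0"]),
    pandas.DataFrame({"a": [2]}, index=["r1"]),
]
labels = [index_func(p) for p in parts]
full_index = labels[0].append(labels[1:])  # Index(["r0", "r1"])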
@property
def _index_grouped(self):
    if self._index_grouped_cache is None:
        if self._is_multi_by:
            # Because we are doing a collect (to_pandas) here and then groupby,
            # we end up using the pandas implementation. Add the warning so the
            # user is aware.
            ErrorMessage.catch_bugs_and_request_email(self._axis == 1)
            ErrorMessage.default_to_pandas("Groupby with multiple columns")
            self._index_grouped_cache = {
                k: v.index
                for k, v in self._df._query_compiler.getitem_column_array(self._by)
                .to_pandas()
                .groupby(by=self._by)
            }
        else:
            if isinstance(self._by, type(self._query_compiler)):
                by = self._by.to_pandas().squeeze()
            else:
                by = self._by
            if self._axis == 0:
                self._index_grouped_cache = self._index.groupby(by)
            else:
                self._index_grouped_cache = self._columns.groupby(by)
    return self._index_grouped_cache
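# A small sketch of the squeeze() step above: a one-column pandas DataFrame
# collapses to a Series, which Index.groupby accepts as the "by" values.
import pandas

one_col = pandas.DataFrame({"key": ["a", "b", "a"]})
by = one_col.squeeze()  # Series(["a", "b", "a"], name="key")
groups = pandas.Index(["r0", "r1", "r2"]).groupby(by)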
# Lazily queue apply_idx_objs on every block so each partition receives its
# slice of the global column labels; cum_col_widths holds the cumulative
# block widths used as slice offsets.
self._partitions = np.array(
    [
        [
            self._partitions[i][j].add_to_apply_calls(
                apply_idx_objs,
                cols=self.columns[
                    slice(cum_col_widths[j], cum_col_widths[j + 1])
                ],
            )
            for j in range(len(self._partitions[i]))
        ]
        for i in range(len(self._partitions))
    ]
)
# Sanity check: axis, when provided, must be 0 (index) or 1 (columns).
ErrorMessage.catch_bugs_and_request_email(
    axis is not None and axis not in [0, 1]
)
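# A self-contained sketch of the offset bookkeeping above: cumulative block
# widths turn per-block column counts into slices of the global labels.
# block_widths and the data are made up for illustration.
import numpy as np
import pandas

columns = pandas.Index(["a", "b", "c", "d", "e"])
block_widths = [2, 3]
cum_col_widths = np.cumsum([0] + block_widths)  # array([0, 2, 5])
col_slices = [
    columns[slice(cum_col_widths[j], cum_col_widths[j + 1])]
    for j in range(len(block_widths))
]
# col_slices == [Index(["a", "b"]), Index(["c", "d", "e"])]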