Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pandas
from modin.engines.ray.pandas_on_ray.frame.partition import PandasOnRayFramePartition
from modin import __execution_engine__
if __execution_engine__ == "Ray":
import ray
import pyarrow
class PyarrowOnRayFramePartition(PandasOnRayFramePartition):
def to_pandas(self):
    """Materialize the object held by this partition as a pandas object.

    Returns:
        The pandas DataFrame (or Series) obtained by calling ``to_pandas()``
        on the object returned by ``self.get()``.
    """
    converted = self.get().to_pandas()
    converted_type = type(converted)
    # Sanity check: the conversion must yield exactly a pandas DataFrame or
    # Series (subclasses are deliberately not accepted by this identity test).
    assert converted_type is pandas.DataFrame or converted_type is pandas.Series
    return converted
@classmethod
def put(cls, obj):
"""Put an object in the Plasma store and wrap it in this object.
Args:
if len(call_queue_df) > 0:
for call, kwargs in call_queue_df:
df = call(df, **kwargs)
if len(call_queue_other) > 0:
for call, kwargs in call_queue_other:
other = call(other, **kwargs)
return map_func(df, other)
map_func = ray.put(map_func)
by_parts = np.squeeze(by)
if len(by_parts.shape) == 0:
by_parts = np.array([by_parts.item()])
new_partitions = np.array(
[
[
PandasOnRayFramePartition(
func.remote(
part.oid,
by_parts[col_idx].oid if axis else by_parts[row_idx].oid,
map_func,
part.call_queue,
by_parts[col_idx].call_queue
if axis
else by_parts[row_idx].call_queue,
)
)
for col_idx, part in enumerate(partitions[row_idx])
]
for row_idx in range(len(partitions))
]
)
return cls.map_axis_partitions(axis, new_partitions, reduce_func)
def add_to_apply_calls(self, func, **kwargs):
    """Return a new partition with ``func`` appended to the call queue.

    Args:
        func: The function to add to the queue.
        **kwargs: Keyword arguments stored next to ``func`` as a dictionary.

    Returns:
        A ``PandasOnRayFramePartition`` that reuses this partition's ``oid``
        and whose call queue is this partition's queue followed by the
        ``(func, kwargs)`` pair.
    """
    oid = self.oid
    # Concatenation builds a fresh list, so this partition's own queue is
    # left unmodified.
    extended_queue = self.call_queue + [(func, kwargs)]
    return PandasOnRayFramePartition(oid, call_queue=extended_queue)
def __copy__(self):
    """Create a new partition wrapper around the same object ID.

    Returns:
        A ``PandasOnRayFramePartition`` built from this partition's ``oid``
        and its cached length and width values. Only those three values are
        passed to the new wrapper.
    """
    oid = self.oid
    cached_length = self._length_cache
    cached_width = self._width_cache
    return PandasOnRayFramePartition(oid, cached_length, cached_width)
@classmethod
def put(cls, obj):
    """Put an object in the Ray object store and wrap it in a partition.

    Declared as a classmethod so that it can be called directly on the class
    (``PandasOnRayFramePartition.put(obj)``), matching the sibling ``put``
    definition; calling it on an instance keeps working as well.

    Args:
        obj: The object to be put. It must expose ``index`` and ``columns``
            attributes that support ``len()``.

    Returns:
        A ``PandasOnRayFramePartition`` wrapping the ID returned by
        ``ray.put``, constructed with ``len(obj.index)`` and
        ``len(obj.columns)`` as its length and width arguments.
    """
    # Store the object first, then read its dimensions (same order as before).
    object_id = ray.put(obj)
    return PandasOnRayFramePartition(object_id, len(obj.index), len(obj.columns))
"""Apply a function to the object stored in this partition.
Note: It does not matter if func is callable or an ObjectID. Ray will
handle it correctly either way. The keyword arguments are sent as a
dictionary.
Args:
func: The function to apply.
Returns:
A RayRemotePartition object.
"""
oid = self.oid
call_queue = self.call_queue + [(func, kwargs)]
result, length, width = deploy_ray_func.remote(call_queue, oid)
return PandasOnRayFramePartition(result, length, width)
start,
end,
num_splits,
query,
con,
index_col,
coerce_float,
params,
parse_dates,
columns,
chunksize,
),
num_return_vals=num_splits + 1,
)
partition_ids.append(
[PandasOnRayFramePartition(obj) for obj in partition_id[:-1]]
)
index_ids.append(partition_id[-1])
new_index = pandas.RangeIndex(sum(ray.get(index_ids)))
return cls.query_compiler_cls(
cls.frame_cls(np.array(partition_ids), new_index, cols_names)
)