def _determine_engine(cls):
    # Resolve the experimental factory class by name from this module,
    # e.g. "ExperimentalPandasOnRayFactory".
    factory_name = "Experimental{}On{}Factory".format(
        partition_format, execution_engine
    )
    return getattr(sys.modules[__name__], factory_name)
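
For context, a minimal sketch of the naming scheme the lookup above relies on; the mode strings "Pandas" and "Ray" are illustrative stand-ins for the module-level partition_format and execution_engine values:

# Illustrative only, not Modin code: shows how the two mode strings
# combine into the factory class name that getattr() resolves.
factory_name = "Experimental{}On{}Factory".format("Pandas", "Ray")
assert factory_name == "ExperimentalPandasOnRayFactory"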

from modin.engines.base.frame.axis_partition import PandasFrameAxisPartition
from .partition import PandasOnRayFramePartition
from modin import __execution_engine__

if __execution_engine__ == "Ray":
    import ray


class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
    def __init__(self, list_of_blocks):
        # Unwrap from BaseFramePartition object for ease of use
        for obj in list_of_blocks:
            obj.drain_call_queue()
        self.list_of_blocks = [obj.oid for obj in list_of_blocks]

    partition_type = PandasOnRayFramePartition
    if __execution_engine__ == "Ray":
        instance_type = ray.ObjectID

    @classmethod
    def deploy_axis_func(
        cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
    ):
        # deploy_ray_func is the @ray.remote helper defined alongside this
        # class; the deploy logic itself lives on the base class.
        return deploy_ray_func._remote(
            args=(
                PandasFrameAxisPartition.deploy_axis_func,
                axis,
                func,
                num_splits,
                kwargs,
                maintain_partitioning,
            )
            + tuple(partitions),
        )
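
The call above follows Ray's usual pattern of shipping a function plus unpacked partition arguments to a remote task. A self-contained sketch of that pattern, assuming only that Ray is installed; apply_over_blocks is a hypothetical helper, not Modin's:

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def apply_over_blocks(func, *blocks):
    # Ray dereferences the ObjectIDs before the task body runs, so
    # ``blocks`` arrives here as plain Python values.
    return func(blocks)

block_ids = [ray.put(i) for i in range(4)]
result = ray.get(apply_over_blocks.remote(sum, *block_ids))
assert result == 6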

import pandas

from modin.backends.pandas.query_compiler import PandasQueryCompiler
from modin.engines.ray.generic.io import RayIO
from modin.engines.base.io import CSVReader, JSONReader
from modin.backends.pandas.parsers import (
    PandasCSVParser,
    PandasJSONParser,
    _split_result_for_readers,
)
from modin.engines.ray.task_wrapper import RayTask
from modin.engines.ray.pandas_on_ray.io import PandasOnRayIO
from modin.engines.ray.pandas_on_ray.frame.partition import PandasOnRayFramePartition
from modin.engines.ray.pandas_on_ray.frame.data import PandasOnRayFrame
from modin import __execution_engine__

if __execution_engine__ == "Ray":
    import ray

    @ray.remote
    def _read_parquet_columns(path, columns, num_splits, kwargs):  # pragma: no cover
        """Use a Ray task to read columns from Parquet into a Pandas DataFrame.

        Note: Ray functions are not detected by codecov (thus pragma: no cover)

        Args:
            path: The path of the Parquet file.
            columns: The list of column names to read.
            num_splits: The number of partitions to split the column into.

        Returns:
            A list containing the split Pandas DataFrames and the Index as the last
            element. If there is no `index_col` set, then we just return the length.
            This is used to determine the total length of the DataFrame to build a
            default Index.
        """
        import pyarrow.parquet as pq

        df = (
            pq.ParquetDataset(path, **kwargs)
            .read(columns=columns, use_pandas_metadata=True)
            .to_pandas()
        )
        df = df[columns]
        # Append the length of the index here to build it externally
        return _split_result_for_readers(0, num_splits, df) + [len(df.index)]
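
A minimal usage sketch of the task above, assuming Ray has been initialized; the file path, column names, and split count are placeholders, not values from Modin:

# Hypothetical call: dispatch the read as a Ray task, then fetch the
# split DataFrames plus the trailing index length.
parts = ray.get(
    _read_parquet_columns.remote("example.parquet", ["col_a", "col_b"], 4, {})
)
*splits, index_len = parts  # num_splits DataFrames, then the index length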

class ExperimentalPandasOnRayIO(PandasOnRayIO):
    if __execution_engine__ == "Ray":
        read_parquet_remote_task = _read_parquet_columns

    @classmethod
    def read_sql(
        cls,
        sql,
        con,
        index_col=None,
        coerce_float=True,
        params=None,
        parse_dates=None,
        columns=None,
        chunksize=None,
        partition_column=None,
        lower_bound=None,
        upper_bound=None,
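
This classmethod backs the experimental partitioned read_sql. A hedged usage sketch through the public experimental namespace; the connection string, table, and bounds are illustrative values:

import modin.experimental.pandas as pd

# Illustrative values: swap in a real connection string and table. The
# partition_column plus bounds let Modin split the query across workers.
df = pd.read_sql(
    "SELECT * FROM trips",
    "postgresql://user:password@localhost:5432/demo",
    partition_column="trip_id",
    lower_bound=0,
    upper_bound=1_000_000,
)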

from modin.engines.base.frame.data import BasePandasFrame
from .partition_manager import DaskFrameManager
from modin import __execution_engine__

if __execution_engine__ == "Dask":
    from distributed.client import _get_global_client


class PandasOnDaskFrame(BasePandasFrame):
    _frame_mgr_cls = DaskFrameManager

    @property
    def _row_lengths(self):
        """Compute the row lengths if they are not cached.

        Returns:
            A list of row lengths.
        """
        client = _get_global_client()
        if self._row_lengths_cache is None:
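
For context, a minimal sketch of the gather pattern a Dask-backed _row_lengths relies on, using only public distributed APIs rather than Modin's actual code:

import pandas
from distributed import Client

client = Client(processes=False)  # in-process scheduler for the sketch

futures = [
    client.scatter(pandas.DataFrame({"a": range(n)})) for n in (3, 5)
]
# Submit len() against each partition future, then gather the results,
# mirroring how per-block row lengths would be computed lazily.
lengths = client.gather([client.submit(len, f) for f in futures])
assert lengths == [3, 5]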

from modin.engines.base.frame.axis_partition import PandasFrameAxisPartition
from .partition import PandasOnDaskFramePartition
from modin import __execution_engine__

if __execution_engine__ == "Dask":
    from distributed.client import _get_global_client
    from distributed import Future


class PandasOnDaskFrameAxisPartition(PandasFrameAxisPartition):
    def __init__(self, list_of_blocks):
        # Unwrap from BaseFramePartition object for ease of use
        for obj in list_of_blocks:
            obj.drain_call_queue()
        self.list_of_blocks = [obj.future for obj in list_of_blocks]

    partition_type = PandasOnDaskFramePartition
    if __execution_engine__ == "Dask":
        instance_type = Future

    @classmethod

# - typing.py
# - site-packages/
#   - pandas
# So extracting the dirname of the site_packages can point us
# to the directory containing standard libraries.
sys.path.insert(
    site_packages_path_index, os.path.dirname(site_packages_path)
)

move_stdlib_ahead_of_site_packages()
ray.worker.global_worker.run_function_on_all_workers(
    move_stdlib_ahead_of_site_packages
)

if execution_engine == "Ray":
    import ray

    initialize_ray()
    num_cpus = ray.cluster_resources()["CPU"]
elif execution_engine == "Dask":  # pragma: no cover
    from distributed.client import _get_global_client
    import warnings

    warnings.warn("The Dask Engine for Modin is experimental.")
    if threading.current_thread().name == "MainThread":
        # initialize the dask client
        client = _get_global_client()
        if client is None:
            from distributed import Client

            client = Client()
        num_cpus = sum(client.ncores().values())
elif execution_engine != "Python":
    raise ImportError("Unrecognized execution engine: {}.".format(execution_engine))

DEFAULT_NPARTITIONS = max(4, int(num_cpus))

__all__ = [
    "DataFrame",
    "Series",
    "read_csv",
    "read_parquet",
    "read_json",
    "read_html",
    "read_clipboard",
    "read_excel",
    "read_hdf",
    "read_feather",
    "read_msgpack",
    "read_stata",
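
Finally, the engine branch above is normally selected before import via the MODIN_ENGINE environment variable; a minimal usage sketch:

import os

os.environ["MODIN_ENGINE"] = "Dask"  # or "Ray"; must be set before importing modin.pandas

import modin.pandas as pd  # picks up the engine chosen above

df = pd.DataFrame({"a": [1, 2, 3]})
print(len(df))  # 3, computed on the selected engine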