def test_dask_dataframe(self):
    with LocalCUDACluster() as cluster:
        with Client(cluster) as client:
            X, y = generate_array()

            X = dd.from_dask_array(X)
            y = dd.from_dask_array(y)

            X = X.map_partitions(cudf.from_pandas)
            y = y.map_partitions(cudf.from_pandas)

            dtrain = dxgb.DaskDMatrix(client, X, y)
            out = dxgb.train(client, {'tree_method': 'gpu_hist'},
                             dtrain=dtrain,
                             evals=[(dtrain, 'X')],
                             num_boost_round=2)

            assert isinstance(out['booster'], dxgb.Booster)
            assert len(out['history']['X']['rmse']) == 2

            predictions = dxgb.predict(client, out, dtrain).compute()
            assert isinstance(predictions, np.ndarray)
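
# A minimal CPU-only sketch of the same xgboost.dask workflow the test above
# exercises (the cluster setup and array sizes here are illustrative
# assumptions, not part of the original test):
import dask.array as da
import xgboost as xgb
from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
    X = da.random.random((1000, 10), chunks=(100, 10))
    y = da.random.random(1000, chunks=100)
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    output = xgb.dask.train(client, {'tree_method': 'hist'},
                            dtrain, num_boost_round=2,
                            evals=[(dtrain, 'train')])
    predictions = xgb.dask.predict(client, output, dtrain).compute()
    print(predictions.shape)   # one prediction per row: (1000,)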
def test_example(query, expected, model):
    if model == 'dask':
        sc = {k: dd.from_pandas(df, npartitions=3) for k, df in scope.items()}
        actual = fq.execute(query, scope=sc, model=model)
        actual = actual.compute()
    else:
        actual = fq.execute(query, scope=scope, model=model)

    expected = expected()

    # set empty columns in expected to the ones in actual
    expected.columns = [e or a for a, e in zip(actual.columns, expected.columns)]

    actual = actual.reset_index(drop=True)
    expected = expected.reset_index(drop=True)

    pdt.assert_frame_equal(actual, expected, check_dtype=False)
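
# The comparison pattern above in isolation: run the same operation against a
# pandas frame and a dask frame built with dd.from_pandas, then compare the
# computed result (the frame and the groupby query below are illustrative):
import pandas as pd
import dask.dataframe as dd
import pandas.testing as pdt

pdf = pd.DataFrame({'a': [1, 2, 1, 2], 'b': [4.0, 5.0, 6.0, 7.0]})
ddf = dd.from_pandas(pdf, npartitions=3)

expected = pdf.groupby('a')['b'].sum().reset_index()
actual = ddf.groupby('a')['b'].sum().compute().sort_index().reset_index()

pdt.assert_frame_equal(actual, expected, check_dtype=False)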
# Helper for converting a dask_cudf DataFrame (GPU partitions) into a
# pandas-backed dask DataFrame. The wrapper signature below is assumed from
# context: `dask_cudf` is the distributed GPU dataframe and `client` an
# optional dask.distributed client (default_client, get_meta, uuid1 and dd
# come from the surrounding module).
def to_dask_df(dask_cudf, client=None):
    def to_pandas(df):
        return df.to_pandas()

    c = default_client() if client is None else client
    delayed_ddf = dask_cudf.to_delayed()
    gpu_futures = c.compute(delayed_ddf)

    key = uuid1()
    dfs = [c.submit(
        to_pandas,
        f,
        key="%s-%s" % (key, idx)) for idx, f in enumerate(gpu_futures)]

    meta = c.submit(get_meta, dfs[0]).result()
    return dd.from_delayed(dfs, meta=meta)
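
# Hypothetical usage of the helper above: build a small dask_cudf DataFrame
# and convert it back to a pandas-backed dask DataFrame on the same cluster
# (assumes GPU workers and the to_dask_df name introduced above).
import cudf
import dask_cudf
from dask.distributed import Client

client = Client()  # assumes a cluster with GPU workers
gdf = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [10.0, 20.0, 30.0, 40.0]})
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)

pandas_backed = to_dask_df(dask_gdf, client=client)
print(pandas_backed.head())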
def transform(self, raw_X):
    msg = "'X' should be a 1-dimensional array with length 'num_samples'."

    if not dask.is_dask_collection(raw_X):
        return self._hasher(**self.get_params()).transform(raw_X)

    if isinstance(raw_X, db.Bag):
        bag2 = raw_X.map_partitions(self._transformer)
        objs = bag2.to_delayed()
        arrs = [
            da.from_delayed(obj, (np.nan, self.n_features), self.dtype)
            for obj in objs
        ]
        result = da.concatenate(arrs, axis=0)
    elif isinstance(raw_X, dd.Series):
        result = raw_X.map_partitions(self._transformer)
    elif isinstance(raw_X, da.Array):
        # dask.Array
        chunks = ((np.nan,) * raw_X.numblocks[0], (self.n_features,))
        if raw_X.ndim == 1:
            result = raw_X.map_blocks(
                self._transformer, dtype="f8", chunks=chunks, new_axis=1
            )
        else:
            raise ValueError(msg)
    else:
        raise ValueError(msg)

    meta = scipy.sparse.eye(0, format="csr")
    result._meta = meta
    return result
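
# What the method above enables in practice: hashing-vectorize a dask bag of
# documents into a dask array whose blocks are scipy.sparse CSR matrices
# (a sketch assuming dask-ml's HashingVectorizer, which this method resembles).
import dask.bag as db
from dask_ml.feature_extraction.text import HashingVectorizer

docs = db.from_sequence(
    ["the quick brown fox", "jumps over", "the lazy dog"], npartitions=2)
X = HashingVectorizer().transform(docs)
print(X)   # lazy dask array with unknown row chunk sizes; blocks are CSR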
# Fragment of a loader that dispatches on the data file extension. The excerpt
# starts mid-chain: the preceding branch (apparently .csv handling, per the
# error message) is not shown, and the opening test below (HDF5 input,
# inferred from pd.read_hdf) is assumed.
elif data_path.endswith(".h5"):
    objpath = splitext(basename(data_path))[0]
    self.df = pd.read_hdf(data_path, objpath)

    # parse categorical fields
    for f in self.categorical_fields:
        self.df[f] = self.df[f].astype('category')

elif data_path.endswith(".parq"):
    import dask.dataframe as dd
    self.df = dd.io.parquet.read_parquet(data_path)
    if not outofcore:
        self.df = self.df.persist()

elif data_path.endswith(".castra"):
    import dask.dataframe as dd
    self.df = dd.from_castra(data_path)
    if not outofcore:
        self.df = self.df.cache(cache=dict)

else:
    raise IOError("Unknown data file type; .csv, .h5, .parq and .castra are "
                  "currently supported")
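
# The .parq branch above in isolation, using the current dask API
# (dd.read_parquet replaces dd.io.parquet.read_parquet; dd.from_castra is no
# longer available in recent dask releases). The path is illustrative.
import dask.dataframe as dd

outofcore = False
df = dd.read_parquet("data/points.parq")
if not outofcore:
    df = df.persist()
print(df.npartitions)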
# Assumed context for this script: pandas as pd, dask.dataframe as dd,
# dask.distributed.Client, Keras's load_model, plus DAY_AS_STR, UNIQUE_HASH,
# HDFS_DIR_INPUT and the Cassandra wrapper defined elsewhere in the project.
def batch_inference_on_partition(partition_df):
    churn_model_file = f'/opt/models/{DAY_AS_STR}_{UNIQUE_HASH}_churn_model.h5'
    churn_model = load_model(churn_model_file)
    # Score every row of the partition and return one value per row so the
    # result aligns with the partition index (matches meta=('prediction', float)).
    scores = churn_model.predict(partition_df.drop(['client_id'], axis=1)).ravel()
    return pd.Series(scores, index=partition_df.index, name='prediction')


def persist_partition(partition_df):
    def persist_one_prediction(series_obj):
        cassandra.save_prediction(series_obj.client_id, series_obj.prediction)

    cassandra = Cassandra()
    partition_df.apply(persist_one_prediction, axis=1)
    # Return a per-row token so the result matches meta=('token', int).
    return pd.Series(0, index=partition_df.index, name='token')


if __name__ == '__main__':
    client = Client()
    dask_df = client.persist(dd.read_parquet(HDFS_DIR_INPUT))
    dask_df.client_id.count().compute()
    dask_df['prediction'] = dask_df.map_partitions(batch_inference_on_partition,
                                                   meta=('prediction', float))
    dask_df['token'] = dask_df.map_partitions(persist_partition, meta=('token', int))
    dask_df.token.compute()
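
# A self-contained illustration of the map_partitions pattern used above:
# the per-partition function returns a pandas Series aligned with the
# partition index, and `meta` names the resulting column and its dtype
# (the frame and the doubling "model" are stand-ins).
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'client_id': [1, 2, 3, 4], 'feature': [0.1, 0.9, 0.4, 0.7]})
ddf = dd.from_pandas(pdf, npartitions=2)

def score_partition(partition_df):
    return pd.Series(partition_df['feature'] * 2, index=partition_df.index)

ddf['prediction'] = ddf.map_partitions(score_partition, meta=('prediction', float))
print(ddf.compute())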
# Excerpt of a predict_proba method from a post-fit wrapper estimator. The
# signature and the start of the docstring are reconstructed from context;
# only the Returns section survived in the original excerpt.
def predict_proba(self, X):
    """Predict probabilities for X.

    Returns
    -------
    y : array-like
    """
    X = self._check_array(X)
    self._check_method("predict_proba")

    if isinstance(X, da.Array):
        # XXX: multiclass
        return X.map_blocks(
            _predict_proba,
            estimator=self._postfit_estimator,
            dtype="float",
            chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
        )
    elif isinstance(X, dd._Frame):
        return X.map_partitions(_predict_proba, estimator=self._postfit_estimator)
    else:
        return _predict_proba(X, estimator=self._postfit_estimator)
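
# This matches the shape of dask-ml's ParallelPostFit wrapper; assuming that,
# a usage sketch: fit a plain scikit-learn estimator in memory, then call
# predict_proba on a dask array to get lazy, block-wise probabilities.
import numpy as np
import dask.array as da
from sklearn.linear_model import LogisticRegression
from dask_ml.wrappers import ParallelPostFit

X_small = np.random.random((100, 4))
y_small = (X_small[:, 0] > 0.5).astype(int)

clf = ParallelPostFit(estimator=LogisticRegression())
clf.fit(X_small, y_small)                     # ordinary in-memory fit

X_big = da.random.random((10000, 4), chunks=(1000, 4))
proba = clf.predict_proba(X_big)              # lazy dask array, shape (10000, 2)
print(proba.compute()[:5])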
def main():
    client = Client()
    dask_df = client.persist(dd.read_parquet(HDFS_DIR_INPUT))
    ModelGenerator(dask_df).generate_and_save_model()
# Fragment of a reader that reconstructs a SpatialPointsFrame from a
# spatially partitioned parquet file. The wrapper below is hypothetical:
# `path` is the parquet location and `pf` a fastparquet ParquetFile handle
# opened earlier in the original function.
def _read_spatial_points_frame(path, pf):
    # Read parquet file
    frame = dd.read_parquet(path)

    # Check for spatial points metadata
    if 'SpatialPointsFrame' in pf.key_value_metadata:
        # Load metadata
        props = json.loads(pf.key_value_metadata['SpatialPointsFrame'])
    else:
        props = None

    # Call DataFrame constructor with the internals of frame
    return SpatialPointsFrame(frame.dask, frame._name, frame._meta,
                              frame.divisions, props)
class SpatialPointsFrame(dd.DataFrame):
    """
    Class that wraps a spatially partitioned parquet data set and provides
    a spatial_query method to access the subset of partitions necessary to
    cover the specified x/y range extents.

    The spatially partitioned parquet data set must first be created using
    the `SpatialPointsFrame.partition_and_write` static method. Instances
    of the SpatialPointsFrame class can then be constructed with this
    partitioned parquet data set.

    Note that this class is only suitable for partitioning data sets for use
    with the Canvas.points aggregation method.

    Examples
    --------
    First, construct a spatially partitioned parquet file. This is an
    expensive operation that only needs to be performed once.
    """
# Data-feeder setup helper (from a TensorFlow-style data_feeder module). The
# signature and the leading argument descriptions in the docstring are
# reconstructed from context; treat them as assumptions.
def setup_train_data_feeder(x, y, n_classes, batch_size=None, shuffle=True,
                            epochs=None):
  """Create data feeder, to sample inputs from dataset.

  Args:
    x: features; numpy, pandas or dask matrix, or dictionary of these.
    y: labels; numpy, pandas or dask array, or dictionary of these.
    n_classes: number of classes. May be None, or a dict keyed like `y` so
      that `n_classes[key]` gives the number of classes for `y[key]`.
    batch_size: size to split data into parts. Must be >= 1.
    shuffle: Whether to shuffle the inputs.
    epochs: Number of epochs to run.

  Returns:
    DataFeeder object that returns training data.

  Raises:
    ValueError: if one of `x` and `y` is iterable and the other is not.
  """
  x, y = _data_type_filter(x, y)
  if HAS_DASK:
    # pylint: disable=g-import-not-at-top
    import dask.dataframe as dd
    if (isinstance(x, (dd.Series, dd.DataFrame)) and
        (y is None or isinstance(y, (dd.Series, dd.DataFrame)))):
      data_feeder_cls = DaskDataFeeder
    else:
      data_feeder_cls = DataFeeder
  else:
    data_feeder_cls = DataFeeder

  if _is_iterable(x):
    if y is not None and not _is_iterable(y):
      raise ValueError('Both x and y should be iterators for '
                       'streaming learning to work.')
    return StreamingDataFeeder(x, y, n_classes, batch_size)
  return data_feeder_cls(
      x, y, n_classes, batch_size, shuffle=shuffle, epochs=epochs)
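
# The HAS_DASK branch above follows a common pattern: choose an implementation
# based on whether the inputs are dask collections. A standalone sketch of
# just that dispatch (names are illustrative):
import pandas as pd
import dask.dataframe as dd

def pick_feeder(x):
    if isinstance(x, (dd.Series, dd.DataFrame)):
        return 'dask-aware feeder'
    return 'in-memory feeder'

print(pick_feeder(pd.DataFrame({'a': [1, 2]})))
print(pick_feeder(dd.from_pandas(pd.DataFrame({'a': [1, 2]}), npartitions=1)))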