# Imports assumed by the tests below. These tests target an older kedro API
# (PickleLocalDataSet et al.); exact import paths may differ between versions.
# Several tests use `self` but their original class groupings are not shown in
# this excerpt, so the class names used below are assumptions.
import pytest
from pandas.testing import assert_frame_equal

from kedro.io import (
    DataCatalog,
    DataSetError,
    DataSetNotFoundError,
    MemoryDataSet,
    PickleLocalDataSet,
)
from kedro.pipeline import Pipeline, node
from kedro.pipeline.node import Node
from kedro.runner import ParallelRunner, SequentialRunner

# AirflowRunner ships with the kedro-airflow plugin; this import path is an
# assumption and may vary by plugin version.
from kedro_airflow.runner import AirflowRunner


def test_unsatisfied_input():
    pipeline = Pipeline([Node(lambda a: None, ["a"], None)])
    catalog = DataCatalog()
    pattern = r"Pipeline input\(s\) \{\'a\'\} not found in the DataCatalog"
    with pytest.raises(ValueError, match=pattern):
        AirflowRunner(None, None, {}).run(pipeline, catalog)
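

# The tests below use source/sink/identity helpers and a LoggingDataSet that
# are not part of this excerpt. A minimal sketch of what they could look like,
# reconstructed from how the tests use them (names and bodies are assumptions):
from kedro.io import AbstractDataSet
from kedro.runner.parallel_runner import ParallelRunnerManager


def source():
    return "stuff"


def identity(arg):
    return arg


def sink(arg):
    # Intentionally discards its input.
    pass


class LoggingDataSet(AbstractDataSet):
    """Records load and release calls in a shared log list.

    Saves are deliberately not logged, which is what the assertions in
    test_release_transcoded and test_count_multiple_loads expect.
    """

    def __init__(self, log, name, value=None):
        self.log = log
        self.name = name
        self.value = value

    def _load(self):
        self.log.append(("load", self.name))
        return self.value

    def _save(self, data):
        self.value = data

    def _release(self):
        self.log.append(("release", self.name))
        self.value = None

    def _describe(self):
        return {}


# test_count_multiple_loads accesses LoggingDataSet through the runner's
# multiprocessing manager, which requires registering the class first.
ParallelRunnerManager.register("LoggingDataSet", LoggingDataSet)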


class TestParallelRunnerWithSpark:
    def test_parallel_runner_with_pickle_dataset(
        self, tmp_path, spark_in, spark_out, sample_spark_df
    ):
        """Test ParallelRunner with a SparkDataSet -> PickleDataSet ->
        SparkDataSet pipeline."""
        pickle_data = PickleLocalDataSet(
            filepath=str(tmp_path / "data.pkl"), backend="pickle"
        )
        catalog = DataCatalog(
            data_sets={
                "spark_in": spark_in,
                "pickle": pickle_data,
                "spark_out": spark_out,
            }
        )
        pipeline = Pipeline(
            [
                node(identity, "spark_in", "pickle"),
                node(identity, "pickle", "spark_out"),
            ]
        )
        runner = ParallelRunner()
        pattern = r"{0} cannot be serialized. {1} can only be used with serializable data".format(
            str(sample_spark_df.__class__), str(pickle_data.__class__.__name__)
        )
        # The original was truncated after the pattern; saving a Spark
        # DataFrame through a pickle-backed dataset is expected to fail.
        with pytest.raises(DataSetError, match=pattern):
            runner.run(pipeline, catalog)
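

# A minimal sketch of the Spark fixtures assumed above. The SparkDataSet
# import path matches kedro's pyspark contrib package of the same era; the
# fixture bodies, data and file names are illustrative assumptions.
from pyspark.sql import SparkSession

from kedro.contrib.io.pyspark import SparkDataSet


@pytest.fixture(scope="module")
def spark_session():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    yield spark
    spark.stop()


@pytest.fixture
def sample_spark_df(spark_session):
    return spark_session.createDataFrame(
        [("Alex", 31), ("Bob", 12)], ["name", "age"]
    )


@pytest.fixture
def spark_in(tmp_path, sample_spark_df):
    data_set = SparkDataSet(filepath=str(tmp_path / "input.parquet"))
    data_set.save(sample_spark_df)
    return data_set


@pytest.fixture
def spark_out(tmp_path):
    return SparkDataSet(filepath=str(tmp_path / "output.parquet"))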


class TestParallelRunnerRelease:
    def test_release_transcoded(self):
        runner = ParallelRunner()
        log = runner._manager.list()
        pipeline = Pipeline(
            [node(source, None, "ds@save"), node(sink, "ds@load", None)]
        )
        catalog = DataCatalog(
            {
                "ds@save": LoggingDataSet(log, "save"),
                "ds@load": LoggingDataSet(log, "load"),
            }
        )
        ParallelRunner().run(pipeline, catalog)
        # we want to see both datasets being released
        assert list(log) == [
            ("release", "save"),
            ("load", "load"),
            ("release", "load"),
        ]


def test_no_default_datasets():
    pipeline = Pipeline([Node(lambda: None, [], "fred")])
    catalog = DataCatalog()
    with pytest.raises(ValueError, match="'fred' is not registered"):
        AirflowRunner(None, None, {}).run(pipeline, catalog)


@pytest.fixture
def memory_catalog():
    ds1 = MemoryDataSet({"data": 42})
    ds2 = MemoryDataSet([1, 2, 3, 4, 5])
    return DataCatalog({"ds1": ds1, "ds2": ds2})


@pytest.fixture
def data_catalog(data_set):
    return DataCatalog(data_sets={"test": data_set})
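

# The data_set and dummy_dataframe fixtures used by the catalog tests below
# are not part of this excerpt. A plausible sketch, assuming a CSV-backed
# dataset (the dataset type, data values and file name are assumptions):
import pandas as pd

from kedro.io import CSVLocalDataSet


@pytest.fixture
def dummy_dataframe():
    return pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})


@pytest.fixture
def data_set(tmp_path):
    return CSVLocalDataSet(
        filepath=str(tmp_path / "data.csv"), save_args={"index": False}
    )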


class TestParallelRunnerLoadCount:
    def test_count_multiple_loads(self):
        runner = ParallelRunner()
        log = runner._manager.list()
        pipeline = Pipeline(
            [
                node(source, None, "dataset"),
                node(sink, "dataset", None, name="bob"),
                node(sink, "dataset", None, name="fred"),
            ]
        )
        catalog = DataCatalog(
            {"dataset": runner._manager.LoggingDataSet(log, "dataset")}
        )
        runner.run(pipeline, catalog)
        # we want to see the release happen only after both loads
        assert list(log) == [
            ("load", "dataset"),
            ("load", "dataset"),
            ("release", "dataset"),
        ]


class TestDataCatalog:
    def test_add_all_save_and_load(self, data_set, dummy_dataframe):
        """Test adding all to the data catalog and then saving and reloading
        the data set."""
        catalog = DataCatalog(data_sets={})
        catalog.add_all({"test": data_set})
        catalog.save("test", dummy_dataframe)
        reloaded_df = catalog.load("test")
        assert_frame_equal(reloaded_df, dummy_dataframe)

    def test_save_to_unregistered(self, dummy_dataframe):
        """Check the error when attempting to save to an unregistered data set."""
        catalog = DataCatalog(data_sets={})
        pattern = r"DataSet 'test' not found in the catalog"
        with pytest.raises(DataSetNotFoundError, match=pattern):
            catalog.save("test", dummy_dataframe)
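

# The saving_none_pipeline fixture used below is not part of this excerpt.
# A minimal sketch: any pipeline whose node returns None into an output
# dataset will do (the node layout here is an assumption):
@pytest.fixture
def saving_none_pipeline():
    return Pipeline([node(lambda: None, None, "A")])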


class TestSequentialRunner:
    def test_node_returning_none(self, saving_none_pipeline):
        pattern = "Saving `None` to a `DataSet` is not allowed"
        with pytest.raises(DataSetError, match=pattern):
            SequentialRunner().run(saving_none_pipeline, DataCatalog())