pickle_data = PickleDataSet(  # presumably a PickleDataSet: the snippet begins mid-call
    filepath=str(tmp_path / "data.pkl"), backend="pickle"
)
catalog = DataCatalog(
    data_sets={
        "spark_in": spark_in,
        "pickle": pickle_data,
        "spark_out": spark_out,
    }
)
pipeline = Pipeline(
    [
        node(identity, "spark_in", "pickle"),
        node(identity, "pickle", "spark_out"),
    ]
)
runner = ParallelRunner()
pattern = r"{0} cannot be serialized. {1} can only be used with serializable data".format(
    str(sample_spark_df.__class__), str(pickle_data.__class__.__name__)
)
with pytest.raises(DataSetError, match=pattern):
    runner.run(pipeline, catalog)
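# The nodes above pass data through unchanged via an `identity` helper that the
# snippets do not show. A minimal sketch of what it presumably looks like
# (an assumption, not taken from the source):
def identity(arg):
    return arg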
def test_parallel_run_arg(self, dummy_context, dummy_dataframe, caplog):
    dummy_context.catalog.save("cars", dummy_dataframe)
    dummy_context.run(runner=ParallelRunner())
    log_msgs = [record.getMessage() for record in caplog.records]
    log_names = [record.name for record in caplog.records]
    assert "kedro.runner.parallel_runner" in log_names
    assert "Pipeline execution completed successfully." in log_msgs
def test_decorate_pipeline(self, fan_out_fan_in, catalog):
    catalog.add_feed_dict(dict(A=42))
    result = ParallelRunner().run(fan_out_fan_in.decorate(log_time), catalog)
    assert "Z" in result
    assert len(result["Z"]) == 3
    assert result["Z"] == (42, 42, 42)
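# `fan_out_fan_in` is a fixture these snippets rely on but do not define. Since the
# assertions expect result["Z"] == (42, 42, 42), it presumably fans one input "A" out
# to three branches and gathers them back into a tuple "Z". A hypothetical sketch:
import pytest

from kedro.pipeline import Pipeline, node


def fan_in(*args):
    return args


@pytest.fixture
def fan_out_fan_in():
    return Pipeline(
        [
            node(identity, "A", "B"),
            node(identity, "B", "C"),
            node(identity, "B", "D"),
            node(identity, "B", "E"),
            node(fan_in, ["C", "D", "E"], "Z"),
        ]
    )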
def test_release_transcoded(self):
    runner = ParallelRunner()
    log = runner._manager.list()
    pipeline = Pipeline(
        [node(source, None, "ds@save"), node(sink, "ds@load", None)]
    )
    catalog = DataCatalog(
        {
            "ds@save": LoggingDataSet(log, "save"),
            "ds@load": LoggingDataSet(log, "load"),
        }
    )
    ParallelRunner().run(pipeline, catalog)
    # we want to see both datasets being released
    assert list(log) == [("release", "save"), ("load", "load"), ("release", "load")]
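# `LoggingDataSet`, `source` and `sink` are helpers the snippet assumes are already
# defined. A plausible sketch, inferred from the events the assertion expects
# (hypothetical, not taken from the source): a dataset that records load and release
# calls into the shared `log` list.
from kedro.io import AbstractDataSet


def source():
    return "stuff"


def sink(arg):
    pass


class LoggingDataSet(AbstractDataSet):
    def __init__(self, log, name, value=None):
        self.log = log
        self.name = name
        self.value = value

    def _load(self):
        self.log.append(("load", self.name))
        return self.value

    def _save(self, data):
        self.value = data

    def _release(self):
        self.log.append(("release", self.name))
        self.value = None

    def _describe(self):
        return {"name": self.name}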
def test_dont_release_inputs_and_outputs(self):
    runner = ParallelRunner()
    log = runner._manager.list()
    pipeline = Pipeline(
        [node(identity, "in", "middle"), node(identity, "middle", "out")]
    )
    catalog = DataCatalog(
        {
            "in": runner._manager.LoggingDataSet(log, "in", "stuff"),
            "middle": runner._manager.LoggingDataSet(log, "middle"),
            "out": runner._manager.LoggingDataSet(log, "out"),
        }
    )
    ParallelRunner().run(pipeline, catalog)
    # we don't want to see release in or out in here
    assert list(log) == [("load", "in"), ("load", "middle"), ("release", "middle")]
def test_decorated_nodes(self, decorated_fan_out_fan_in, catalog):
    catalog.add_feed_dict(dict(A=42))
    result = ParallelRunner().run(decorated_fan_out_fan_in, catalog)
    assert "Z" in result
    assert len(result["Z"]) == 3
    assert result["Z"] == (42, 42, 42)
def test_parallel_runner(self, spark_in, spark_out):
    """Test ParallelRunner with SparkDataSet load and save."""
    catalog = DataCatalog(data_sets={"spark_in": spark_in, "spark_out": spark_out})
    pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
    runner = ParallelRunner()
    result = runner.run(pipeline, catalog)
    # 'spark_out' is persisted to file by the catalog, so the run returns no free outputs
    assert not result
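# `spark_in` and `spark_out` are fixtures the snippet does not show. They are presumably
# file-based SparkDataSet instances under a temporary directory, with `spark_in`
# pre-populated so the node has something to load. A hypothetical sketch (the import
# path may differ between Kedro versions):
import pytest

from kedro.extras.datasets.spark import SparkDataSet


@pytest.fixture
def spark_in(tmp_path, sample_spark_df):
    data_set = SparkDataSet(filepath=str(tmp_path / "input"))
    data_set.save(sample_spark_df)
    return data_set


@pytest.fixture
def spark_out(tmp_path):
    return SparkDataSet(filepath=str(tmp_path / "output"))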
assert not result.exit_code
fake_load_context.return_value.run.assert_called_once_with(
    tags=(),
    runner=mocker.ANY,
    node_names=(),
    from_nodes=[],
    to_nodes=[],
    from_inputs=[],
    load_versions={},
    pipeline_name=None,
)
assert isinstance(
    fake_load_context.return_value.run.call_args_list[0][1]["runner"],
    ParallelRunner,
)
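# This fragment starts after the CLI call whose result it checks. The missing setup
# presumably invokes `kedro run` with its parallel option through Click's test runner;
# a hypothetical sketch (`fake_kedro_cli` and the exact flag are assumptions):
from click.testing import CliRunner

result = CliRunner().invoke(fake_kedro_cli.cli, ["run", "--parallel"])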
def test_task_validation(self, fan_out_fan_in, catalog):
    """ParallelRunner cannot serialize the lambda function."""
    catalog.add_feed_dict(dict(A=42))
    pipeline = Pipeline([fan_out_fan_in, node(lambda x: x, "Z", "X")])
    with pytest.raises(AttributeError):
        ParallelRunner().run(pipeline, catalog)
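# As the docstring notes, ParallelRunner rejects nodes built from lambdas because they
# cannot be pickled for the worker processes. Replacing the lambda with a module-level
# function avoids the error; a minimal sketch reusing the fixtures above:
def passthrough(x):
    return x


pipeline = Pipeline([fan_out_fan_in, node(passthrough, "Z", "X")])
ParallelRunner().run(pipeline, catalog)  # no serialization error for this node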