def test_unsatisfied_inputs(self, unfinished_outputs_pipeline):
    """ds1, ds2 and ds3 were not specified."""
    with pytest.raises(ValueError, match=r"not found in the DataCatalog"):
        SequentialRunner().run(unfinished_outputs_pipeline, DataCatalog())
def test_no_input_seq(self, branchless_no_input_pipeline):
    outputs = SequentialRunner().run(branchless_no_input_pipeline, DataCatalog())
    assert "E" in outputs
    assert len(outputs) == 1
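
# A minimal, self-contained sketch of the pattern the two tests above
# exercise (same-era kedro API assumed; double, "x" and "y" are
# illustrative names): inputs are loaded from the DataCatalog, and
# run() returns the free outputs that are not registered in it.
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner

def double(x):
    return x * 2

outputs = SequentialRunner().run(
    Pipeline([node(double, "x", "y", name="double")]),
    DataCatalog({"x": MemoryDataSet(21)}),
)
assert outputs == {"y": 42}
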
def test_sequential_run_arg(self, dummy_context, dummy_dataframe, caplog):
    dummy_context.catalog.save("cars", dummy_dataframe)
    dummy_context.run(runner=SequentialRunner())
    log_msgs = [record.getMessage() for record in caplog.records]
    log_names = [record.name for record in caplog.records]
    assert "kedro.runner.sequential_runner" in log_names
    assert "Pipeline execution completed successfully." in log_msgs
def test_input_seq(
    self, memory_catalog, unfinished_outputs_pipeline, pandas_df_feed_dict
):
    memory_catalog.add_feed_dict(pandas_df_feed_dict, replace=True)
    outputs = SequentialRunner().run(unfinished_outputs_pipeline, memory_catalog)
    assert set(outputs.keys()) == {"ds8", "ds5", "ds6"}
    # the pipeline runs ds2 -> ds5
    assert outputs["ds5"] == [1, 2, 3, 4, 5]
    assert isinstance(outputs["ds8"], dict)
    # the pipeline runs ds1 -> ds4 -> ds8
    assert outputs["ds8"]["data"] == 42
    # the pipeline runs ds3
    assert isinstance(outputs["ds6"], pd.DataFrame)
def _from_missing(pipeline, catalog):
    """Create a new pipeline based on missing outputs."""
    name = "kedro.runner.runner.AbstractRunner.run"
    with mock.patch(name) as run:
        SequentialRunner().run_only_missing(pipeline, catalog)
    _, args, _ = run.mock_calls[0]
    new_pipeline = args[0]
    return new_pipeline
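
# Hypothetical usage of the helper above, under the assumption that
# run_only_missing skips outputs already present in the catalog: when every
# output exists, the sub-pipeline handed to the patched AbstractRunner.run
# should be empty (double, "x" and "y" are illustrative names).
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node

def double(x):
    return x * 2

pipeline = Pipeline([node(double, "x", "y", name="double")])
catalog = DataCatalog({"x": MemoryDataSet(2), "y": MemoryDataSet(4)})
assert not _from_missing(pipeline, catalog).nodes
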
assert not result.exit_code
fake_load_context.return_value.run.assert_called_once_with(
    tags=(),
    runner=mocker.ANY,
    node_names=(),
    from_nodes=[],
    to_nodes=[],
    from_inputs=[],
    load_versions={},
    pipeline_name=None,
)
assert isinstance(
    fake_load_context.return_value.run.call_args_list[0][1]["runner"],
    SequentialRunner,
)
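
# Hedged sketch of the setup this fragment implies: the project's "run"
# command is invoked via click's CliRunner while load_context is patched,
# so the arguments forwarded to context.run can be inspected. The
# "kedro_cli" module path and the mocker fixture wiring are assumptions.
from click.testing import CliRunner
from kedro_cli import run  # assumed location of the CLI command shown below

def test_run_defaults_sketch(mocker):
    fake_load_context = mocker.patch("kedro_cli.load_context")
    result = CliRunner().invoke(run, [])
    assert not result.exit_code
    fake_load_context.return_value.run.assert_called_once()
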
def test_empty_apply(self):
    """Applying no decorators is valid."""
    identity_node = node(identity, "number", "output", name="identity")
    pipeline = Pipeline([identity_node]).decorate()
    catalog = DataCatalog({}, dict(number=1))
    result = SequentialRunner().run(pipeline, catalog)
    assert result["output"] == 1
    from_nodes,
    from_inputs,
    load_version,
    pipeline,
    config,
    params,
):
    """Run the pipeline."""
    if parallel and runner:
        raise KedroCliError(
            "Both --parallel and --runner options cannot be used together. "
            "Please use either --parallel or --runner."
        )
    if parallel:
        runner = "ParallelRunner"
    runner_class = load_obj(runner, "kedro.runner") if runner else SequentialRunner
    context = load_context(Path.cwd(), env=env, extra_params=params)
    context.run(
        tags=tag,
        runner=runner_class(),
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        from_inputs=from_inputs,
        load_versions=load_version,
        pipeline_name=pipeline,
    )
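
# How the runner lookup above resolves (hedged illustration): load_obj
# imports an attribute by name, here relative to the "kedro.runner"
# package, so "--runner ParallelRunner" maps to kedro.runner.ParallelRunner
# and omitting --runner falls back to SequentialRunner.
from kedro.runner import ParallelRunner
from kedro.utils import load_obj

assert load_obj("ParallelRunner", "kedro.runner") is ParallelRunner
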
    from_nodes,
    from_inputs,
    load_version,
    pipeline,
    config,
    params,
):
    """Run the pipeline."""
    if parallel and runner:
        raise KedroCliError(
            "Both --parallel and --runner options cannot be used together. "
            "Please use either --parallel or --runner."
        )
    if parallel:
        runner = "ParallelRunner"
    runner_class = load_obj(runner, "kedro.runner") if runner else SequentialRunner
    tag = _get_values_as_tuple(tag) if tag else tag
    node_names = _get_values_as_tuple(node_names) if node_names else node_names
    context = load_context(Path.cwd(), env=env, extra_params=params)
    context.run(
        tags=tag,
        runner=runner_class(),
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        from_inputs=from_inputs,
        load_versions=load_version,
        pipeline_name=pipeline,
    )
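
# _get_values_as_tuple is not shown in this snippet; a plausible
# implementation consistent with its use above (flattening repeated,
# possibly comma-separated click option values into a flat tuple):
from itertools import chain
from typing import Iterable, Tuple

def _get_values_as_tuple(values: Iterable[str]) -> Tuple[str, ...]:
    return tuple(chain.from_iterable(value.split(",") for value in values))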