Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_names_only(self, str_node_inputs_list):
    """Check the text returned by ``Pipeline.describe()`` line by line.

    A pipeline is built from the ``"nodes"`` entry of the
    ``str_node_inputs_list`` fixture; its description is split on newlines
    and compared against the expected lines below.
    """
    pipeline = Pipeline(str_node_inputs_list["nodes"])
    actual_lines = pipeline.describe().split("\n")
    expected_lines = [
        "#### Pipeline execution order ####",
        "Name: None",
        "Inputs: input1, input2",
        "",
        "node1",
        "node2",
        "",
        "Outputs: input4",
        "##################################",
    ]

    # Check the count first so a missing or extra line is reported as such.
    assert len(actual_lines) == len(expected_lines)
    # Matching counts say nothing about content, so compare every line too.
    for actual, expected in zip(actual_lines, expected_lines):
        assert actual == expected
def test_remove_from_empty_pipeline(self):
    """Subtracting a one-node pipeline from an empty pipeline changes nothing."""
    empty = Pipeline([])
    to_remove = Pipeline(
        [node(biconcat, ["input", "input1"], "output1", name="a")]
    )

    difference = empty - to_remove

    # The result must expose the same inputs/outputs as the empty operand
    # and must not contain any nodes.
    assert difference.inputs() == empty.inputs()
    assert difference.outputs() == empty.outputs()
    assert not difference.nodes
def test_invalid_union(self):
    """``Pipeline | str`` must raise ``TypeError`` naming both operand types."""
    p = Pipeline([])
    # The ``|`` must be escaped: unescaped it is regex alternation, which
    # would let either half of the message satisfy ``match`` on its own.
    pattern = r"unsupported operand type\(s\) for \|: 'Pipeline' and 'str'"
    with pytest.raises(TypeError, match=pattern):
        p | "hello"  # pylint: disable=pointless-statement
def test_tag_existing_pipeline(self, branchless_pipeline):
    """Every node of the pipeline returned by ``tag`` carries the new tag."""
    original = Pipeline(branchless_pipeline["nodes"])
    tagged = original.tag(["new_tag"])

    for tagged_node in tagged.nodes:
        assert "new_tag" in tagged_node.tags
def test_memory_data_set_input(self, fan_out_fan_in):
    """Run the ``fan_out_fan_in`` fixture in parallel with "A" held in memory.

    The run result is expected to contain "Z" as a 3-tuple of the input
    value.
    """
    wrapped = Pipeline([fan_out_fan_in])
    catalog = DataCatalog({"A": MemoryDataSet("42")})

    outputs = ParallelRunner().run(wrapped, catalog)

    assert "Z" in outputs
    z_value = outputs["Z"]
    assert len(z_value) == 3
    assert z_value == ("42", "42", "42")
def decorated_fan_out_fan_in():
    """Build a five-node pipeline that fans out from "B" and back in to "Z".

    ``decorated_identity`` links "A" to "B" and then "B" to each of "C",
    "D" and "E"; ``fan_in`` takes "C", "D" and "E" and produces "Z".

    Returns:
        The assembled ``Pipeline``.
    """
    return Pipeline(
        [
            node(decorated_identity, "A", "B"),
            node(decorated_identity, "B", "C"),
            node(decorated_identity, "B", "D"),
            node(decorated_identity, "B", "E"),
            node(fan_in, ["C", "D", "E"], "Z"),
        ]
    )
def test_initialized_with_tags(self):
    """Tags given to ``Pipeline`` are merged into each node's own tags."""
    tagged_node = node(identity, "A", "B", tags=["node1", "p1"])
    untagged_node = node(identity, "B", "C")
    pipeline = Pipeline([tagged_node, untagged_node], tags=["p1", "p2"])

    # One node is popped from each of the first two groups in order.
    first = pipeline.grouped_nodes[0].pop()
    second = pipeline.grouped_nodes[1].pop()

    assert first.tags == {"node1", "p1", "p2"}
    assert second.tags == {"p1", "p2"}
def _get_pipelines(self) -> Dict[str, Pipeline]:
    """Return a registry holding only an empty ``"__default__"`` pipeline."""
    empty_pipeline = Pipeline([])
    return {"__default__": empty_pipeline}
def test_remove_all_nodes(self):
    """Subtracting an identically built pipeline leaves nothing behind."""
    minuend = Pipeline(
        [node(biconcat, ["input", "input1"], "output1", name="a")]
    )
    subtrahend = Pipeline(
        [node(biconcat, ["input", "input1"], "output1", name="a")]
    )

    remainder = minuend - subtrahend

    # No inputs, no outputs and no nodes may survive the subtraction.
    assert remainder.inputs() == set()
    assert remainder.outputs() == set()
    assert not remainder.nodes
def test_empty_apply(self):
    """Calling ``decorate()`` with no arguments still yields a runnable pipeline."""
    passthrough = node(identity, "number", "output", name="identity")
    base_pipeline = Pipeline([passthrough])
    undecorated = base_pipeline.decorate()

    catalog = DataCatalog({}, dict(number=1))
    run_output = SequentialRunner().run(undecorated, catalog)

    assert run_output["output"] == 1