Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def str_node_inputs_list():
return {
"nodes": [
node(biconcat, ["input1", "input2"], ["input3"], name="node1"),
node(identity, "input3", "input4", name="node2"),
],
"expected": [
{node(biconcat, ["input1", "input2"], ["input3"], name="node1")},
{node(identity, "input3", "input4", name="node2")},
],
"free_inputs": ["input1", "input2"],
"outputs": ["input4"],
}
def branchless_no_input_pipeline():
"""The pipeline runs in the order A->B->C->D->E."""
return Pipeline(
[
node(identity, "D", "E", name="node1"),
node(identity, "C", "D", name="node2"),
node(identity, "A", "B", name="node3"),
node(identity, "B", "C", name="node4"),
node(random, None, "A", name="node5"),
]
def test_tag_nodes(self):
tagged_node = node(identity, "input", "output", tags=["hello"]).tag(["world"])
assert "hello" in tagged_node.tags
assert "world" in tagged_node.tags
assert len(tagged_node.tags) == 2
def test_lambda(self):
n = node(lambda a: a, ["in"], ["out"])
assert str(n) == "([in]) -> [out]"
assert n.name == "([in]) -> [out]"
assert n.short_name == ""
def disjoint_pipeline():
# Two separate pipelines: A->B->C and D->E->F
return {
"nodes": [
node(identity, "A", "B", name="node1"),
node(identity, "B", "C", name="node2"),
node(identity, "E", "F", name="node3"), # disjoint part D->E->F
node(identity, "D", "E", name="node4"),
],
"expected": [
{
node(identity, "A", "B", name="node1"),
node(identity, "D", "E", name="node4"),
},
{
node(identity, "B", "C", name="node2"),
node(identity, "E", "F", name="node3"),
},
],
"free_inputs": ["A", "D"],
"outputs": ["C", "F"],
}
def test_node_equals(self):
first = node(identity, "input1", "output1", name="a node")
second = node(identity, "input1", "output1", name="a node")
assert first == second
assert first is not second
def nodes_with_tags():
return [
node(identity, "E", None, name="node1"),
node(identity, "D", "E", name="node2", tags=["tag1", "tag2"]),
node(identity, "C", "D", name="node3"),
node(identity, "A", "B", name="node4", tags=["tag2"]),
node(identity, "B", "C", name="node5"),
node(constant_output, None, "A", name="node6", tags=["tag1"]),
]
[
node(
treatment_fractions_,
["args_raw", "df_00"],
"treatment_fractions",
)
],
tags="131_treatment_fractions_",
),
Pipeline(
[node(fit_propensity, ["args", "df_00"], "propensity_model")],
tags="211_fit_propensity",
),
Pipeline(
[
node(
estimate_propensity,
["args", "df_00", "propensity_model"],
"df_01",
)
],
tags="221_estimate_propensity",
),
Pipeline(
[
node(
model_for_treated_fit, ["args", "df_01"], "treated__model_dict"
),
node(
model_for_untreated_fit,
["args", "df_01"],
"untreated__model_dict",
Returns:
Pipeline: The resulting pipeline.
"""
###########################################################################
# Here you can find an example pipeline with 4 nodes.
#
# PLEASE DELETE THIS PIPELINE ONCE YOU START WORKING ON YOUR OWN PROJECT AS
# WELL AS THE FILE nodes/example.py
# -------------------------------------------------------------------------
pipeline = Pipeline(
[
node(
split_data,
["example_iris_data", "parameters"],
dict(
train_x="example_train_x",
train_y="example_train_y",
test_x="example_test_x",
test_y="example_test_y",
),
),
node(
train_model,
["example_train_x", "example_train_y", "parameters"],
"example_model",
),
node(
predict,
],
tags="321_predict_proba",
),
Pipeline(
[
node(
compute_cate,
["treated__proba", "untreated__proba"],
"cate_estimated",
)
],
tags="411_compute_cate",
),
Pipeline(
[
node(
add_cate_to_df,
[
"args",
"df_01",
"cate_estimated",
"treated__proba",
"untreated__proba",
],
"df_02",
)
],
tags="421_add_cate_to_df",
),
Pipeline(
[
node(