Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_outputs_task_decorator(self):
with Flow("test"):
assert self.mult(x=1).outputs() == int
def test_pending_or_running_are_ok(self, state):
flow = Flow(name="test", tasks=[Task()])
new_state = FlowRunner(flow=flow).check_flow_is_pending_or_running(state=state)
assert new_state is state
def test_task_decorator_with_default_args_must_be_called_inside_flow_context(self):
@tasks.task
def fn(x=1):
return x
with pytest.raises(ValueError):
fn()
with Flow(name="test"):
assert isinstance(fn(), Task)
def test_task_runner_handler_that_doesnt_return_state_or_none(self):
# raises an attribute error because it tries to access a property of the state that
# doesn't exist on None
with pytest.raises(AttributeError):
with prefect.utilities.debug.raise_on_exception():
FlowRunner(
flow=Flow(name="test"), state_handlers=[lambda *a: True]
).run()
def test_running_stays_running(self):
state = Running()
flow = Flow(name="test", tasks=[Task()])
new_state = FlowRunner(flow=flow).set_flow_to_running(state=state)
assert new_state.is_running()
def test_set_downstream_context(self):
with Flow(name="test") as f:
t1 = Task()
t2 = Task()
t1.set_downstream(t2)
assert Edge(t1, t2) in f.edges
def test_dict_returns_a_dict(self):
l = collections.Dict()
with Flow(name="test") as f:
l.bind(keys=["a", "b"], values=[1, 2])
assert f.run().result[l].result == dict(a=1, b=2)
def test_copying_then_setting_tags_doesnt_leak_backwards():
with Flow(name="test"):
t1 = Task()
with tasks.tags("init-tag"):
t2 = t1.copy()
assert t2.tags == {"init-tag"}
assert t1.tags == set()
def test_tuple_binds_varargs(self):
t1 = Task()
t2 = Task()
l = collections.Tuple()
with Flow(name="test") as f:
l.bind(t1, t2)
assert set([t1, t2, l]) == f.tasks
assert Edge(t1, l, key="arg_1") in f.edges
assert Edge(t2, l, key="arg_2") in f.edges
def test_required_parameters_must_be_provided():
flow = Flow(name="test")
y = prefect.Parameter("y")
flow.add_task(y)
flow_state = FlowRunner(flow=flow).run(return_tasks=[y])
assert isinstance(flow_state, Failed)
assert isinstance(flow_state.result[y], Failed)
assert "required but not provided" in str(flow_state.result[y]).lower()