Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_non_failed_states(self, state):
new_state = TaskRunner(task=Task()).check_for_retry(state=state, inputs={})
assert new_state is state
def test_mapped_tasks_parents_and_children_respond_to_individual_triggers():
task_runner_handler = MagicMock(side_effect=lambda t, o, n: n)
runner = TaskRunner(
task=Task(trigger=prefect.triggers.all_failed),
state_handlers=[task_runner_handler],
)
state = runner.run(
upstream_states={Edge(Task(), Task(), mapped=True): Success(result=[1])}
)
# the parent task changed state two times: Pending -> Mapped -> Mapped
# the child task changed state one time: Pending -> TriggerFailed
assert isinstance(state, Mapped)
assert task_runner_handler.call_count == 3
assert isinstance(state.map_states[0], TriggerFailed)
def test_with_two_finished(self):
state = Pending()
new_state = TaskRunner(Task()).check_upstream_finished(
state=state, upstream_states={1: Success(), 2: Failed()}
)
assert new_state is state
def test_unwrap_nested_meta_states(self):
state = Retrying(run_count=1)
result = TaskRunner(Task()).initialize_run(
state=Submitted(state=Queued(state=Submitted(state=Queued(state=state)))),
context={},
)
assert result.state is state
def test_any_successful_pass(self):
task = Task(trigger=prefect.triggers.any_successful)
state = Pending()
new_state = TaskRunner(task).check_task_trigger(
state=state, upstream_states={1: Success(), 2: Failed()}
)
assert new_state is state
def test_all_of_run_context_is_available_to_custom_cache_validators(self):
ctxt = dict()
def custom_validator(state, inputs, parameters):
ctxt.update(prefect.context.to_dict())
return False
# have to have a state worth checking to trigger the validator
with prefect.context(caches={"Task": [State()]}):
task = Task(
cache_for=timedelta(seconds=10), cache_validator=custom_validator
)
state = TaskRunner(task).run()
expected_subset = dict(
map_index=None,
task_full_name="Task",
task_run_count=1,
task_name="Task",
task_tags=set(),
task_slug=task.slug,
checkpointing=False,
)
for key, val in expected_subset.items():
assert ctxt[key] == val
def test_flow_raises_for_irrelevant_user_provided_parameters():
class ParameterTask(Task):
def run(self):
return prefect.context.get("parameters")
with Flow() as f:
x = Parameter("x")
t = ParameterTask()
f.add_task(x)
f.add_task(t)
with pytest.raises(TypeError):
state = f.run(return_tasks=[t], parameters=dict(x=10, y=3, z=9))
with pytest.raises(TypeError):
state = f.run(return_tasks=[t], x=10, y=3, z=9)
def test_task_runner_handlers_are_called_on_triggerfailed(self):
task_runner_handler = MagicMock(side_effect=lambda t, o, n: n)
runner = TaskRunner(
task=Task(trigger=prefect.triggers.all_failed),
state_handlers=[task_runner_handler],
)
state = runner.run(upstream_states={Edge(Task(), Task()): Success()})
# the task changed state one time: Pending -> TriggerFailed
assert isinstance(state, TriggerFailed)
assert task_runner_handler.call_count == 1
def test_binding_a_task_with_var_kwargs_expands_the_kwargs():
class KwargsTask(Task):
def run(self, **kwargs):
return kwargs
t1 = Task()
t2 = Task()
t3 = Task()
kw = KwargsTask()
with Flow(name="test") as f:
kw.bind(a=t1, b=t2, c=t3)
assert t1 in f.tasks
assert t2 in f.tasks
assert t3 in f.tasks
assert Edge(t1, kw, key="a") in f.edges
def test_cache_survives_pickling(self):
f = Flow()
t1 = Task()
t2 = Task()
t3 = Task()
f.add_edge(t1, t2)
f.sorted_tasks()
key = ("_sorted_tasks", (("root_tasks", ()),))
f._cache[key] = 1
assert f.sorted_tasks() == 1
f2 = cloudpickle.loads(cloudpickle.dumps(f))
assert f2.sorted_tasks() == 1
f2.add_edge(t2, t3)
assert f2.sorted_tasks() != 1