Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_deserialize_task_with_cache():
    """
    Deserializing a task records it, keyed by slug, in a ``task_cache`` entry
    of the schema context so it can be reused elsewhere.
    """
    # The context is deliberately non-empty: per the original note, empty
    # contexts are replaced, which would hide the cache from this test.
    shared_context = {1: 1}
    task = Task()

    dump_schema = TaskSchema(context=shared_context)
    payload = dump_schema.dump(task)

    load_schema = TaskSchema(context=shared_context)
    restored = load_schema.load(payload)

    assert "task_cache" in shared_context
    # The cached object must be the very instance returned by ``load``.
    assert shared_context["task_cache"][task.slug] is restored
def test_deserialize_edge_has_no_task_info(self):
    """
    Edges only serialize task IDs (if available), so the task names will revert to default.
    """
    t1, t2 = Task("t1"), Task("t2")
    serialized = core.EdgeSchema().dump(Edge(t1, t2, key="x", mapped=True))
    deserialized = core.EdgeSchema().load(serialized)
    # Compare names by value: ``is`` against a string literal tests object
    # identity, which only passes by accident of interning and triggers a
    # SyntaxWarning on Python 3.8+.
    assert deserialized.upstream_task.name == "Task"
    assert deserialized.downstream_task.name == "Task"
def test_set_upstream(self):
    """``set_upstream`` called with a flow adds the matching edge to that flow."""
    flow = Flow(name="test")
    upstream, downstream = Task(), Task()

    downstream.set_upstream(upstream, flow=flow)

    expected_edge = Edge(upstream, downstream)
    assert expected_edge in flow.edges
def test_bad_cache_kwarg_combo(self):
    """Passing only ``cache_validator`` to ``Task`` must emit a ``UserWarning``."""
    expected_message = ".*Task will not be cached.*"
    with pytest.warns(UserWarning, match=expected_message):
        Task(cache_validator=all_inputs)
def test_create_task_with_and_without_checkpoint(self):
    """
    Check ``Task.checkpoint`` for a default task, an explicit ``checkpoint=True``,
    and a task built while ``tasks.defaults.checkpoint`` is temporarily enabled.
    """
    default_task = Task()
    assert default_task.checkpoint is False

    explicit_task = Task(checkpoint=True)
    assert explicit_task.checkpoint is True

    config_override = {"tasks.defaults.checkpoint": True}
    with set_temporary_config(config_override):
        configured_task = Task()
        assert configured_task.checkpoint is True
def test_create_task_with_and_without_checkpoint(self):
    """
    Verify the ``checkpoint`` attribute of a task under three setups: no
    argument, ``checkpoint=True``, and the config default switched on.
    """
    # NOTE(review): a test with this exact name is defined immediately before
    # this one; if both live in the same class, this definition shadows it.
    plain = Task()
    assert plain.checkpoint is False

    opted_in = Task(checkpoint=True)
    assert opted_in.checkpoint is True

    with set_temporary_config({"tasks.defaults.checkpoint": True}):
        from_config = Task()
        assert from_config.checkpoint is True
def test_task_runner_sends_checkpointed_success_states_to_cloud(self, client):
    """
    Run a checkpointed task through ``CloudTaskRunner`` and check the states
    handed to ``client.set_task_run_state``.

    Args:
        client: fixture exposing ``get_task_run_info`` and
            ``set_task_run_state`` with mock-style ``call_count`` /
            ``call_args_list`` attributes (presumably a mocked Cloud client;
            confirm against the fixture definition).
    """
    handler = JSONResultHandler()

    @prefect.task(checkpoint=True, result_handler=handler)
    def add(x, y):
        return x + y

    x_state, y_state = Success(result=Result(1)), Success(result=Result(1))
    upstream_states = {
        Edge(Task(), Task(), key="x"): x_state,
        Edge(Task(), Task(), key="y"): y_state,
    }

    # The returned state is not inspected; only the client calls matter here.
    CloudTaskRunner(task=add).run(upstream_states=upstream_states)

    # assertions
    assert client.get_task_run_info.call_count == 0  # never called
    # Exactly two state updates are sent: first Running, then Success.
    assert client.set_task_run_state.call_count == 2

    # call[1] is the kwargs dict of each recorded call.
    states = [call[1]["state"] for call in client.set_task_run_state.call_args_list]
    assert states[0].is_running()
    assert states[1].is_successful()
    # The success state must carry the handler-serialized result of 1 + 1.
    assert states[1]._result.safe_value == SafeResult("2", result_handler=handler)
def test_tags():
    """Check how ``Task.tags`` is built from the ``tags`` argument and ``prefect.context``."""
    # No tags argument and no surrounding tag context: empty set.
    untagged = Task()
    assert untagged.tags == set()

    # A bare string is rejected instead of being treated as an iterable of tags.
    with pytest.raises(TypeError):
        Task(tags="test")

    # Repeated tags collapse into a set.
    deduplicated = Task(tags=["test", "test2", "test"])
    assert deduplicated.tags == {"test", "test2"}

    # Tags supplied through the context are picked up by the task.
    with prefect.context(tags=["test"]):
        from_context = Task()
        assert from_context.tags == {"test"}

    # Context tags and explicit tags are combined.
    with prefect.context(tags=["test1", "test2"]):
        combined = Task(tags=["test3"])
        assert combined.tags == {"test1", "test2", "test3"}
def test_edge_has_tasks_property():
    """``Edge.tasks`` is the set holding the edge's upstream and downstream tasks."""
    t1 = Task()
    t2 = TaskWithKey()
    # The unused third task from the original was dropped; it took no part
    # in the edge or the assertion.
    edge = Edge(t1, t2, key="a_key")
    assert edge.tasks == {t1, t2}
def test_running_state_finishes(self):
    """
    Starting from a ``Running`` state, ``get_flow_run_state`` on a one-task
    flow must return a successful state.
    """
    runner = FlowRunner(flow=Flow(name="test", tasks=[Task()]))

    final_state = runner.get_flow_run_state(
        state=Running(),
        task_states={},
        task_contexts={},
        return_tasks=set(),
        task_runner_state_handlers=[],
        executor=LocalExecutor(),
    )

    assert final_state.is_successful()