Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_task_with_and_without_result_handler(self):
t1 = Task()
assert t1.result_handler is None
t2 = Task(result_handler=JSONResultHandler())
assert isinstance(t2.result_handler, ResultHandler)
assert isinstance(t2.result_handler, JSONResultHandler)
def test_basic_safe_result_repr():
r = SafeResult(2, result_handler=JSONResultHandler())
assert repr(r) == ""
def test_serialize_state_with_handled_result(cls):
res = Result(value=1, result_handler=JSONResultHandler())
res.store_safe_value()
state = cls(message="message", result=res)
serialized = StateSchema().dump(state)
assert isinstance(serialized, dict)
assert serialized["type"] == cls.__name__
assert serialized["message"] == "message"
assert serialized["_result"]["type"] == "SafeResult"
assert serialized["_result"]["value"] == "1"
assert serialized["__version__"] == prefect.__version__
"obj", [SafeResult(value=19, result_handler=JSONResultHandler()), NoResult]
)
def test_state_result_schema_chooses_schema(obj):
schema = StateResultSchema()
assert type(schema.load(schema.dump(obj))) == type(obj)
def test_json_handler_raises_normally(self):
handler = JSONResultHandler()
with pytest.raises(TypeError):
handler.write(type(None))
def test_task_failure_caches_inputs_automatically(client):
@prefect.task(max_retries=2, retry_delay=timedelta(seconds=100))
def is_p_three(p):
if p == 3:
raise ValueError("No thank you.")
with prefect.Flow("test") as f:
p = prefect.Parameter("p")
res = is_p_three(p)
state = CloudFlowRunner(flow=f).run(return_tasks=[res], parameters=dict(p=3))
assert state.is_running()
assert isinstance(state.result[res], Retrying)
exp_res = Result(3, result_handler=JSONResultHandler())
assert not state.result[res].cached_inputs["p"] == exp_res
exp_res.store_safe_value()
assert state.result[res].cached_inputs["p"] == exp_res
last_state = client.set_task_run_state.call_args_list[-1][-1]["state"]
assert isinstance(last_state, Retrying)
assert last_state.cached_inputs["p"] == exp_res
def test_create_task_with_and_without_result_handler(self):
t1 = Task()
assert t1.result_handler is None
t2 = Task(result_handler=JSONResultHandler())
assert isinstance(t2.result_handler, ResultHandler)
assert isinstance(t2.result_handler, JSONResultHandler)
def test_safe_result_inits_with_both_args(self):
res = SafeResult(value="3", result_handler=JSONResultHandler())
assert res.value == "3"
assert res.result_handler == JSONResultHandler()
assert res.safe_value is res
def test_serialize(self):
serialized = ResultHandlerSchema().dump(JSONResultHandler())
assert isinstance(serialized, dict)
Args:
- state (State): the current state of this task
- inputs (Dict[str, Result], optional): a dictionary of inputs whose keys correspond
to the task's `run()` arguments.
Returns:
- State: the state of the task after running the check
"""
if state.is_failed():
run_count = prefect.context.get("task_run_count", 1)
if prefect.context.get("task_loop_count") is not None:
loop_context = {
"_loop_count": Result(
value=prefect.context["task_loop_count"],
result_handler=JSONResultHandler(),
),
"_loop_result": Result(
value=prefect.context.get("task_loop_result"),
result_handler=self.result_handler,
),
}
inputs.update(loop_context)
if run_count <= self.task.max_retries:
start_time = pendulum.now("utc") + self.task.retry_delay
msg = "Retrying Task (after attempt {n} of {m})".format(
n=run_count, m=self.task.max_retries + 1
)
retry_state = Retrying(
start_time=start_time,
cached_inputs=inputs,
message=msg,