Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ready_states(self, state):
    """check_task_is_ready must hand back the very state object it received.

    NOTE(review): ``state`` is presumably supplied by a parametrize
    decorator or fixture that is not visible in this excerpt.
    """
    runner = TaskRunner(task=Task())
    checked = runner.check_task_is_ready(state=state)
    # Identity, not equality: no new state object may be created.
    assert checked is state
def test_states_without_run_count(self, state):
    """initialize_run sets ``task_run_count`` to 1 when the context lacks it.

    The incoming state itself must come back unchanged (same object).

    NOTE(review): ``state`` is presumably supplied by a parametrize
    decorator or fixture that is not visible in this excerpt.
    """
    with prefect.context() as run_context:
        # Precondition: a fresh context carries no run count yet.
        assert "task_run_count" not in run_context

        runner = TaskRunner(Task())
        initialized = runner.initialize_run(state=state, context=run_context)

        # The count is read from the live context, so check it before the
        # ``with`` block exits.
        assert run_context.task_run_count == 1
        assert initialized.state is state
def test_run_mapped_preserves_result_objects(self):
    """Mapping over an upstream ``Result`` gives each child a ``Result`` input.

    The upstream result wraps ``[1, 2]`` with a ``JSONResultHandler``; each
    of the two mapped children is expected to carry one element, wrapped in
    a ``Result`` that compares equal to one built with the same handler type.
    """

    @prefect.task(cache_for=timedelta(minutes=10))
    def tt(foo):
        pass

    with prefect.context(checkpointing=True, caches={}):
        mapped_edge = Edge(Task(), tt, key="foo", mapped=True)
        upstream = Success(
            result=Result([1, 2], result_handler=JSONResultHandler())
        )
        state = TaskRunner(task=tt).run_mapped_task(
            state=Pending(),
            upstream_states={mapped_edge: upstream},
            context={},
            executor=prefect.engine.executors.LocalExecutor(),
        )

    assert state.is_mapped()

    # Unpacking doubles as a check that exactly two child states exist.
    first_child, second_child = state.map_states
    for child, element in ((first_child, 1), (second_child, 2)):
        expected = Result(element, result_handler=JSONResultHandler())
        assert child.cached_inputs["foo"] == expected
def test_get_inputs_from_upstream_mapped(self):
    """get_task_inputs wraps a mapped upstream's raw result in a ``Result``.

    The upstream ``Success`` holds the plain list ``[1, 2]`` under the edge
    key ``"x"``; the returned inputs must map ``"x"`` to ``Result([1, 2])``.
    """
    mapped_edge = Edge(1, 2, key="x", mapped=True)
    upstream_states = {mapped_edge: Success(result=[1, 2])}

    runner = TaskRunner(task=Task())
    inputs = runner.get_task_inputs(
        state=Pending(), upstream_states=upstream_states
    )

    assert inputs == {"x": Result([1, 2])}
def test_failed_zero_max_retry(self):
    """check_for_retry leaves a ``Failed`` state alone for a default Task.

    NOTE(review): per the test name, a default-constructed ``Task``
    presumably has zero max retries -- confirm against ``Task``.
    """
    failed = Failed()
    runner = TaskRunner(task=Task())
    outcome = runner.check_for_retry(state=failed, inputs={})
    # Same object back: no retry state was substituted.
    assert outcome is failed
def test_failures_arent_checkpointed():
    """A task that raises ends up Failed with its own exception as result.

    The result handler's ``store_safe_value`` is rigged to raise
    ``SyntaxError``; the final result being the task's ``TypeError`` shows
    that the handler's error did not replace it.
    """
    # Any attempt to store a value through this handler raises SyntaxError.
    exploding_store = MagicMock(side_effect=SyntaxError)
    handler = MagicMock(store_safe_value=exploding_store)

    @prefect.task(checkpoint=True, result_handler=handler)
    def fn():
        raise TypeError("Bad types")

    with prefect.context(checkpointing=True):
        final_state = TaskRunner(task=fn).run()

    assert final_state.is_failed()
    assert isinstance(final_state.result, TypeError)
def test_task_runner_has_logger():
    """A TaskRunner exposes a logger named ``prefect.TaskRunner``."""
    runner = TaskRunner(Task())
    logger_name = runner.logger.name
    assert logger_name == "prefect.TaskRunner"
def test_multiple_task_runner_handlers_are_called(self):
    """Each handler registered on the TaskRunner is invoked per state change."""
    # Pass-through handler: returns its third argument (the new state).
    spy = MagicMock(side_effect=lambda t, o, n: n)

    runner = TaskRunner(task=Task(), state_handlers=[spy, spy])
    runner.run()

    # The same mock is registered twice and the run makes two transitions
    # (Pending -> Running -> Success), hence 2 x 2 = 4 calls.
    assert spy.call_count == 4
def test_multiple_task_handlers_are_called(self):
    """Each handler registered on the Task itself is invoked per state change."""
    # Pass-through handler: returns its third argument (the new state).
    spy = MagicMock(side_effect=lambda t, o, n: n)
    observed_task = Task(state_handlers=[spy, spy])

    TaskRunner(task=observed_task).run()

    # The same mock is registered twice and the run makes two transitions
    # (Pending -> Running -> Success), hence 2 x 2 = 4 calls.
    assert spy.call_count == 4
def test_run_mapped_returns_mapped(self, state):
    """run_mapped_task yields a mapped state even with no upstream states.

    NOTE(review): ``state`` is presumably supplied by a parametrize
    decorator or fixture that is not visible in this excerpt.
    """
    runner = TaskRunner(task=Task())
    local_executor = prefect.engine.executors.LocalExecutor()

    mapped_state = runner.run_mapped_task(
        state=state,
        upstream_states={},
        context={},
        executor=local_executor,
    )

    assert mapped_state.is_mapped()