Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Result(object, result_handler=LocalResultHandler()),
NoResult,
SafeResult("3", result_handler=JSONResultHandler()),
],
)
def test_everything_is_pickleable_after_init(obj):
    """Check that ``obj`` survives a cloudpickle round trip unchanged.

    The object is serialized with ``cloudpickle.dumps``, restored with
    ``cloudpickle.loads``, and the restored copy must compare equal to the
    original.
    """
    payload = cloudpickle.dumps(obj)
    restored = cloudpickle.loads(payload)
    assert restored == obj
return sum(x)
with Flow(name="test") as f:
n = numbers()
x = plus_one.map(n)
y = plus_one.map(x)
s = get_sum(y)
# first run with a missing result from `n` but map_states for `x`
state = FlowRunner(flow=f).run(
executor=executor,
task_states={
n: Success(),
x: Mapped(
map_states=[
Pending(cached_inputs={"x": Result(i)}) for i in range(1, 4)
]
),
},
return_tasks=f.tasks,
)
assert state.is_successful()
assert state.result[s].result == 12
# next run with missing results for n and x
state = FlowRunner(flow=f).run(
executor=executor,
task_states={
n: Success(),
x: Mapped(map_states=[Success(), Success(), Success()]),
y: Mapped(
def test_reads_result_from_context_with_cache_key_if_cached_valid(self):
    """A cached state found in context under the task's ``cache_key`` is reused.

    The context's ``caches`` mapping holds one ``Cached`` state under
    ``"FOO"`` whose expiration is one minute in the future. The runner's
    ``check_task_is_cached`` must hand back that very state object, and its
    ``result`` must read as ``2`` even though the stored safe value is the
    string ``"2"``.
    """
    cached_task = Task(
        cache_for=timedelta(minutes=1),
        cache_validator=cache_validators.duration_only,
        cache_key="FOO",
    )
    stored_result = SafeResult("2", result_handler=JSONResultHandler())
    expires_at = pendulum.now("utc") + timedelta(minutes=1)
    cached_state = Cached(
        result=stored_result,
        cached_result_expiration=expires_at,
    )

    # The runner is built inside the context block, exactly as before, so
    # anything it reads from context at construction time is unchanged.
    with prefect.context(caches={"FOO": [cached_state]}):
        runner = TaskRunner(cached_task)
        returned = runner.check_task_is_cached(
            state=Pending(), inputs={"a": Result(1)}
        )

    assert returned is cached_state
    assert returned.result == 2
def test_returns_success_with_hydrated_result_obj(self):
    """Running a default ``Task`` yields a successful state wrapping a ``Result``.

    The state's private ``_result`` must be a ``Result`` instance equal to
    ``Result(value=None, result_handler=<the runner's result handler>)``.
    """
    task_runner = TaskRunner(task=Task())
    final_state = task_runner.get_task_run_state(
        state=Running(), inputs={}, timeout_handler=None
    )

    assert final_state.is_successful()

    wrapped = final_state._result
    assert isinstance(wrapped, Result)

    expected = Result(value=None, result_handler=task_runner.result_handler)
    assert wrapped == expected
def test_inputs(self):
    """Inputs passed as ``Result`` objects reach the task's ``run`` arguments.

    A task that adds one to ``x`` is run with ``x`` wrapped as ``Result(1)``;
    the resulting state must be successful and carry the value ``2``.
    """

    @prefect.task
    def fn(x):
        return x + 1

    starting_state = Running()
    task_inputs = {"x": Result(1)}
    runner = TaskRunner(task=fn)

    finished = runner.get_task_run_state(
        state=starting_state, inputs=task_inputs, timeout_handler=None
    )

    assert finished.is_successful()
    assert finished.result == 2
dull_state = Cached(
cached_result_expiration=datetime.datetime.utcnow()
+ datetime.timedelta(minutes=2),
result=SafeResult("-1", JSONResultHandler()),
)
state = Cached(
cached_result_expiration=datetime.datetime.utcnow()
+ datetime.timedelta(minutes=2),
result=SafeResult("99", JSONResultHandler()),
cached_inputs={"x": SafeResult("2", result_handler=JSONResultHandler())},
)
client.get_latest_cached_states = MagicMock(return_value=[dull_state, state])
res = CloudTaskRunner(task=cached_task).check_task_is_cached(
Pending(), inputs={"x": Result(2, result_handler=Handler())}
)
assert client.get_latest_cached_states.called
assert res.is_pending()
def test_results_are_same_if_handled(self):
    """Two identical ``Result`` objects still compare equal after both have
    had ``store_safe_value`` called on them."""
    first = Result("3", result_handler=JSONResultHandler())
    second = Result("3", result_handler=JSONResultHandler())

    # Store the safe value on each result, in creation order.
    for handled in (first, second):
        handled.store_safe_value()

    assert second == first
def test_create_state_with_kwarg_data_arg(cls):
    """Constructing a state class with ``result=1`` wraps the value in a ``Result``.

    The wrapper must have no safe value (``NoResult``) and no result handler,
    the public ``result`` must read back as ``1``, and ``message`` must
    default to ``None``.
    """
    instance = cls(result=1)

    assert isinstance(instance._result, Result)
    assert instance._result.safe_value is NoResult
    assert instance._result.result_handler is None

    assert instance.result == 1
    assert instance.message is None

    # Checked a second time on purpose: the attribute is read again after
    # `result` has been accessed, so it must still be a Result wrapper.
    assert isinstance(instance._result, Result)
# -- process each edge to the task
for edge in self.flow.edges_to(task):
upstream_states[edge] = task_states.get(
edge.upstream_task, Pending(message="Task state not available.")
)
# augment edges with upstream constants
for key, val in self.flow.constants[task].items():
edge = Edge(
upstream_task=prefect.tasks.core.constants.Constant(val),
downstream_task=task,
key=key,
)
upstream_states[edge] = Success(
"Auto-generated constant value",
result=Result(val, result_handler=ConstantResultHandler(val)),
)
# -- run the task
with prefect.context(task_full_name=task.name, task_tags=task.tags):
task_states[task] = executor.submit(
self.run_task,
task=task,
state=task_state,
upstream_states=upstream_states,
context=dict(prefect.context, **task_contexts.get(task, {})),
task_runner_state_handlers=task_runner_state_handlers,
executor=executor,
)
# ---------------------------------------------
# Otherwise, we are mapping over the result of a "vanilla" task. In this
# case, we create a copy of the upstream state but set the result to the
# appropriately-indexed item from the upstream task's `State.result`
# array.
else:
states[edge] = copy.copy(upstream_state)
# if the current state is already Mapped, then we might be executing
# a re-run of the mapping pipeline. In that case, the upstream states
# might not have `result` attributes (as any required results could be
# in the `cached_inputs` attribute of one of the child states).
# Therefore, we only try to get a result if EITHER this task's
# state is not already mapped OR the upstream result is not None.
if not state.is_mapped() or upstream_state.result != NoResult:
upstream_result = Result(
upstream_state.result[i],
result_handler=upstream_state._result.result_handler, # type: ignore
)
states[edge].result = upstream_result
elif state.is_mapped():
if i >= len(state.map_states): # type: ignore
raise IndexError()
# only add this iteration if we made it through all iterables
map_upstream_states.append(states)
# index error means we reached the end of the shortest iterable
except IndexError:
break
def run_fn(