Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dict_automatically_applied_to_callargs(self):
    """Bind a dict of parameters and check the flow's tasks and final result.

    The flow is expected to hold five tasks, exactly one of which is a
    ``collections.Dict``, and the identity task must return the resolved dict.
    """
    param_x = Parameter("x")
    param_y = Parameter("y")
    identity = IdentityTask()
    with Flow(name="test") as flow:
        identity.bind(x={"a": param_x, "b": param_y})
    run_state = flow.run(parameters={"x": 1, "y": 2})

    # 2 params, identity, Dict, List of dict values
    assert len(flow.tasks) == 5
    dict_tasks = [task for task in flow.tasks if isinstance(task, collections.Dict)]
    assert len(dict_tasks) == 1
    assert run_state.result[identity].result == {"a": 1, "b": 2}
def test_right_division(self):
    """Check that ``10 / Parameter("x")`` yields 2.5 when ``x`` is 4."""
    with Flow(name="test") as flow:
        quotient = 10 / Parameter("x")
    run_state = flow.run(parameters={"x": 4})
    assert run_state.result[quotient].result == 2.5
def test_nested_collection_automatically_applied_to_callargs(self):
    """Bind a nested dict/list/tuple/set of parameters and check the result.

    The flow is expected to hold ten tasks, and the identity task must return
    the same nested structure with every parameter replaced by its value.
    """
    param_x = Parameter("x")
    param_y = Parameter("y")
    identity = IdentityTask()
    with Flow(name="test") as flow:
        identity.bind(
            x={
                "a": [param_x, {"y": param_y}],
                "b": (param_y, {param_x}),
            }
        )
    run_state = flow.run(parameters={"x": 1, "y": 2})

    assert len(flow.tasks) == 10
    expected = {"a": [1, {"y": 2}], "b": (2, {1})}
    assert run_state.result[identity].result == expected
def test_subtraction(self):
    """Check that subtracting two parameters (1 - 2) yields -1."""
    with Flow(name="test") as flow:
        # Operands are created left to right, as in a single expression.
        minuend = Parameter("x")
        subtrahend = Parameter("y")
        difference = minuend - subtrahend
    run_state = flow.run(parameters={"x": 1, "y": 2})
    assert run_state.result[difference].result == -1
def test_providing_cachedstate_with_simple_example(self, executor):
    """Run a flow with a ``Cached`` state supplied for one task.

    The run must end in ``Success`` and report the cached result (100) for the
    task that was given the ``Cached`` state.
    """

    class TestTask(Task):
        # Counter bumped on every ``run`` call.
        call_count = 0

        def run(self, x, s):
            self.call_count += 1
            return self.call_count

    one_day = datetime.timedelta(days=1)

    with Flow(name="test") as flow:
        cached_task = TestTask(cache_validator=duration_only, cache_for=one_day)
        param_x = Parameter("x")
        success_task = SuccessTask()
        flow.add_edge(param_x, cached_task, key="x")
        flow.add_edge(success_task, cached_task, key="s")

    # Cached state whose expiration lies one day in the future.
    cached_state = Cached(
        cached_result_expiration=pendulum.now("utc") + one_day,
        result=100,
    )
    runner = FlowRunner(flow=flow)
    flow_state = runner.run(
        executor=executor,
        parameters={"x": 1},
        return_tasks=[cached_task],
        task_states={cached_task: cached_state},
    )

    assert isinstance(flow_state, Success)
    assert flow_state.result[cached_task].result == 100
def test_right_gte(self):
    """Check that ``10 >= Parameter("x")`` yields ``True`` when ``x`` is 10."""
    with Flow(name="test") as flow:
        comparison = 10 >= Parameter("x")
    run_state = flow.run(parameters={"x": 10})
    assert run_state.result[comparison].result is True
def test_getitem_list(self):
    """Check that indexing one parameter by another picks the list element."""
    with Flow(name="test") as flow:
        # Container first, then index, matching a single-expression evaluation.
        container = Parameter("x")
        index = Parameter("y")
        item = container[index]
    run_state = flow.run(parameters={"x": [1, 2, 3], "y": 1})
    assert run_state.result[item].result == 2
def test_retries_cache_parameters_as_well(self, executor):
    """Run a flow with a retrying task, then tamper with the returned states.

    After the first run (expected to still be running), the parameter state's
    result is removed and the downstream state's cached inputs are overwritten.

    NOTE(review): the visible body stops after mutating the states; no second
    run or final assertion follows. Presumably the tail of the test was cut
    off. Confirm against the upstream source.
    """
    with Flow(name="test") as flow:
        param_a = Parameter("a")
        return_task = ReturnTask(max_retries=1, retry_delay=datetime.timedelta(0))
        a_res = param_a()
        b_res = return_task(a_res)

    runner = FlowRunner(flow=flow)
    first_state = runner.run(
        executor=executor,
        parameters={"a": 1},
        return_tasks=flow.tasks,
    )
    assert first_state.is_running()

    # remove the result to see if the cached results are picked up
    a_state = first_state.result[a_res]
    a_state.result = NoResult

    # artificially alter state
    b_state = first_state.result[b_res]
    b_state.cached_inputs = {"x": Result(2)}
def test_map_over_parameters(executor):
    """Map an ``AddTask`` over a list parameter and check the mapped state.

    With the input ``[1, 2, 3]`` the mapped state must have three successful
    child states and a combined result of ``[2, 3, 4]``.
    """
    add_task = AddTask()
    with Flow(name="test") as flow:
        values = Parameter("list")
        mapped = add_task.map(values)

    flow_state = flow.run(executor=executor, parameters={"list": [1, 2, 3]})
    mapped_state = flow_state.result[mapped]

    assert flow_state.is_successful()
    assert mapped_state.is_mapped()
    assert isinstance(mapped_state.map_states, list)
    assert all(child.is_successful() for child in mapped_state.map_states)
    assert len(mapped_state.map_states) == 3
    assert mapped_state.result == [2, 3, 4]
def test_pow_with_constant(self):
    """Check that ``Parameter("x") ** 3`` yields 8 when ``x`` is 2."""
    with Flow(name="test") as flow:
        cubed = Parameter("x") ** 3
    run_state = flow.run(parameters={"x": 2})
    assert run_state.result[cubed].result == 8