Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_aggregated_node_three_factors(model):
    """Check that an AggregatedNode ties three inputs to a fixed flow ratio.

    Inputs A, B and C all feed output Z and are grouped with factors
    0.5 : 1.0 : 2.0. Only B carries a max_flow (10.0); the assertions expect
    flows of 5.0, 10.0 and 20.0, i.e. 35.0 through the aggregate.
    """
    ratios = (0.5, 1.0, 2.0)

    src_a = Input(model, "A")
    src_b = Input(model, "B", max_flow=10.0)
    src_c = Input(model, "C")
    sink = Output(model, "Z", max_flow=100, cost=-10)

    agg = AggregatedNode(model, "agg", [src_a, src_b, src_c])
    agg.factors = list(ratios)
    # The factors must read back exactly as they were assigned.
    assert_allclose(agg.factors, list(ratios))

    for src in (src_a, src_b, src_c):
        src.connect(sink)

    model.run()

    expected_flows = (
        (agg, 35.0),
        (src_a, 5.0),
        (src_b, 10.0),
        (src_c, 20.0),
    )
    for node, expected in expected_flows:
        assert_allclose(node.flow, expected)
def test_create_directory(self, simple_linear_model, tmpdir):
    """TablesRecorder should create a missing output directory on request.

    The HDF5 path points inside an ``outputs`` folder that does not exist
    beforehand; with ``create_directories=True`` both the folder and the
    file are expected to exist after the model has run.
    """
    model = simple_linear_model
    sink = model.nodes['Output']
    source = model.nodes['Input']
    _agg = AggregatedNode(model, 'Sum', [sink, source])
    source.max_flow = 10.0
    sink.cost = -2.0

    # Point the recorder at a file inside a not-yet-created directory.
    out_dir = tmpdir.join('outputs')
    target = out_dir.join('output.h5')
    assert not out_dir.exists()

    rec = TablesRecorder(model, str(target), create_directories=True)
    model.run()

    assert out_dir.exists()
    assert target.exists()
def test_aggregated_node_max_flow_with_weights(
    model, flow_weights, expected_agg_flow, expected_A_flow, expected_B_flow
):
    """Check an AggregatedNode max_flow applied together with flow weights.

    Inputs A (cost=1) and B (cost=8) both feed output Z and are grouped under
    an aggregate whose max_flow is 30.0. The weights and the expected flows
    are supplied by the caller (presumably via parametrization, which is not
    visible in this excerpt).
    """
    src_a = Input(model, "A", max_flow=20.0, cost=1)
    src_b = Input(model, "B", max_flow=20.0, cost=8)
    sink = Output(model, "Z", max_flow=100, cost=-10)

    for src in (src_a, src_b):
        src.connect(sink)

    agg = AggregatedNode(model, "agg", [src_a, src_b])
    agg.flow_weights = flow_weights
    agg.max_flow = 30.0

    model.run()

    outcomes = (
        (agg, expected_agg_flow),
        (src_a, expected_A_flow),
        (src_b, expected_B_flow),
    )
    for node, expected in outcomes:
        assert_allclose(node.flow, expected)
# NOTE(review): headerless excerpt -- the enclosing `def` and the creation of
# `model` are not visible here, and the original indentation has been lost.
# Builds three Input -> Storage -> Output chains, then groups the storages
# under an AggregatedStorage and the outputs under an AggregatedNode.
all_res = []
all_otpt = []
for num in range(3):
# Per-chain settings scale with `num`: input max_flow is 5.0*num (0 for the
# first chain), initial volume is 10+num and output max_flow is 8+num.
inpt = Input(model, name="Input {}".format(num), max_flow=5.0*num, cost=-1)
res = Storage(model, name="Storage {}".format(num), num_outputs=1, num_inputs=1, max_volume=20, initial_volume=10+num)
otpt = Output(model, name="Output {}".format(num), max_flow=8+num, cost=-999)
inpt.connect(res)
res.connect(otpt)
# Collect the nodes for the aggregates built below.
all_res.append(res)
all_otpt.append(otpt)
# Presumably created once, after the loop has finished -- confirm nesting
# against the original file.
AggregatedStorage(model, name='Total Storage', storage_nodes=all_res)
AggregatedNode(model, name='Total Output', nodes=all_otpt)
return model
def test_aggregated_node_min_flow(model):
    """Check that an AggregatedNode min_flow constrains its member nodes.

    Inputs A (cost=1) and B (cost=100) feed a zero-cost output Z. With the
    aggregate's min_flow set to 15.0, the assertions expect the whole 15.0
    to be supplied by A and nothing by B.
    """
    src_a = Input(model, "A", max_flow=20.0, cost=1)
    src_b = Input(model, "B", max_flow=20.0, cost=100)
    sink = Output(model, "Z", max_flow=100, cost=0)

    for src in (src_a, src_b):
        src.connect(sink)

    agg = AggregatedNode(model, "agg", [src_a, src_b])
    agg.min_flow = 15.0

    model.run()

    for node, expected in ((agg, 15.0), (src_a, 15.0), (src_b, 0.0)):
        assert_allclose(node.flow, expected)
# NOTE(review): headerless excerpt -- `model`, `A` and `flow` are defined
# outside the visible lines, and the original indentation has been lost.
# Three-piece link: the first sublink has cost -500 and max_flow 40; the other
# two have zero cost and no max_flow.
X = PiecewiseLink(model, name="X", cost=[-500.0, 0, 0], max_flow=[40.0, None, None])
C = Output(model, "C")
A.connect(X)
X.connect(C)
# create a new input inside the piecewise link which only has access
# to flow travelling via the last sublink (X2)
Bo = Output(model, "Bo", domain=X.sub_domain)
Bi = Input(model, "Bi")
D = Output(model, "D", max_flow=50, cost=-100)
Bo.connect(Bi)
Bi.connect(D)
X.sublinks[-1].connect(Bo)
# Tie the second and third sublinks together with factors 3.0 and 1.0.
agg = AggregatedNode(model, "agg", X.sublinks[1:])
agg.factors = [3.0, 1.0]
model.step()
# D is expected to receive a quarter of the flow above 40, capped at its
# max_flow of 50 (0.25 is consistent with a 3:1 split -- confirm).
assert_allclose(D.flow, min((flow - 40) * 0.25, 50.0))
def test_routes(self, simple_linear_model, tmpdir):
"""
Test that TablesRecorder saves route flows to an open HDF5 file.

NOTE(review): this excerpt is truncated after the `/routes` lookup and its
indentation has been lost, so the nesting of the statements below is
not recoverable from here.
"""
model = simple_linear_model
otpt = model.nodes['Output']
inpt = model.nodes['Input']
agg_node = AggregatedNode(model, 'Sum', [otpt, inpt])
inpt.max_flow = 10.0
otpt.cost = -2.0
h5file = tmpdir.join('output.h5')
import tables
with tables.open_file(str(h5file), 'w') as h5f:
# Ask the recorder to store route flows under the name 'flows'.
rec = TablesRecorder(model, h5f, routes_flows='flows')
model.run()
flows = h5f.get_node('/flows')
# Expected shape is (365, 1, 1), with every entry equal to 10 (the same
# value as the input's max_flow set above).
assert flows.shape == (365, 1, 1)
np.testing.assert_allclose(flows.read(), np.ones((365, 1, 1))*10)
routes = h5f.get_node('/routes')
"""
# NOTE(review): headerless excerpt -- the enclosing `def` (which supplies
# `simple_linear_model` and `tmpdir`) is not visible, and the original
# indentation has been lost.
from pywr.parameters import ConstantParameter
model = simple_linear_model
otpt = model.nodes['Output']
inpt = model.nodes['Input']
p = ConstantParameter(model, 10.0, name='max_flow')
inpt.max_flow = p
# ensure TablesRecorder can handle parameters with a / in the name
p_slash = ConstantParameter(model, 0.0, name='name with a / in it')
inpt.min_flow = p_slash
agg_node = AggregatedNode(model, 'Sum', [otpt, inpt])
# NOTE(review): this replaces the ConstantParameter `p` assigned to
# inpt.max_flow above with a plain float -- confirm this is intended.
inpt.max_flow = 10.0
otpt.cost = -2.0
h5file = tmpdir.join('output.h5')
import tables
with tables.open_file(str(h5file), 'w') as h5f:
# Constructing the recorder is expected to emit a ParameterNameWarning
# (presumably triggered by the '/' in p_slash's name -- confirm).
with pytest.warns(ParameterNameWarning):
rec = TablesRecorder(model, h5f, parameters=[p, p_slash])
# check parameters have been added to the component tree
# this is particularly important for parameters which update their
# values in `after`, e.g. DeficitParameter (see #465)
assert(not model.find_orphaned_parameters())
assert(p in rec.children)
assert(p_slash in rec.children)
def test_nodes_with_str(self, simple_linear_model, tmpdir):
"""
Test TablesRecorder when nodes are given by name and `where` is set.

NOTE(review): this excerpt is truncated at the final `for` header (its
body is not visible) and its indentation has been lost.
"""
from pywr.parameters import ConstantParameter
model = simple_linear_model
otpt = model.nodes['Output']
inpt = model.nodes['Input']
agg_node = AggregatedNode(model, 'Sum', [otpt, inpt])
p = ConstantParameter(model, 10.0, name='max_flow')
inpt.max_flow = p
otpt.cost = -2.0
h5file = tmpdir.join('output.h5')
import tables
with tables.open_file(str(h5file), 'w') as h5f:
# Nodes are passed as name strings rather than node objects.
nodes = ['Output', 'Input', 'Sum']
where = "/agroup"
rec = TablesRecorder(model, h5f, nodes=nodes,
parameters=[p, ], where=where)
model.run()
# Iterates over the three node names plus the recorded parameter's name.
for node_name in ['Output', 'Input', 'Sum', 'max_flow']:
def test_aggregated_node_min_flow_parameter(model):
    """Check an AggregatedNode min_flow supplied as a ConstantParameter.

    Inputs A (cost=1) and B (cost=100) feed a zero-cost output Z. With the
    aggregate's min_flow given as ``ConstantParameter(model, 15.0)``, the
    assertions expect the whole 15.0 to come from A and nothing from B.
    """
    src_a = Input(model, "A", max_flow=20.0, cost=1)
    src_b = Input(model, "B", max_flow=20.0, cost=100)
    sink = Output(model, "Z", max_flow=100, cost=0)

    for src in (src_a, src_b):
        src.connect(sink)

    agg = AggregatedNode(model, "agg", [src_a, src_b])
    agg.min_flow = ConstantParameter(model, 15.0)

    model.run()

    for node, expected in ((agg, 15.0), (src_a, 15.0), (src_b, 0.0)):
        assert_allclose(node.flow, expected)