Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_routes_multiple_scenarios(self, simple_linear_model, tmpdir):
    """Check the route flows written by TablesRecorder for two scenarios.

    Scenario ``A`` (size 4) sets the maximum flow of the ``Input`` node and
    scenario ``B`` (size 2) sets the maximum flow of the ``Output`` node.
    After the run, the ``/flows`` array in the HDF5 file must have shape
    ``(365, 1, 4, 2)`` and, at index ``[0, 0]``, hold the smaller of the two
    limits for every A/B combination.
    """
    from pywr.parameters import ConstantScenarioParameter
    import tables

    model = simple_linear_model
    scenario_a = Scenario(model, name='A', size=4)
    scenario_b = Scenario(model, name='B', size=2)

    output_node = model.nodes['Output']
    input_node = model.nodes['Input']
    input_node.max_flow = ConstantScenarioParameter(model, scenario_a, [10, 20, 30, 40])
    output_node.max_flow = ConstantScenarioParameter(model, scenario_b, [20, 40])
    output_node.cost = -2.0

    # One row per member of scenario A, one column per member of scenario B.
    expected_first = [[10, 10], [20, 20], [20, 30], [20, 40]]

    h5_path = str(tmpdir.join('output.h5'))
    with tables.open_file(h5_path, 'w') as h5f:
        # The recorder is created against the open file before the run.
        recorder = TablesRecorder(model, h5f, routes_flows='flows')
        model.run()

        # The file must still be open while the recorded array is inspected.
        route_flows = h5f.get_node('/flows')
        assert route_flows.shape == (365, 1, 4, 2)
        np.testing.assert_allclose(route_flows[0, 0], expected_first)
def test_parameter_constant_scenario(simple_linear_model):
    """Check that ConstantScenarioParameter returns one value per scenario member.

    The parameter is attached to "Scenario B" only, so for every combination
    of A and B indices the returned value must equal the B index.
    """
    model = simple_linear_model

    # Two scenarios are registered; only the second one drives the parameter.
    first = Scenario(model, 'Scenario A', size=2)
    second = Scenario(model, 'Scenario B', size=5)
    member_values = np.arange(second.size, dtype=np.float64)
    param = ConstantScenarioParameter(model, second, member_values)

    model.setup()
    timestep = model.timestepper.current

    # Walk every (A, B) combination and build the matching ScenarioIndex.
    combinations = itertools.product(range(first.size), range(second.size))
    for global_id, (idx_a, idx_b) in enumerate(combinations):
        indices = np.array([idx_a, idx_b], dtype=np.int32)
        scenario_index = ScenarioIndex(global_id, indices)
        np.testing.assert_allclose(param.value(timestep, scenario_index), float(idx_b))
def test_aggregated_storage_scenarios(three_storage_model):
    """Test that the aggregated storage node sums its members across scenarios.

    A five-member scenario varies the maximum flow of ``Input 1``.  After each
    of two timesteps the ``volume`` and ``current_pc`` of ``Total Storage``
    must match the values computed from ``Storage 0`` .. ``Storage 2``.  The
    ``flow`` attribute is additionally compared after the first step only.
    """
    from pywr.core import Scenario
    from pywr.parameters import ConstantScenarioParameter

    m = three_storage_model
    sc = Scenario(m, 'A', size=5)
    agg_stg = m.nodes['Total Storage']
    stgs = [m.nodes['Storage {}'.format(num)] for num in range(3)]

    # Setup scenario: one inflow limit per scenario member (0 .. size - 1).
    m.nodes['Input 1'].max_flow = ConstantScenarioParameter(m, sc, range(sc.size))

    def check_storage_totals():
        """Compare the aggregated volume and percentage with the member sums."""
        total_volume = sum(s.volume for s in stgs)
        np.testing.assert_allclose(agg_stg.volume, total_volume)
        current_pc = total_volume / sum(s.max_volume for s in stgs)
        np.testing.assert_allclose(agg_stg.current_pc, current_pc)

    m.setup()

    # First timestep: volume, percentage and flow are all checked.
    m.step()
    check_storage_totals()
    np.testing.assert_allclose(agg_stg.flow, sum(s.flow for s in stgs))

    # Second timestep: volume and percentage must still agree.
    m.step()
    check_storage_totals()
def test_agg(self, simple_linear_model, agg_func):
    """Compare AggregatedParameter with the aggregation applied by hand.

    A daily profile (values 0..365) and a constant-per-member parameter on
    "Scenario B" are combined using ``agg_func``.  The reference function
    handed to ``assert_rec`` applies the matching callable from
    ``TestAggregatedParameter.funcs`` to the two child values.
    """
    model = simple_linear_model
    model.timestepper.delta = 15

    scen_a = Scenario(model, "Scenario A", size=2)
    scen_b = Scenario(model, "Scenario B", size=5)

    profile = DailyProfileParameter(model, np.arange(366, dtype=np.float64))
    per_member = ConstantScenarioParameter(
        model, scen_b, np.arange(scen_b.size, dtype=np.float64)
    )
    aggregated = AggregatedParameter(model, [profile, per_member], agg_func=agg_func)
    reference = TestAggregatedParameter.funcs[agg_func]

    # NOTE(review): assert_rec presumably checks `aggregated` against this
    # function during the run -- confirm against its definition.
    @assert_rec(model, aggregated)
    def expected_func(timestep, scenario_index):
        child_values = [
            child.get_value(scenario_index) for child in (profile, per_member)
        ]
        return reference(np.array(child_values))

    model.run()
def test_scenario_two_parameter(simple_linear_model):
    """Drive two parameters from the same two-member scenario.

    The ``Input`` limit is [5, 10] and the ``Output`` limit is [8, 3] for the
    two members of the "Inflow" scenario; every node is expected to report
    [5, 3], the smaller limit for each member.
    """
    model = simple_linear_model
    inflow = Scenario(model, 'Inflow', size=2)

    nodes = model.nodes
    nodes["Input"].max_flow = ConstantScenarioParameter(model, inflow, [5.0, 10.0])
    sink = nodes["Output"]
    sink.max_flow = ConstantScenarioParameter(model, inflow, [8.0, 3.0])
    sink.cost = -2.0

    # Every node is expected to carry the same flow for each scenario member.
    expected_node_results = {
        name: [5.0, 3.0] for name in ("Input", "Link", "Output")
    }
    assert_model(model, expected_node_results)
"""
Test the TablesRecorder with user defined scenario subset
"""
# NOTE(review): the `def` line of this test is not visible in this excerpt;
# `simple_linear_model` and `tmpdir` are presumably fixtures named in the
# missing signature -- confirm against the full file.
from pywr.parameters import ConstantScenarioParameter
model = simple_linear_model
# Two scenarios: A has 4 members, B has 2.
scA = Scenario(model, name='A', size=4)
scB = Scenario(model, name='B', size=2)
# Use first and last combinations
# ([0, 0] is the first member of A and B; [3, 1] is the last member of each.)
model.scenarios.user_combinations = [[0, 0], [3, 1]]
otpt = model.nodes['Output']
inpt = model.nodes['Input']
# Scenario A sets the input limit, scenario B sets the output limit.
inpt.max_flow = ConstantScenarioParameter(model, scA, [10, 20, 30, 40])
otpt.max_flow = ConstantScenarioParameter(model, scB, [20, 40])
otpt.cost = -2.0
h5file = tmpdir.join('output.h5')
import tables
with tables.open_file(str(h5file), 'w') as h5f:
# No `routes_flows` argument is passed here; the arrays read back below are
# looked up by node name.
rec = TablesRecorder(model, h5f)
model.run()
for node_name in model.nodes.keys():
ca = h5f.get_node('/', node_name)
# Second axis has length 2, matching the two user combinations set above.
assert ca.shape == (365, 2)
# Combination [0, 0] has limits 10 and 20; combination [3, 1] has limits 40
# and 40, hence the expected first-row values of 10 and 40.
np.testing.assert_allclose(ca[0, ...], [10, 40])
# check combinations table exists
# NOTE(review): the code performing this check is not part of this excerpt.
def test_scenario_storage(solver):
    """Test how a Storage node behaves when run with multiple scenarios.

    The model has two inflow scenarios, 5 and 10, applied as the minimum flow
    of the input.  The storage starts at 500, and its recorded volume is
    expected to grow by 5 per timestep in the first scenario and by 10 per
    timestep in the second.
    """
    model = Model(solver=solver)
    inflow = Input(model, 'input', max_flow=999)
    reservoir = Storage(
        model,
        'storage',
        num_inputs=1,
        num_outputs=1,
        max_volume=1000,
        initial_volume=500,
    )
    outflow = Output(model, 'output', max_flow=999)

    inflow_scenario = Scenario(model, 'Inflow', size=2)
    inflow.min_flow = ConstantScenarioParameter(model, inflow_scenario, [5.0, 10.0])

    # input -> storage -> output
    inflow.connect(reservoir)
    reservoir.connect(outflow)

    volume_rec = NumpyArrayStorageRecorder(model, reservoir)
    model.run()

    assert_allclose(inflow.flow, [5, 10])
    # Recorded volumes for the first two timesteps, one entry per scenario.
    expected_volumes = ([505, 510], [510, 520])
    for step, volumes in enumerate(expected_volumes):
        assert_allclose(volume_rec.data[step], volumes)
def test_routes_multiple_scenarios(self, simple_linear_model, tmpdir):
    """Check the route flows written by TablesRecorder for two scenarios.

    Scenario ``A`` (size 4) sets the maximum flow of the ``Input`` node and
    scenario ``B`` (size 2) sets the maximum flow of the ``Output`` node.
    After the run, the ``/flows`` array in the HDF5 file must have shape
    ``(365, 1, 4, 2)`` and, at index ``[0, 0]``, hold the smaller of the two
    limits for every A/B combination.
    """
    from pywr.parameters import ConstantScenarioParameter
    import tables

    model = simple_linear_model
    scenario_a = Scenario(model, name='A', size=4)
    scenario_b = Scenario(model, name='B', size=2)

    output_node = model.nodes['Output']
    input_node = model.nodes['Input']
    input_node.max_flow = ConstantScenarioParameter(model, scenario_a, [10, 20, 30, 40])
    output_node.max_flow = ConstantScenarioParameter(model, scenario_b, [20, 40])
    output_node.cost = -2.0

    # One row per member of scenario A, one column per member of scenario B.
    expected_first = [[10, 10], [20, 20], [20, 30], [20, 40]]

    h5_path = str(tmpdir.join('output.h5'))
    with tables.open_file(h5_path, 'w') as h5f:
        # The recorder is created against the open file before the run.
        recorder = TablesRecorder(model, h5f, routes_flows='flows')
        model.run()

        # The file must still be open while the recorded array is inspected.
        route_flows = h5f.get_node('/flows')
        assert route_flows.shape == (365, 1, 4, 2)
        np.testing.assert_allclose(route_flows[0, 0], expected_first)
# NOTE(review): the `def` line of this test is not visible in this excerpt;
# `simple_storage_model` is presumably a fixture named in the missing
# signature -- confirm against the full file.
from pywr.parameters import ConstantScenarioParameter, ConstantParameter
from pywr.parameters.control_curves import ControlCurveIndexParameter
model = simple_storage_model
scenario = Scenario(model, 'A', size=2)
# Simulate 5 years
model.timestepper.start = '2015-01-01'
model.timestepper.end = '2019-12-31'
# Control curve parameter
# (index parameter on the 'Storage' node with a single constant curve at 0.25)
param = ControlCurveIndexParameter(model, model.nodes['Storage'], ConstantParameter(model, 0.25))
# Storage model has a capacity of 20, but starts at 10 Ml
# Demand is roughly 2 Ml per year (2.0 / 365 per timestep)
# First ensemble balances the demand
# Second ensemble should fail during 3rd year
demand = 2.0 / 365
# Inflow equals the demand in the first scenario member and is zero in the
# second.
model.nodes['Input'].max_flow = ConstantScenarioParameter(model, scenario, [demand, 0])
model.nodes['Output'].max_flow = demand
# Create the recorder with a threshold of 1
rec = TimestepCountIndexParameterRecorder(model, param, 1)
model.run()
# Expected counts: none for the first member; 183 + 365 + 365 timesteps for
# the second member.
assert_allclose([0, 183 + 365 + 365], rec.values(), atol=1e-7)
def test_two_scenarios(simple_linear_model, ):
"""Basic test of Scenario functionality"""
model = simple_linear_model # Convenience renaming
scenario_input = Scenario(model, 'Inflow', size=2)
model.nodes["Input"].max_flow = ConstantScenarioParameter(model, scenario_input, [5.0, 10.0])
scenario_outflow = Scenario(model, 'Outflow', size=2, ensemble_names=['High', 'Low'])
model.nodes["Output"].max_flow = ConstantScenarioParameter(model, scenario_outflow, [3.0, 8.0])
model.nodes["Output"].cost = -2.0
# Check ensemble names are provided in the multi-index
index = model.scenarios.multiindex
assert index.levels[0].name == 'Inflow'
assert index.levels[1].name == 'Outflow'
assert np.all(index.levels[1] == ['High', 'Low'])
# add numpy recorders to input and output nodes
NumpyArrayNodeRecorder(model, model.nodes["Input"], "input")
NumpyArrayNodeRecorder(model, model.nodes["Output"], "output")
expected_node_results = {