# Tests for pywr recorders: demand saving, event capture, scenario ensembles,
# aquifer behaviour, annual licences and storage duration curves.
def test_demand_saving_with_indexed_array():
    """Test demand saving driven by reservoir control curves.

    A relatively complex test because the parameters under test have many
    dependencies. It demonstrates demand savings being applied during
    drought, based on the state of a reservoir.
    """
    model = load_model("demand_saving2.json")
    model.timestepper.end = pd.Timestamp("2016-01-31")

    rec_demand = NumpyArrayNodeRecorder(model, model.nodes["Demand"])
    rec_storage = NumpyArrayStorageRecorder(model, model.nodes["Reservoir"])

    model.check()
    model.run()

    max_volume = model.nodes["Reservoir"].max_volume

    # No demand saving is in force at the start of the run.
    demand_baseline = 50.0
    demand_factor = 0.9  # seasonal factor for jan-apr
    demand_saving = 1.0
    expected_demand = demand_baseline * demand_factor * demand_saving
    assert_allclose(rec_demand.data[0, 0], expected_demand)

    # By timestep 4 storage has dropped below the first control curve.
    demand_saving = 0.95
    assert rec_storage.data[4, 0] < 0.8 * max_volume
def test_numpy_recorder_factored(simple_linear_model):
    """Test that the optional ``factor`` argument scales the recorded flow."""
    model = simple_linear_model
    output_node = model.nodes['Output']
    output_node.max_flow = 30.0
    model.nodes['Input'].max_flow = 10.0
    output_node.cost = -2

    # Record the output node's flow scaled by a factor of two.
    scale = 2.0
    rec_fact = NumpyArrayNodeRecorder(model, output_node, factor=scale)

    model.run()

    # One year of timesteps, one scenario; flow of 10 recorded as 20.
    assert rec_fact.data.shape == (365, 1)
    assert_allclose(20, rec_fact.data, atol=1e-7)
def test_event_capture_with_node(self, cyclical_linear_model):
    """Test Node flow events using a NodeThresholdRecorder.

    An event is triggered whenever the output node's flow exceeds 4.0; a
    0/1 activity timeseries is then rebuilt from the recorded events.
    """
    m = cyclical_linear_model

    otpt = m.nodes['Output']
    arry = NumpyArrayNodeRecorder(m, otpt)

    # Create the trigger using a threshold recorder
    trigger = NodeThresholdRecorder(m, otpt, 4.0, predicate='>')
    evt_rec = EventRecorder(m, trigger)

    m.run()

    # Ensure there is at least one event
    assert evt_rec.events

    # Build a timeseries of when the events say an event is active.
    # Use the builtin ``int`` dtype: the ``np.int`` alias was deprecated in
    # NumPy 1.20 and removed in NumPy 1.24, so it raises AttributeError there.
    triggered = np.zeros_like(arry.data, dtype=int)
    for evt in evt_rec.events:
        triggered[evt.start.index:evt.end.index, evt.scenario_index.global_id] = 1

    # Check the duration
# NOTE(review): this appears to be the body of a two-scenario ensemble test —
# its ``def`` line is not visible in this chunk.
scenario_input = Scenario(model, 'Inflow', size=2)
model.nodes["Input"].max_flow = ConstantScenarioParameter(model, scenario_input, [5.0, 10.0])
# Second scenario dimension, with explicit ensemble names.
scenario_outflow = Scenario(model, 'Outflow', size=2, ensemble_names=['High', 'Low'])
model.nodes["Output"].max_flow = ConstantScenarioParameter(model, scenario_outflow, [3.0, 8.0])
model.nodes["Output"].cost = -2.0
# Check ensemble names are provided in the multi-index
index = model.scenarios.multiindex
assert index.levels[0].name == 'Inflow'
assert index.levels[1].name == 'Outflow'
assert np.all(index.levels[1] == ['High', 'Low'])
# add numpy recorders to input and output nodes
NumpyArrayNodeRecorder(model, model.nodes["Input"], "input")
NumpyArrayNodeRecorder(model, model.nodes["Output"], "output")
# Expected flow for each of the 2x2 scenario combinations.
expected_node_results = {
"Input": [3.0, 5.0, 3.0, 8.0],
"Link": [3.0, 5.0, 3.0, 8.0],
"Output": [3.0, 5.0, 3.0, 8.0],
}
assert_model(model, expected_node_results)
model.run()
# combine recorder outputs to a single dataframe
df = model.to_dataframe()
# 365 timesteps wide by (2 recorders x 2x2 scenario combinations) columns.
assert(df.shape == (365, 2 * 2 * 2))
assert_allclose(df["input", 0, 'High'].iloc[0], 3.0)
# NOTE(review): continuation of a truncated constructor call — the call's
# opening and the surrounding ``def`` are not visible in this chunk.
levels,
area=area,
storativity=storativity,
)
# Simple network: catchment -> aquifer -> {stream, abstraction}.
catchment = Input(model, 'catchment', max_flow=0)
stream = Output(model, 'stream', max_flow=np.inf, cost=0)
abstraction = Output(model, 'abstraction', max_flow=15, cost=-999)
catchment.connect(aqfer)
aqfer.connect(stream, from_slot=0)
aqfer.connect(abstraction, from_slot=1)
# Recorders for level, storage and both outflows.
rec_level = NumpyArrayLevelRecorder(model, aqfer)
rec_volume = NumpyArrayStorageRecorder(model, aqfer)
rec_stream = NumpyArrayNodeRecorder(model, stream)
rec_abstraction = NumpyArrayNodeRecorder(model, abstraction)
model.check()
assert(len(aqfer.inputs) == (num_streams + num_additional_inputs))
for initial_level in (50, 100, 110, 150):
# set the initial aquifer level and therefore the initial volume
aqfer.initial_level = initial_level
initial_volume = aqfer.initial_volume
# Volume = area * storativity * level, with 0.001 presumably a unit
# conversion (mm -> m) — TODO confirm against the aquifer node docs.
assert(initial_volume == (area * storativity[0] * initial_level * 0.001))
# run the model (for one timestep only)
model.run()
# manually calculate keating streamflow and check model flows are OK
Qp = 2 * transmissivity[0] * max(initial_level - stream_flow_levels[0][0], 0) * coefficient
Qe = 2 * transmissivity[1] * max(initial_level - stream_flow_levels[0][1], 0) * coefficient
# NOTE(review): headerless fragment of a two-scenario ensemble test — its
# ``def`` line is not visible in this chunk.
scenario_input = Scenario(model, 'Inflow', size=2)
model.nodes["Input"].max_flow = ConstantScenarioParameter(model, scenario_input, [5.0, 10.0])
# Second scenario dimension, with explicit ensemble names.
scenario_outflow = Scenario(model, 'Outflow', size=2, ensemble_names=['High', 'Low'])
model.nodes["Output"].max_flow = ConstantScenarioParameter(model, scenario_outflow, [3.0, 8.0])
model.nodes["Output"].cost = -2.0
# Check ensemble names are provided in the multi-index
index = model.scenarios.multiindex
assert index.levels[0].name == 'Inflow'
assert index.levels[1].name == 'Outflow'
assert np.all(index.levels[1] == ['High', 'Low'])
# add numpy recorders to input and output nodes
NumpyArrayNodeRecorder(model, model.nodes["Input"], "input")
NumpyArrayNodeRecorder(model, model.nodes["Output"], "output")
# Expected flow for each of the 2x2 scenario combinations.
expected_node_results = {
"Input": [3.0, 5.0, 3.0, 8.0],
"Link": [3.0, 5.0, 3.0, 8.0],
"Output": [3.0, 5.0, 3.0, 8.0],
}
assert_model(model, expected_node_results)
model.run()
# combine recorder outputs to a single dataframe
df = model.to_dataframe()
# 365 timesteps wide by (2 recorders x 2x2 scenario combinations) columns.
assert(df.shape == (365, 2 * 2 * 2))
assert_allclose(df["input", 0, 'High'].iloc[0], 3.0)
assert_allclose(df["input", 0, 'Low'].iloc[0], 5.0)
def test_statistic_recorder(self, cyclical_storage_model, recorder_agg_func):
""" Test EventStatisticRecorder

Events start when storage falls to or below 4.0; statistics of a
tracked parameter (the input node's max_flow) are aggregated per
event, per scenario, and then across scenarios.
"""
m = cyclical_storage_model
strg = m.nodes['Storage']
inpt = m.nodes['Input']
arry = NumpyArrayNodeRecorder(m, inpt)
# Create the trigger using a threshold recorder
trigger = StorageThresholdRecorder(m, strg, 4.0, predicate='<=')
evt_rec = EventRecorder(m, trigger, tracked_parameter=inpt.max_flow)
evt_stat = EventStatisticRecorder(m, evt_rec, agg_func='max', event_agg_func='min', recorder_agg_func=recorder_agg_func)
m.run()
# Ensure there is at least one event
assert evt_rec.events
# Collect, per scenario, the minimum recorded flow during each event.
evt_values = {si.global_id:[] for si in m.scenarios.combinations}
for evt in evt_rec.events:
evt_values[evt.scenario_index.global_id].append(np.min(arry.data[evt.start.index:evt.end.index, evt.scenario_index.global_id]))
# NOTE(review): the fragment is truncated here — the comparison that uses
# ``func`` is not visible in this chunk.
func = TestEventRecorder.funcs[recorder_agg_func]
def test_demand_saving_with_indexed_array_from_hdf():
    """Test demand saving based on a predefined demand saving level in a HDF file."""
    model = load_model("demand_saving_hdf.json")
    model.timestepper.end = pd.Timestamp("2016-01-31")

    rec_demand = NumpyArrayNodeRecorder(model, model.nodes["Demand"])
    rec_storage = NumpyArrayStorageRecorder(model, model.nodes["Reservoir"])

    model.check()
    model.run()

    max_volume = model.nodes["Reservoir"].max_volume

    demand_baseline = 50.0

    # No saving applies on the first timestep.
    saving_factor = 1.0
    assert_allclose(rec_demand.data[0, 0], demand_baseline * saving_factor)

    # By timestep 11 the first control curve has been breached: 20% saving.
    saving_factor = 0.8
    assert_allclose(rec_demand.data[11, 0], demand_baseline * saving_factor)
def test_annual_license_json(solver):
    """
    This test demonstrates how an annual licence can be forceably distributed
    evenly across a year. The licence must build up a surplus before it can
    use more than the average.
    """
    model = load_model("annual_license.json")
    model.timestepper.start = "2001-01-01"
    model.timestepper.end = "2001-01-31"
    model.timestepper.delta = 5  # 5-day timesteps

    rec = NumpyArrayNodeRecorder(model, model.nodes["supply1"])

    model.run()

    initial_amount = 200.0

    # Day one apportions the annual licence evenly over 365 days.
    day_one = initial_amount / 365
    assert_allclose(rec.data[0], day_one)

    # Day two repeats the calculation with what remains after one 5-day step.
    days_left = 365 - 5
    day_two = (initial_amount - day_one * 5) / days_left
    assert_allclose(rec.data[1], day_two)

    # The maximum was taken, so both days supply the same amount.
    assert_allclose(day_one, day_two)

    # On the third step nothing is taken (no demand), so the licence is saved...
    assert_allclose(rec.data[2], 0.0)
    # ...which builds a surplus allowing more supply on the fourth step.
def test_sdc_recorder():
    """Test the StorageDurationCurveRecorder."""
    model = load_model("timeseries3.json")
    inpt = model.nodes['catchment1']
    strg = model.nodes['reservoir1']

    percentiles = np.linspace(20., 100., 5)
    flow_rec = NumpyArrayNodeRecorder(model, inpt)
    rec = StorageDurationCurveRecorder(model, strg, percentiles,
                                       temporal_agg_func="max", agg_func="min")

    # The recorder is retrievable from the model by its default name.
    assert model.recorders['storagedurationcurverecorder.reservoir1'] == rec

    model.run()

    # Reconstruct storage from the inflow minus a constant 23.0 per step
    # (presumably the demand in the model file — confirm against the JSON),
    # then compare its percentiles against the recorder's curve.
    strg_volume = strg.initial_volume + np.cumsum(flow_rec.data - 23.0, axis=0)
    strg_pciles = np.percentile(strg_volume, percentiles, axis=0)

    assert_allclose(rec.sdc, strg_pciles)
    assert_allclose(np.max(rec.sdc, axis=0), rec.values())
    assert_allclose(np.min(np.max(rec.sdc, axis=0)), rec.aggregated_value())
    assert rec.sdc.shape == (len(percentiles), len(model.scenarios.combinations))