Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def assert_model(model, expected_node_results):
    """Step the model once and check node results against expectations.

    Parameters
    ----------
    model
        Object exposing ``step()`` and an iterable ``nodes`` attribute.
    expected_node_results : mapping
        Expected value keyed by node name. Nodes whose name is not a key
        are not checked.

    Notes
    -----
    ``pywr.core.BaseNode`` instances are compared on ``flow`` and
    ``pywr.core.Storage`` instances on ``volume``, both with an absolute
    tolerance of 1e-7. A named node of neither type is silently ignored.
    """
    # pytest convention: keep this helper's frame out of failure tracebacks.
    __tracebackhide__ = True

    model.step()

    for node in model.nodes:
        if node.name not in expected_node_results:
            continue

        expected = expected_node_results[node.name]
        if isinstance(node, pywr.core.BaseNode):
            assert_allclose(expected, node.flow, atol=1e-7)
        elif isinstance(node, pywr.core.Storage):
            assert_allclose(expected, node.volume, atol=1e-7)
# Wire the extra input (created earlier, outside this excerpt) to the
# remaining outputs.
for output_name in ("Output 1", "Output 2"):
    inpt.connect(model.nodes[output_name])

# Limit the new node's flow with a control curve on the aggregate storage:
# a single curve at 50%, with the values 0 and 5 on either side of it.
curves = [0.5]
values = [0, 5]
inpt.max_flow = ControlCurveParameter(
    model, model.nodes["Total Storage"], curves, values
)

# Initial storage is above 50%, so no flow is expected.
model.step()
np.testing.assert_allclose(inpt.flow, 0.0)

# Lower the initial volumes so the aggregate is meant to fall below 50%.
# `storages` is already filtered to Storage nodes, so no per-item type
# check is needed inside the loop.
storages = [node for node in model.nodes if isinstance(node, Storage)]
fractions = [0.6, 0.1, 0.1]
for storage, fraction in zip(storages, fractions):
    storage.initial_volume = storage.max_volume * fraction

# Now below the control curve, so the full 5.0 flow is expected.
model.reset()
model.step()
np.testing.assert_allclose(inpt.flow, 5.0)
| ^
v |
Storage
"""
# Scenario values come from `request.param` (presumably a parametrised
# pytest fixture -- the enclosing definition is outside this excerpt).
(in_flow, out_flow, out_benefit,
 strg_benefit, current_volume, min_volume) = request.param
max_strg_out = 10.0
max_volume = 10.0

# Network: Input -> Link -> Output, with Link also feeding Storage and
# Storage feeding Output.
model = pywr.core.Model()
inpt = pywr.core.Input(model, name="Input", min_flow=in_flow, max_flow=in_flow)
lnk = pywr.core.Link(model, name="Link", cost=0.1)
inpt.connect(lnk)
otpt = pywr.core.Output(model, name="Output", min_flow=out_flow, cost=-out_benefit)
lnk.connect(otpt)
strg = pywr.core.Storage(
    model,
    name="Storage",
    max_volume=max_volume,
    min_volume=min_volume,
    initial_volume=current_volume,
    cost=-strg_benefit,
)
strg.connect(otpt)
lnk.connect(strg)

# Volume that can be drawn from / added to the storage this step.
avail_volume = max(current_volume - min_volume, 0.0)
avail_refill = max_volume - current_volume
storage_release = min(max_strg_out, avail_volume)

# When supplying the output is worth more than holding water, the storage
# is expected to empty towards the output; otherwise it is expected to fill.
if out_benefit > strg_benefit:
    expected_sent = in_flow + storage_release
else:
    expected_sent = max(out_flow, in_flow - avail_refill)

expected_node_results = {
    "Input": in_flow,
    "Link": in_flow,
    "Output": expected_sent,
    "Storage Output": 0.0,
    # NOTE(review): this entry compares against the literal 1.0 while the
    # neighbouring entries compare against strg_benefit -- confirm intended.
    "Storage Input": storage_release if out_benefit > 1.0 else 0.0,
    "Storage": min_volume if out_benefit > strg_benefit else max_volume,
}
# Add a strongly favoured (cost=-1000) input and wire it to every output.
inpt = Input(model, "Input 3", cost=-1000)
for output_name in ("Output 0", "Output 1", "Output 2"):
    inpt.connect(model.nodes[output_name])

# Limit the new node's flow with a control curve on the aggregate storage:
# a single curve at 50%, with the values 0 and 5 on either side of it.
curves = [0.5]
values = [0, 5]
inpt.max_flow = ControlCurveParameter(
    model, model.nodes["Total Storage"], curves, values
)

# Initial storage is above 50%, so no flow is expected.
model.step()
np.testing.assert_allclose(inpt.flow, 0.0)

# Lower the initial volumes so the aggregate is meant to fall below 50%.
# `storages` is already filtered to Storage nodes, so no per-item type
# check is needed inside the loop.
storages = [node for node in model.nodes if isinstance(node, Storage)]
fractions = [0.6, 0.1, 0.1]
for storage, fraction in zip(storages, fractions):
    storage.initial_volume = storage.max_volume * fraction

# Now below the control curve, so the full 5.0 flow is expected.
model.reset()
model.step()
np.testing.assert_allclose(inpt.flow, 5.0)
def test_me():
    """Build a supply -> reservoir -> demand chain and run the model.

    The model is configured with one-day timesteps from 1888-01-01 to
    1888-01-05 and the 'cyglpk' solver. Each node gets a
    ``NumpyArrayRecorder(5)``. This function only runs the model; it makes
    no assertions on the recorded results.
    """
    run_settings = {
        'timestamp_start': pandas.to_datetime('1888-01-01'),
        'timestamp_finish': pandas.to_datetime('1888-01-05'),
        'timestep': datetime.timedelta(1),
    }
    model = Model(parameters=run_settings, solver='cyglpk')

    # Supply: up to 3.0 per step, at a cost.
    source = Input(model, 'supply1')
    source.max_flow = 3.0
    source.cost = 10
    source.recorder = NumpyArrayRecorder(5)

    # Reservoir: 0-100 volume, starting at 16.0 (set via the private field).
    reservoir = Storage(model, name='reservoir1')
    reservoir.min_volume = 0.0
    reservoir.max_volume = 100.0
    reservoir._volume = 16.0
    reservoir.cost = 5
    reservoir.recorder = NumpyArrayRecorder(5)

    # Demand: up to 5.0 per step, with a negative cost (i.e. a benefit).
    sink = Output(model, 'demand1')
    sink.max_flow = 5.0
    sink.cost = -100
    sink.recorder = NumpyArrayRecorder(5)

    source.connect(reservoir)
    reservoir.connect(sink)

    model.run()
def test_new_storage():
    """Set up a new-style storage node that feeds two demands.

    One supply feeds a storage node created with ``num_inputs=2`` and
    ``num_outputs=1``; the storage connects to two demands through
    ``from_slot`` 0 and 1. The visible body only builds and configures the
    network -- it neither runs the model nor asserts anything.
    """
    first_day = pandas.to_datetime('1888-01-01')
    last_day = pandas.to_datetime('1888-01-01')
    one_day = datetime.timedelta(1)
    model = pywr.core.Model(start=first_day, end=last_day, timestep=one_day)

    source = pywr.core.Input(model, 'supply1')
    store = pywr.core.Storage(
        model,
        'splitter',
        num_outputs=1,
        num_inputs=2,
        max_volume=10,
        initial_volume=5,
    )
    sink_a = pywr.core.Output(model, 'demand1')
    sink_b = pywr.core.Output(model, 'demand2')

    source.connect(store)
    # Each demand draws from its own slot on the storage node.
    store.connect(sink_a, from_slot=0)
    store.connect(sink_b, from_slot=1)

    source.max_flow = 45.0
    sink_a.max_flow = 20
    sink_b.max_flow = 40

    # demand1 carries the more negative cost of the two demands.
    sink_a.cost = -150
    sink_b.cost = -100
def test_scaled_profile_nested_load(model):
""" Test `ScaledProfileParameter` loading with `AggregatedParameter` """
model.timestepper.delta = 15
s = Storage(model, 'Storage', max_volume=100.0, initial_volume=50.0, num_outputs=0)
d = Output(model, 'Link')
data = {
'type': 'scaledprofile',
'scale': 50.0,
'profile': {
'type': 'aggregated',
'agg_func': 'product',
'parameters': [
{
'type': 'monthlyprofile',
'values': [0.5]*12
},
{
'type': 'constant',
'value': 1.5,
}
from ..core import Storage
from ..parameters import InterpolatedVolumeParameter
from ..parameters.groundwater import KeatingStreamFlowParameter
import numbers
from scipy.interpolate import interp1d
class KeatingAquifer(Storage):
def __init__(self, model, name,
num_streams, num_additional_inputs,
stream_flow_levels, transmissivity, coefficient,
levels, volumes=None, area=None, storativity=None,
**kwargs):
"""Storage node with one or more Keating outflows
Parameters
----------
model : pywr.core.Model
The Pywr Model.
name : string
A unique name for the node in the model.
num_streams : integer
Number of keating outflows.
num_additional_inputs : integer