Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
S = supply_amplitude
w = frequency
def supply_func(parent, index):
t = parent.model.timestamp.timetuple().tm_yday
return S*np.cos(t*w)+S
supply = pywr.core.Supply(model, name='supply', max_flow=supply_func)
demand = pywr.core.Demand(model, name='demand', demand=demand)
res = pywr.core.Reservoir(model, name='reservoir')
res.properties['max_volume'] = pywr.parameters.ParameterConstant(1e6)
res.properties['current_volume'] = pywr.core.Variable(initial_volume)
supply_res_link = pywr.core.Link(model, name='link1')
res_demand_link = pywr.core.Link(model, name='link2')
supply.connect(supply_res_link)
supply_res_link.connect(res)
res.connect(res_demand_link)
res_demand_link.connect(demand)
return model
def linear_model_with_storage(request):
"""
Make a simple model with a single Input and Output and an offline Storage Node
Input -> Link -> Output
| ^
v |
Storage
"""
in_flow, out_flow, out_benefit, strg_benefit, current_volume, min_volume = request.param
max_strg_out = 10.0
max_volume = 10.0
model = pywr.core.Model()
inpt = pywr.core.Input(model, name="Input", min_flow=in_flow, max_flow=in_flow)
lnk = pywr.core.Link(model, name="Link", cost=0.1)
inpt.connect(lnk)
otpt = pywr.core.Output(model, name="Output", min_flow=out_flow, cost=-out_benefit)
lnk.connect(otpt)
strg = pywr.core.Storage(model, name="Storage", max_volume=max_volume, min_volume=min_volume,
initial_volume=current_volume, cost=-strg_benefit)
strg.connect(otpt)
lnk.connect(strg)
avail_volume = max(current_volume - min_volume, 0.0)
avail_refill = max_volume - current_volume
expected_sent = in_flow+min(max_strg_out, avail_volume) if out_benefit > strg_benefit else max(out_flow, in_flow-avail_refill)
expected_node_results = {
"Input": in_flow,
"Link": in_flow,
model = pywr.core.Model()
S = supply_amplitude
w = frequency
def supply_func(parent, index):
t = parent.model.timestamp.timetuple().tm_yday
return S*np.cos(t*w)+S
supply = pywr.core.Supply(model, name='supply', max_flow=supply_func)
demand = pywr.core.Demand(model, name='demand', demand=demand)
res = pywr.core.Reservoir(model, name='reservoir')
res.properties['max_volume'] = pywr.parameters.ParameterConstant(1e6)
res.properties['current_volume'] = pywr.core.Variable(initial_volume)
supply_res_link = pywr.core.Link(model, name='link1')
res_demand_link = pywr.core.Link(model, name='link2')
supply.connect(supply_res_link)
supply_res_link.connect(res)
res.connect(res_demand_link)
res_demand_link.connect(demand)
return model
def test_with_nonstorage(self, model):
""" Test usage on non-`Storage` node. """
# Now test if the parameter is used on a non storage node
m = model
s = m.nodes['Storage']
l = Link(m, 'Link')
cc = ConstantParameter(model, 0.8)
l.cost = ControlCurveParameter(model, s, cc, [10.0, 0.0])
@assert_rec(m, l.cost)
def expected_func(timestep, scenario_index):
v = s.initial_volume
if v >= 80.0:
expected = 10.0
else:
expected = 0.0
return expected
for initial_volume in (90, 70):
s.initial_volume = initial_volume
m.run()
"""
river_flow = 10.0
expected_node_results = {}
model = pywr.core.Model()
# Create grid network
grid_inpt = pywr.core.Input(model, name="Input", domain='grid',)
grid_lnk = pywr.core.Link(model, name="Link", cost=1.0, domain='grid')
grid_inpt.connect(grid_lnk)
grid_otpt = pywr.core.Output(model, name="Output", max_flow=50.0, cost=-10.0, domain='grid')
grid_lnk.connect(grid_otpt)
# Create river network
for i in range(2):
river_inpt = pywr.core.Input(model, name="Catchment {}".format(i), max_flow=river_flow, domain='river')
river_lnk = pywr.core.Link(model, name="Reach {}".format(i), domain='river')
river_inpt.connect(river_lnk)
river_otpt = pywr.core.Output(model, name="Abstraction {}".format(i), domain='river', cost=0.0)
river_lnk.connect(river_otpt)
# Connect grid to river
river_otpt.connect(grid_inpt)
expected_node_results.update({
"Catchment {}".format(i): river_flow,
"Reach {}".format(i): river_flow,
"Abstraction {}".format(i): river_flow
})
expected_node_results.update({
"Input": river_flow*2,
"Link": river_flow*2,
"Output": river_flow*2,
power_benefit = 10.0 # £/GWh
river_domain = pywr.core.Domain('river')
grid_domain = pywr.core.Domain('grid')
model = pywr.core.Model()
# Create river network
river_inpt = pywr.core.Input(model, name="Catchment", max_flow=river_flow, domain=river_domain)
river_lnk = pywr.core.Link(model, name="Reach", domain=river_domain)
river_inpt.connect(river_lnk)
river_otpt = pywr.core.Output(model, name="Abstraction", domain=river_domain, cost=0.0)
river_lnk.connect(river_otpt)
# Create grid network
grid_inpt = pywr.core.Input(model, name="Power Plant", max_flow=power_plant_cap, domain=grid_domain,
conversion_factor=1/power_plant_flow_req)
grid_lnk = pywr.core.Link(model, name="Transmission", cost=1.0, domain=grid_domain)
grid_inpt.connect(grid_lnk)
grid_otpt = pywr.core.Output(model, name="Substation", max_flow=power_demand,
cost=-power_benefit, domain=grid_domain)
grid_lnk.connect(grid_otpt)
# Connect grid to river
river_otpt.connect(grid_inpt)
expected_requested = {'river': 0.0, 'grid': 0.0}
expected_sent = {'river': power_demand*power_plant_flow_req, 'grid': power_demand}
expected_node_results = {
"Catchment": power_demand*power_plant_flow_req,
"Reach": power_demand*power_plant_flow_req,
"Abstraction": power_demand*power_plant_flow_req,
"Power Plant": power_demand,
"Transmission": power_demand,
def test_with_nonstorage_load(self, model):
""" Test load from dict with 'storage_node' key. """
m = model
s = m.nodes['Storage']
l = Link(m, 'Link')
data = {
"type": "controlcurve",
"control_curve": 0.8,
"values": [10.0, 0.0],
"storage_node": "Storage"
}
l.cost = p = load_parameter(model, data)
assert isinstance(p, ControlCurveParameter)
@assert_rec(m, l.cost)
def expected_func(timestep, scenario_index):
v = s.initial_volume
if v >= 80.0:
expected = 10.0
the reservoir. The reservoir is already at it's maximum volume, so the
water must go *somewhere*. The compensation flow has the most negative cost,
so that is satisfied first. Once that is full, the demand is supplied.
Finally, any surplus is forced into the spill despite the cost.
Catchment -> Reservoir -> Demand
|--> Spill --|
|--> Compensation --|
|--> Terminator
"""
model = pywr.core.Model()
catchment = pywr.core.Input(model, name="Input", min_flow=10.0, max_flow=10.0, cost=1)
reservoir = pywr.core.Storage(model, name="Storage", max_volume=100, initial_volume=100.0)
spill = pywr.core.Link(model, name="Spill", cost=1.0)
compensation = pywr.core.Link(model, name="Compensation", max_flow=3.0, cost=-999)
terminator = pywr.core.Output(model, name="Terminator", cost=-1.0)
demand = pywr.core.Output(model, name="Demand", max_flow=5.0, cost=-500)
catchment.connect(reservoir)
reservoir.connect(spill)
reservoir.connect(compensation)
reservoir.connect(demand)
spill.connect(terminator)
compensation.connect(terminator)
model.check()
model.run()
assert_allclose(catchment.flow[0], 10.0, atol=1e-7)
assert_allclose(demand.flow[0], 5.0, atol=1e-7)
assert_allclose(compensation.flow[0], 3.0, atol=1e-7)
assert_allclose(spill.flow[0], 2.0, atol=1e-7)
with a total flow equal to the sum of their respective parts.
Input -> Link -> Output : river
| across the domain
Input -> Link -> Output : grid
| across the domain
Input -> Link -> Output : river
"""
river_flow = 10.0
expected_node_results = {}
model = pywr.core.Model()
# Create grid network
grid_inpt = pywr.core.Input(model, name="Input", domain='grid',)
grid_lnk = pywr.core.Link(model, name="Link", cost=1.0, domain='grid')
grid_inpt.connect(grid_lnk)
grid_otpt = pywr.core.Output(model, name="Output", max_flow=50.0, cost=-10.0, domain='grid')
grid_lnk.connect(grid_otpt)
# Create river network
for i in range(2):
river_inpt = pywr.core.Input(model, name="Catchment {}".format(i), max_flow=river_flow, domain='river')
river_lnk = pywr.core.Link(model, name="Reach {}".format(i), domain='river')
river_inpt.connect(river_lnk)
river_otpt = pywr.core.Output(model, name="Abstraction {}".format(i), domain='river', cost=0.0)
river_lnk.connect(river_otpt)
# Connect grid to river
river_otpt.connect(grid_inpt)
expected_node_results.update({
"Catchment {}".format(i): river_flow,
"Reach {}".format(i): river_flow,