Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_mean_flow_recorder():
    """Run a model with a rolling-mean flow recorder attached to an input.

    A single ``Input`` is connected to a single ``Output``. Both the minimum
    and maximum flow of the input are set to the same ``FunctionParameter``
    returning ``2 + timestep index``. A ``NumpyArrayNodeRecorder`` and a
    3-timestep ``RollingMeanFlowNodeRecorder`` are attached to the input, and
    a scenario named "dummy" of size 2 is registered before the run.

    The function as visible here ends after building the expected rolling
    means; it makes no assertions.
    """
    network = Model()
    network.timestepper.start = pandas.to_datetime("2016-01-01")
    network.timestepper.end = pandas.to_datetime("2016-01-04")

    source = Input(network, "input")
    sink = Output(network, "output")
    source.connect(sink)

    flow_recorder = NumpyArrayNodeRecorder(network, source)
    mean_recorder = RollingMeanFlowNodeRecorder(network, node=source, timesteps=3)
    dummy_scenario = Scenario(network, "dummy", size=2)

    def flow_for_timestep(model, t, si):
        return 2 + t.index

    # The same parameter object drives both bounds, maximum first.
    forced_flow = FunctionParameter(network, source, flow_for_timestep)
    source.max_flow = forced_flow
    source.min_flow = forced_flow

    network.run()

    # Mean over at most the last `window` flows; on the final day the zeroth
    # day has been forgotten.
    daily_flows = [2.0, 3.0, 4.0, 5.0]
    window = 3
    expected = [
        sum(daily_flows[max(0, day - window + 1):day + 1]) / min(day + 1, window)
        for day in range(len(daily_flows))
    ]
is above or below a particular threshold.
(flow = 8.0) (max_flow = 10.0)
Catchment -> River -> DemandCentre
| ^
(max_flow = 2.0) v | (max_flow = 2.0)
Reservoir
"""
# Catchment inflow; corresponds to "flow = 8.0" in the diagram above.
in_flow = 8
model = pywr.core.Model()
# Main route: Catchment -> River -> Demand.
catchment = river.Catchment(model, name="Catchment", flow=in_flow)
lnk = river.River(model, name="River")
catchment.connect(lnk)
demand = pywr.core.Output(model, name="Demand", cost=-10.0, max_flow=10)
lnk.connect(demand)
from pywr.parameters import ConstantParameter
# Constant control curve of 0.8 for the reservoir.
# NOTE(review): presumably a fraction of max_volume -- confirm.
control_curve = ConstantParameter(model, 0.8)
# The reservoir starts full: initial_volume equals max_volume (10).
reservoir = river.Reservoir(model, name="Reservoir", max_volume=10, cost=-20, above_curve_cost=0.0,
control_curve=control_curve, initial_volume=10)
# Cap the reservoir's first input and first output at 2.0, matching the two
# "max_flow = 2.0" labels in the diagram above.
reservoir.inputs[0].max_flow = 2.0
reservoir.outputs[0].max_flow = 2.0
# Side branch: River -> Reservoir -> Demand.
lnk.connect(reservoir)
reservoir.connect(demand)
# Set the model up and advance it by a single step (not a full run).
model.setup()
model.step()
# Reservoir is currently above control curve. 2 should be taken from the
# reservoir, leaving a volume of 8 out of the initial 10.
assert(reservoir.volume == 8)
def test_aggregated_node_three_factors(model):
    """Three inputs tied together by fixed flow ratios via an AggregatedNode.

    Inputs A, B and C all feed output Z and are aggregated with factors
    0.5 : 1.0 : 2.0. Only B has an explicit ``max_flow`` (10.0). The test
    checks that the factors read back unchanged and that, after a run, the
    flows are A = 5, B = 10, C = 20 with an aggregated flow of 35.
    """
    ratios = [0.5, 1.0, 2.0]

    source_a = Input(model, "A")
    source_b = Input(model, "B", max_flow=10.0)
    source_c = Input(model, "C")
    sink = Output(model, "Z", max_flow=100, cost=-10)

    agg = AggregatedNode(model, "agg", [source_a, source_b, source_c])
    agg.factors = list(ratios)
    assert_allclose(agg.factors, ratios)

    for source in (source_a, source_b, source_c):
        source.connect(sink)

    model.run()

    expected_flows = (
        (agg, 35.0),
        (source_a, 5.0),
        (source_b, 10.0),
        (source_c, 20.0),
    )
    for node, expected in expected_flows:
        assert_allclose(node.flow, expected)
def test_new_storage():
    """Exercise a new-style ``Storage`` node that has more than one input slot.

    One supply feeds a storage node named "splitter" (``num_inputs=2``,
    ``num_outputs=1``), which is connected to two demands through
    ``from_slot`` 0 and 1. The model starts and ends on 1888-01-01 with a
    one-day timestep, and the test asserts the supply delivers 45.
    """
    run_start = pandas.to_datetime('1888-01-01')
    run_end = pandas.to_datetime('1888-01-01')
    network = pywr.core.Model(
        start=run_start,
        end=run_end,
        timestep=datetime.timedelta(days=1),
    )

    source = pywr.core.Input(network, 'supply1')
    store = pywr.core.Storage(
        network,
        'splitter',
        num_outputs=1,
        num_inputs=2,
        max_volume=10,
        initial_volume=5,
    )
    first_demand = pywr.core.Output(network, 'demand1')
    second_demand = pywr.core.Output(network, 'demand2')

    source.connect(store)
    # Each demand draws from its own slot on the storage node.
    for slot, sink in enumerate((first_demand, second_demand)):
        store.connect(sink, from_slot=slot)

    source.max_flow = 45.0

    first_demand.max_flow = 20
    first_demand.cost = -150

    second_demand.max_flow = 40
    second_demand.cost = -100

    network.run()

    assert_allclose(source.flow, [45], atol=1e-7)
def test_scaled_profile_nested_load(model):
""" Test `ScaledProfileParameter` loading with `AggregatedParameter` """
model.timestepper.delta = 15
s = Storage(model, 'Storage', max_volume=100.0, initial_volume=50.0, num_outputs=0)
d = Output(model, 'Link')
data = {
'type': 'scaledprofile',
'scale': 50.0,
'profile': {
'type': 'aggregated',
'agg_func': 'product',
'parameters': [
{
'type': 'monthlyprofile',
'values': [0.5]*12
},
{
'type': 'constant',
'value': 1.5,
}
]
def simple_model(model):
    """Add a connected "input" -> "output" pair to ``model`` and return it.

    The output node is created with ``max_flow=20`` and ``cost=-1000``; the
    input node is created with no extra settings. The same ``model`` object
    that was passed in is returned.
    """
    # Input is built first, then Output, then the two are connected.
    Input(model, "input").connect(
        Output(model, "output", max_flow=20, cost=-1000)
    )
    return model
def simple_piecewise_model(request):
    """
    Make a simple model with a single Input and Output and PiecewiseLink

    Input -> PiecewiseLink -> Output

    ``request.param`` must unpack to ``(in_flow, out_flow, benefit)``:
    ``in_flow`` becomes the Input's ``max_flow``, ``out_flow`` the Output's
    ``min_flow`` and ``-benefit`` the Output's ``cost``.

    Returns a ``(model, expected_node_results)`` tuple, where
    ``expected_node_results`` maps node names (including the two sublinks of
    the PiecewiseLink) to the flow expected through each.
    """
    in_flow, out_flow, benefit = request.param
    min_flow_req = 5.0

    model = pywr.core.Model()
    inpt = pywr.core.Input(model, name="Input", max_flow=in_flow)
    lnk = pywr.core.PiecewiseLink(model, name="Link", cost=[-1.0, 0.0], max_flow=[min_flow_req, None])
    inpt.connect(lnk)
    otpt = pywr.core.Output(model, name="Output", min_flow=out_flow, cost=-benefit)
    lnk.connect(otpt)

    # The full inflow is expected when benefit exceeds 1.0; otherwise only
    # the Output's minimum flow is expected.
    expected_sent = in_flow if benefit > 1.0 else out_flow
    # The first sublink is capped at min_flow_req; the remainder is expected
    # on the second sublink.
    first_sublink_flow = min(min_flow_req, expected_sent)
    expected_node_results = {
        "Input": expected_sent,
        "Link": expected_sent,
        "Link Sublink 0": first_sublink_flow,
        "Link Sublink 1": expected_sent - first_sublink_flow,
        "Output": expected_sent,
    }
    return model, expected_node_results
Input -> Link -> Output : river
| across the domain
Input -> Link -> Output : grid
| across the domain
Input -> Link -> Output : river
"""
river_flow = 10.0
expected_node_results = {}
model = pywr.core.Model()
# Create grid network
grid_inpt = pywr.core.Input(model, name="Input", domain='grid',)
grid_lnk = pywr.core.Link(model, name="Link", cost=1.0, domain='grid')
grid_inpt.connect(grid_lnk)
grid_otpt = pywr.core.Output(model, name="Output", max_flow=50.0, cost=-10.0, domain='grid')
grid_lnk.connect(grid_otpt)
# Create river network
for i in range(2):
river_inpt = pywr.core.Input(model, name="Catchment {}".format(i), max_flow=river_flow, domain='river')
river_lnk = pywr.core.Link(model, name="Reach {}".format(i), domain='river')
river_inpt.connect(river_lnk)
river_otpt = pywr.core.Output(model, name="Abstraction {}".format(i), domain='river', cost=0.0)
river_lnk.connect(river_otpt)
# Connect grid to river
river_otpt.connect(grid_inpt)
expected_node_results.update({
"Catchment {}".format(i): river_flow,
"Reach {}".format(i): river_flow,
"Abstraction {}".format(i): river_flow
})
def test_aggregated_node_max_flow_same_route(model):
    """Aggregated max-flow limit where both member nodes lie on one route.

    A feeds B, and B and C both feed Z. A and B are aggregated with a combined
    ``max_flow`` of 30. After a run the aggregated flow, and the sum of the
    flows through A and B, must both equal 30.
    """
    combined_limit = 30.0

    upstream = Input(model, "A", max_flow=20.0, cost=1)
    downstream = Input(model, "B", max_flow=20.0, cost=2)
    bypass = Input(model, "C", max_flow=50.0, cost=0)
    sink = Output(model, "Z", max_flow=100, cost=-10)

    # A and B share a route: A -> B -> Z. C reaches Z directly.
    upstream.connect(downstream)
    downstream.connect(sink)
    bypass.connect(sink)

    agg = AggregatedNode(model, "agg", [upstream, downstream])
    agg.max_flow = combined_limit

    model.run()

    assert_allclose(agg.flow, combined_limit)
    member_total = upstream.flow + downstream.flow
    assert_allclose(member_total, combined_limit)
def simple_linear_inline_model(request):
    """
    Build a model whose Input and Output nodes sit inline along one route.

    Input 0 -> Input 1 -> Link -> Output 0 -> Output 1

    ``request`` is accepted but not used. Returns the constructed model.
    """
    model = pywr.core.Model()

    # (node class, constructor keyword arguments) in route order.
    route = (
        (pywr.core.Input, {"name": "Input 0"}),
        (pywr.core.Input, {"name": "Input 1"}),
        (pywr.core.Link, {"name": "Link", "cost": 1.0}),
        (pywr.core.Output, {"name": "Output 0"}),
        (pywr.core.Output, {"name": "Output 1"}),
    )

    # Create each node and immediately hook it onto the one before it.
    previous = None
    for node_cls, options in route:
        node = node_cls(model, **options)
        if previous is not None:
            previous.connect(node)
        previous = node

    return model