Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_aggregated_node_max_flow_same_route(model):
"""Unusual case where the aggregated nodes are in the same route"""
A = Input(model, "A", max_flow=20.0, cost=1)
B = Input(model, "B", max_flow=20.0, cost=2)
C = Input(model, "C", max_flow=50.0, cost=0)
Z = Output(model, "Z", max_flow=100, cost=-10)
A.connect(B)
B.connect(Z)
C.connect(Z)
agg = AggregatedNode(model, "agg", [A, B])
agg.max_flow = 30.0
model.run()
assert_allclose(agg.flow, 30.0)
assert_allclose(A.flow + B.flow, 30.0)
def test_aggregated_node_max_flow_same_route(model):
"""Unusual case where the aggregated nodes are in the same route"""
A = Input(model, "A", max_flow=20.0, cost=1)
B = Input(model, "B", max_flow=20.0, cost=2)
C = Input(model, "C", max_flow=50.0, cost=0)
Z = Output(model, "Z", max_flow=100, cost=-10)
A.connect(B)
B.connect(Z)
C.connect(Z)
agg = AggregatedNode(model, "agg", [A, B])
agg.max_flow = 30.0
model.run()
assert_allclose(agg.flow, 30.0)
assert_allclose(A.flow + B.flow, 30.0)
def test_aggregated_node_min_flow_parameter(model):
"""Nodes constrained by the min_flow of their AggregatedNode"""
A = Input(model, "A", max_flow=20.0, cost=1)
B = Input(model, "B", max_flow=20.0, cost=100)
Z = Output(model, "Z", max_flow=100, cost=0)
A.connect(Z)
B.connect(Z)
agg = AggregatedNode(model, "agg", [A, B])
agg.min_flow = ConstantParameter(model, 15.0)
model.run()
assert_allclose(agg.flow, 15.0)
assert_allclose(A.flow, 15.0)
assert_allclose(B.flow, 0.0)
)
aqfer = KeatingAquifer(
model,
'keating',
num_streams,
num_additional_inputs,
stream_flow_levels,
transmissivity,
coefficient,
levels,
area=area,
storativity=storativity,
)
catchment = Input(model, 'catchment', max_flow=0)
stream = Output(model, 'stream', max_flow=np.inf, cost=0)
abstraction = Output(model, 'abstraction', max_flow=15, cost=-999)
catchment.connect(aqfer)
aqfer.connect(stream, from_slot=0)
aqfer.connect(abstraction, from_slot=1)
rec_level = NumpyArrayLevelRecorder(model, aqfer)
rec_volume = NumpyArrayStorageRecorder(model, aqfer)
rec_stream = NumpyArrayNodeRecorder(model, stream)
rec_abstraction = NumpyArrayNodeRecorder(model, abstraction)
model.check()
assert(len(aqfer.inputs) == (num_streams + num_additional_inputs))
def test_aggregated_node_two_factors_time_varying(model):
"""Nodes constrained by a time-varying ratio between flows (2 nodes)"""
model.timestepper.end = Timestamp("2016-01-03")
A = Input(model, "A")
B = Input(model, "B", max_flow=40.0)
Z = Output(model, "Z", max_flow=100, cost=-10)
agg = AggregatedNode(model, "agg", [A, B])
agg.factors = [0.5, 0.5]
assert_allclose(agg.factors, [0.5, 0.5])
A.connect(Z)
B.connect(Z)
model.setup()
model.step()
assert_allclose(agg.flow, 80.0)
assert_allclose(A.flow, 40.0)
assert_allclose(B.flow, 40.0)
def test_me():
model = Model(
parameters={
'timestamp_start': pandas.to_datetime('1888-01-01'),
'timestamp_finish': pandas.to_datetime('1888-01-05'),
'timestep': datetime.timedelta(1),
},
solver='cyglpk'
)
supply1 = Input(model, 'supply1')
supply1.max_flow = 3.0
supply1.cost = 10
supply1.recorder = NumpyArrayRecorder(5)
reservoir1 = Storage(model, name='reservoir1')
reservoir1.min_volume = 0.0
reservoir1.max_volume = 100.0
reservoir1._volume = 16.0
reservoir1.cost = 5
reservoir1.recorder = NumpyArrayRecorder(5)
demand1 = Output(model, 'demand1')
demand1.max_flow = 5.0
demand1.cost = -100
demand1.recorder = NumpyArrayRecorder(5)
def test_aggregated_node_max_flow_same_route(model):
"""Unusual case where the aggregated nodes are in the same route"""
A = Input(model, "A", max_flow=20.0, cost=1)
B = Input(model, "B", max_flow=20.0, cost=2)
C = Input(model, "C", max_flow=50.0, cost=0)
Z = Output(model, "Z", max_flow=100, cost=-10)
A.connect(B)
B.connect(Z)
C.connect(Z)
agg = AggregatedNode(model, "agg", [A, B])
agg.max_flow = 30.0
model.run()
assert_allclose(agg.flow, 30.0)
assert_allclose(A.flow + B.flow, 30.0)
def test_daily_profile_leap_day(model):
"""Test behaviour of daily profile parameter for leap years
"""
inpt = Input(model, "input")
otpt = Output(model, "otpt", max_flow=None, cost=-999)
inpt.connect(otpt)
inpt.max_flow = DailyProfileParameter(model, np.arange(0, 366, dtype=np.float64))
# non-leap year
model.timestepper.start = pd.to_datetime("2015-01-01")
model.timestepper.end = pd.to_datetime("2015-12-31")
model.run()
assert_allclose(inpt.flow, 365) # NOT 364
# leap year
model.timestepper.start = pd.to_datetime("2016-01-01")
model.timestepper.end = pd.to_datetime("2016-12-31")
model.run()
assert_allclose(inpt.flow, 365)
def simple_storage_model(request):
"""
Make a simple model with a single Input, Storage and Output.
Input -> Storage -> Output
"""
model = pywr.core.Model(
start=pandas.to_datetime('2016-01-01'),
end=pandas.to_datetime('2016-01-05'),
timestep=datetime.timedelta(1),
)
inpt = Input(model, name="Input", max_flow=5.0, cost=-1)
res = Storage(model, name="Storage", num_outputs=1, num_inputs=1, max_volume=20, initial_volume=10)
otpt = Output(model, name="Output", max_flow=8, cost=-999)
inpt.connect(res)
res.connect(otpt)
return model