Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_hysteresis(self, simple_linear_model, minimum_length):
""" Test the minimum_event_length keyword of EventRecorder """
m = simple_linear_model
flow = np.zeros(len(m.timestepper))
flow[:10] = [0, 0, 10, 0, 10, 10, 10, 0, 0, 0]
# The input flow is 10 at index 2 and at indices 4-6, and 0 everywhere else, so
# it is above the 4.0 threshold used below in two separate runs:
#   index:  0  1  2  3  4  5  6  7  8  9
#   flow:   0  0 10  0 10 10 10  0  0  0
#                |-|   |-------|
# With a min event length of 1 there are two events, with lengths (1, 3).
# With a min event length of 2 or 3 (i.e. < 4) there is one event, with length 3.
# A min event length >= 4 gives no events.
inpt = m.nodes['Input']
inpt.max_flow = ArrayIndexedParameter(m, flow)
# Force through whatever flow Input can provide
otpt = m.nodes['Output']
otpt.max_flow = 100
otpt.cost = -100
# Create the trigger: a threshold recorder on the Output node (threshold 4.0, predicate '>')
trigger = NodeThresholdRecorder(m, otpt, 4.0, predicate='>')
evt_rec = EventRecorder(m, trigger, minimum_event_length=minimum_length)
m.run()
if minimum_length == 1:
assert len(evt_rec.events) == 2
assert_equal([1, 3], [e.duration for e in evt_rec.events])
elif minimum_length < 4:
# NOTE(review): the body of this branch is cut off in this view of the file.
def test_array_deficit_recoder(self, simple_linear_model):
    """Verify the per-timestep deficit stored by `NumpyArrayNodeDeficitRecorder`.

    A linearly increasing inflow is offered to an output whose demand is a
    constant 30.0. The recorder attached to the output is expected to hold
    ``demand - min(inflow, demand)``, both in ``data`` and in the frame
    returned by ``to_dataframe()``.
    """
    model = simple_linear_model
    model.timestepper.delta = 1
    output_node = model.nodes['Output']

    # Reference series, computed with NumPy only.
    inflow = 0.1 * np.arange(365)
    demand = np.full_like(inflow, 30.0)
    expected_deficit = (demand - np.minimum(inflow, demand))[:, np.newaxis]

    model.nodes['Input'].max_flow = ArrayIndexedParameter(model, inflow)
    output_node.max_flow = ArrayIndexedParameter(model, demand)
    output_node.cost = -2.0

    rec = NumpyArrayNodeDeficitRecorder(model, output_node)
    model.run()

    # The raw array is checked first, then the DataFrame built from it.
    recorded = rec.data
    assert recorded.shape == (365, 1)
    np.testing.assert_allclose(expected_deficit, recorded)

    frame = rec.to_dataframe()
    assert frame.shape == (365, 1)
    np.testing.assert_allclose(expected_deficit, frame.values)
def test_interpolated_parameter(simple_linear_model):
    """Check `InterpolatedParameter` against a table of hand-computed values.

    The interpolation table maps x = [0, 5, 10, 11] to y = [0, 10, 30, 2] and
    is fed by an `ArrayIndexedParameter` holding the values 0..11.
    `assert_rec` is handed the function that gives the value expected at each
    timestep index.
    """
    model = simple_linear_model
    model.timestepper.start = "1920-01-01"
    model.timestepper.end = "1920-01-12"

    source = ArrayIndexedParameter(model, list(range(12)))
    interpolated = InterpolatedParameter(model, source, [0, 5, 10, 11], [0, 10, 30, 2])

    # Expected output for timestep indices 0..11.
    expected_by_index = (0, 2, 4, 6, 8, 10, 14, 18, 22, 26, 30, 2)

    @assert_rec(model, interpolated)
    def expected_func(timestep, scenario_index):
        return expected_by_index[timestep.index]

    model.run()
def test_array_curtailment_ratio_recoder(self, simple_linear_model):
    """Check `NumpyArrayNodeCurtailmentRatioRecorder` against ``1 - supply / demand``.

    Supply is taken as ``min(inflow, demand)`` for a linearly increasing
    inflow and a constant demand of 30.0. Both the recorder's ``data`` array
    and its ``to_dataframe()`` output are compared with that reference.
    """
    n_steps = 365
    model = simple_linear_model
    model.timestepper.delta = 1
    output_node = model.nodes['Output']

    inflow = np.arange(n_steps) * 0.1
    demand = 30.0 * np.ones_like(inflow)

    model.nodes['Input'].max_flow = ArrayIndexedParameter(model, inflow)
    output_node.max_flow = ArrayIndexedParameter(model, demand)
    output_node.cost = -2.0

    # Reference curtailment ratio as a single column.
    supplied = np.minimum(inflow, demand)
    expected_column = (1 - supplied / demand)[:, np.newaxis]

    rec = NumpyArrayNodeCurtailmentRatioRecorder(model, output_node)
    model.run()

    assert rec.data.shape == (n_steps, 1)
    np.testing.assert_allclose(expected_column, rec.data)

    frame = rec.to_dataframe()
    assert frame.shape == (n_steps, 1)
    np.testing.assert_allclose(expected_column, frame.values)
def test_array_curtailment_ratio_recoder(self, simple_linear_model):
    """Exercise `NumpyArrayNodeCurtailmentRatioRecorder` on a ramping inflow.

    The output node demands a constant 30.0 while the input offers
    ``0.1 * step``. The recorded values are compared with one minus the
    ratio of ``min(inflow, demand)`` to demand.
    """
    model = simple_linear_model
    model.timestepper.delta = 1
    sink = model.nodes['Output']

    ramp = np.arange(365) * 0.1
    flat_demand = np.full(ramp.shape, 30.0)

    model.nodes['Input'].max_flow = ArrayIndexedParameter(model, ramp)
    sink.max_flow = ArrayIndexedParameter(model, flat_demand)
    sink.cost = -2.0

    met = np.minimum(ramp, flat_demand)
    expected_ratio = 1 - met / flat_demand

    rec = NumpyArrayNodeCurtailmentRatioRecorder(model, sink)
    model.run()

    # Compare as an (n, 1) column: first the raw array, then the DataFrame.
    as_column = expected_ratio.reshape(-1, 1)

    assert rec.data.shape == (365, 1)
    np.testing.assert_allclose(as_column, rec.data)

    table = rec.to_dataframe()
    assert table.shape == (365, 1)
    np.testing.assert_allclose(as_column, table.values)
def test_parameter_array_indexed(simple_linear_model):
    """Exercise `ArrayIndexedParameter.value`, including an out-of-bounds timestep.

    The parameter wraps an array with one entry per timestep of the model.
    Each timestep yielded by the timestepper must give back the matching
    entry, and a timestep built with index 366 must raise `IndexError`.
    """
    model = simple_linear_model
    values = np.arange(len(model.timestepper), dtype=np.float64)
    param = ArrayIndexedParameter(model, values)

    model.setup()

    # A scenario index is required by the call signature but is not varied here.
    scenario_index = ScenarioIndex(0, np.array([0], dtype=np.int32))

    for expected, timestep in zip(values, model.timestepper):
        actual = param.value(timestep, scenario_index)
        np.testing.assert_allclose(actual, expected)

    # An index past the end of the array must be rejected.
    beyond_end = Timestep(pd.Period('2016-01-01', freq='1D'), 366, 1)
    with pytest.raises(IndexError):
        param.value(beyond_end, scenario_index)
def test_array_supplied_ratio_recoder(self, simple_linear_model):
    """Check `NumpyArrayNodeSuppliedRatioRecorder` against ``supply / demand``.

    Supply is ``min(inflow, demand)`` for an inflow of ``0.1 * step`` and a
    constant demand of 30.0. The reference ratio is compared with the
    recorder's ``data`` array and with the values of ``to_dataframe()``.
    """
    steps = 365
    model = simple_linear_model
    model.timestepper.delta = 1
    demand_node = model.nodes['Output']

    inflow = np.arange(steps) * 0.1
    demand = np.ones_like(inflow) * 30.0
    # Fraction of demand that can be met, kept as a column for comparison.
    ratio_column = (np.minimum(inflow, demand) / demand)[:, np.newaxis]

    model.nodes['Input'].max_flow = ArrayIndexedParameter(model, inflow)
    demand_node.max_flow = ArrayIndexedParameter(model, demand)
    demand_node.cost = -2.0

    rec = NumpyArrayNodeSuppliedRatioRecorder(model, demand_node)
    model.run()

    raw = rec.data
    assert raw.shape == (steps, 1)
    np.testing.assert_allclose(ratio_column, raw)

    frame = rec.to_dataframe()
    assert frame.shape == (steps, 1)
    np.testing.assert_allclose(ratio_column, frame.values)
def test_load(self, simple_linear_model):
# Builds an 'interpolatedquadrature' parameter from a dict with load_parameter
# and registers expected_func for it through assert_rec.
model = simple_linear_model
# Set the timestepper start and end dates used by this test.
model.timestepper.start = "1920-01-01"
model.timestepper.end = "1920-01-12"
p1 = ArrayIndexedParameter(model, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], name='p1')
# The definition refers to the upper parameter by its name, 'p1'.
p2 = {
'type': 'interpolatedquadrature',
'upper_parameter': 'p1',
'x': [0, 5, 10, 11],
'y': [0, 5 * 2, 10 * 3, 2]
}
p2 = load_parameter(model, p2)
# presumably assert_rec compares p2 with expected_func during the run -- confirm in the helper
@assert_rec(model, p2)
def expected_func(timestep, scenario_index):
i = timestep.index
if i < 6:
value = 2 * i ** 2 / 2
elif i < 11:
# NOTE(review): the snippet is cut off here; the rest of expected_func is not visible.
def test_calc(self, simple_linear_model, lower_interval):
# Sets up an InterpolatedQuadratureParameter with an optional lower parameter.
# NOTE(review): only the setup and part of the `area` helper are visible in this view.
model = simple_linear_model
# Set the timestepper start and end dates used by this test.
model.timestepper.start = "1920-01-01"
model.timestepper.end = "1920-01-12"
b = ArrayIndexedParameter(model, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
# The lower parameter is only created when lower_interval is given;
# otherwise None is passed as lower_parameter.
a = None
if lower_interval is not None:
a = ConstantParameter(model, lower_interval)
p2 = InterpolatedQuadratureParameter(model, b, [-5, 0, 5, 10, 11], [0, 0, 5 * 2, 10 * 3, 2],
lower_parameter=a)
# Piecewise helper keyed on i; presumably the expected integral up to i -- confirm.
def area(i):
if i < 0:
value = 0
elif i < 6:
value = 2*i**2 / 2
elif i < 11:
value = 25 + 4*(i - 5)**2 / 2 + (i - 5) * 10
else:
value = 25 + 50 + 50 + 28 / 2 + 2
# NOTE(review): the snippet ends here; no use or return of `value` is visible.
def test_load(self, simple_linear_model):
    """Load a 'piecewiseintegralparameter' definition and check it against `self.area`.

    The definition is a plain dict that refers to the input parameter by its
    name, ``'x'``, and takes its table from ``self.X`` and ``self.Y``. The
    value expected at each timestep is ``self.area(timestep.index)``.
    """
    model = simple_linear_model
    model.timestepper.start = "1920-01-01"
    model.timestepper.end = "1920-01-12"

    # Named input parameter that the definition below looks up as 'x'.
    x = ArrayIndexedParameter(model, list(range(12)), name='x')

    definition = {
        'type': 'piecewiseintegralparameter',
        'parameter': 'x',
        'x': self.X,
        'y': self.Y,
    }
    loaded = load_parameter(model, definition)

    @assert_rec(model, loaded)
    def expected_func(timestep, scenario_index):
        return self.area(timestep.index)

    model.run()