Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
f1 = 0.0
x2 = x1 + m.leak_delta
f2 = node.leak_discharge_coeff*node.leak_area*(2.0*9.81*x2)**0.5
df1 = m.leak_slope
df2 = 0.5*node.leak_discharge_coeff*node.leak_area*(2.0*9.81)**0.5*x2**(-0.5)
a, b, c, d = cubic_spline(x1, x2, f1, f2, df1, df2)
if node_name in m.leak_poly_coeffs_a:
m.leak_poly_coeffs_a[node_name].value = a
m.leak_poly_coeffs_b[node_name].value = b
m.leak_poly_coeffs_c[node_name].value = c
m.leak_poly_coeffs_d[node_name].value = d
else:
m.leak_poly_coeffs_a[node_name] = aml.Param(a)
m.leak_poly_coeffs_b[node_name] = aml.Param(b)
m.leak_poly_coeffs_c[node_name] = aml.Param(c)
m.leak_poly_coeffs_d[node_name] = aml.Param(d)
updater.add(node, 'leak_discharge_coeff', leak_poly_coeffs_param.update)
updater.add(node, 'leak_area', leak_poly_coeffs_param.update)
f2 = 1.0
df1 = 0.5*((x1-pmin)/(pnom-pmin))**(-0.5)*1.0/(pnom-pmin)
df2 = m.pdd_slope
a2, b2, c2, d2 = cubic_spline(x1, x2, f1, f2, df1, df2)
if node_name in m.pdd_poly1_coeffs_a:
m.pdd_poly1_coeffs_a[node_name].value = a1
m.pdd_poly1_coeffs_b[node_name].value = b1
m.pdd_poly1_coeffs_c[node_name].value = c1
m.pdd_poly1_coeffs_d[node_name].value = d1
m.pdd_poly2_coeffs_a[node_name].value = a2
m.pdd_poly2_coeffs_b[node_name].value = b2
m.pdd_poly2_coeffs_c[node_name].value = c2
m.pdd_poly2_coeffs_d[node_name].value = d2
else:
m.pdd_poly1_coeffs_a[node_name] = aml.Param(a1)
m.pdd_poly1_coeffs_b[node_name] = aml.Param(b1)
m.pdd_poly1_coeffs_c[node_name] = aml.Param(c1)
m.pdd_poly1_coeffs_d[node_name] = aml.Param(d1)
m.pdd_poly2_coeffs_a[node_name] = aml.Param(a2)
m.pdd_poly2_coeffs_b[node_name] = aml.Param(b2)
m.pdd_poly2_coeffs_c[node_name] = aml.Param(c2)
m.pdd_poly2_coeffs_d[node_name] = aml.Param(d2)
updater.add(node, 'minimum_pressure', pdd_poly_coeffs_param.update)
updater.add(node, 'nominal_pressure', pdd_poly_coeffs_param.update)
updater: ModelUpdater
index_over: list of str
list of junction names
"""
if not hasattr(m, 'pmin'):
m.pmin = aml.ParamDict()
if index_over is None:
index_over = wn.junction_name_list
for node_name in index_over:
node = wn.get_node(node_name)
if node_name in m.pmin:
m.pmin[node_name].value = node.minimum_pressure
else:
m.pmin[node_name] = aml.Param(node.minimum_pressure)
updater.add(node, 'minimum_pressure', pmin_param.update)
updater: ModelUpdater
index_over: list of str
list of junction names
"""
if not hasattr(m, 'elevation'):
m.elevation = aml.ParamDict()
if index_over is None:
index_over = wn.junction_name_list
for node_name in index_over:
node = wn.get_node(node_name)
if node_name in m.elevation:
m.elevation[node_name].value = node.elevation
else:
m.elevation[node_name] = aml.Param(node.elevation)
updater.add(node, 'elevation', elevation_param.update)
def source_head_param(m, wn):
    """
    Create or refresh the ``source_head`` parameters on the model.

    Tanks contribute their ``head`` attribute; reservoirs contribute their
    ``head_timeseries`` evaluated at ``wn.sim_time``.

    If ``m`` already has a ``source_head`` attribute, the existing entries
    are updated in place through ``.value``. Otherwise a new
    ``aml.ParamDict`` is attached to ``m`` and one ``aml.Param`` is created
    per tank and per reservoir.

    Parameters
    ----------
    m: wntr.aml.aml.aml.Model
    wn: wntr.network.model.WaterNetworkModel
    """
    if hasattr(m, 'source_head'):
        # Params already exist on the model: only overwrite their values.
        for tank_name, tank in wn.tanks():
            m.source_head[tank_name].value = tank.head
        for reservoir_name, reservoir in wn.reservoirs():
            current_head = reservoir.head_timeseries.at(wn.sim_time)
            m.source_head[reservoir_name].value = current_head
        return

    # First call for this model: attach the container, then add one Param
    # per tank and per reservoir.
    m.source_head = aml.ParamDict()
    for tank_name, tank in wn.tanks():
        m.source_head[tank_name] = aml.Param(tank.head)
    for reservoir_name, reservoir in wn.reservoirs():
        current_head = reservoir.head_timeseries.at(wn.sim_time)
        m.source_head[reservoir_name] = aml.Param(current_head)
index_over: list of str
list of pipe names
"""
if not hasattr(m, 'minor_loss'):
m.minor_loss = aml.ParamDict()
if index_over is None:
index_over = wn.pipe_name_list + wn.valve_name_list
for link_name in index_over:
link = wn.get_link(link_name)
value = 8.0 * link.minor_loss / (9.81 * math.pi**2 * link.diameter**4)
if link_name in m.minor_loss:
m.minor_loss[link_name].value = value
else:
m.minor_loss[link_name] = aml.Param(value)
updater.add(link, 'minor_loss', minor_loss_param.update)
updater.add(link, 'diameter', minor_loss_param.update)
node = wn.get_node(node_name)
x1 = 0.0
f1 = 0.0
x2 = x1 + m.leak_delta
f2 = node.leak_discharge_coeff*node.leak_area*(2.0*9.81*x2)**0.5
df1 = m.leak_slope
df2 = 0.5*node.leak_discharge_coeff*node.leak_area*(2.0*9.81)**0.5*x2**(-0.5)
a, b, c, d = cubic_spline(x1, x2, f1, f2, df1, df2)
if node_name in m.leak_poly_coeffs_a:
m.leak_poly_coeffs_a[node_name].value = a
m.leak_poly_coeffs_b[node_name].value = b
m.leak_poly_coeffs_c[node_name].value = c
m.leak_poly_coeffs_d[node_name].value = d
else:
m.leak_poly_coeffs_a[node_name] = aml.Param(a)
m.leak_poly_coeffs_b[node_name] = aml.Param(b)
m.leak_poly_coeffs_c[node_name] = aml.Param(c)
m.leak_poly_coeffs_d[node_name] = aml.Param(d)
updater.add(node, 'leak_discharge_coeff', leak_poly_coeffs_param.update)
updater.add(node, 'leak_area', leak_poly_coeffs_param.update)
x1 = 0.0
f1 = 0.0
x2 = x1 + m.leak_delta
f2 = node.leak_discharge_coeff*node.leak_area*(2.0*9.81*x2)**0.5
df1 = m.leak_slope
df2 = 0.5*node.leak_discharge_coeff*node.leak_area*(2.0*9.81)**0.5*x2**(-0.5)
a, b, c, d = cubic_spline(x1, x2, f1, f2, df1, df2)
if node_name in m.leak_poly_coeffs_a:
m.leak_poly_coeffs_a[node_name].value = a
m.leak_poly_coeffs_b[node_name].value = b
m.leak_poly_coeffs_c[node_name].value = c
m.leak_poly_coeffs_d[node_name].value = d
else:
m.leak_poly_coeffs_a[node_name] = aml.Param(a)
m.leak_poly_coeffs_b[node_name] = aml.Param(b)
m.leak_poly_coeffs_c[node_name] = aml.Param(c)
m.leak_poly_coeffs_d[node_name] = aml.Param(d)
updater.add(node, 'leak_discharge_coeff', leak_poly_coeffs_param.update)
updater.add(node, 'leak_area', leak_poly_coeffs_param.update)
def expected_demand_param(m, wn):
    """
    Create or refresh the ``expected_demand`` parameters on the model.

    For every junction, the value is the junction's
    ``demand_timeseries_list`` evaluated at ``wn.sim_time`` with the
    multiplier taken from ``wn.options.hydraulic.demand_multiplier``.

    If ``m`` already has an ``expected_demand`` attribute, the existing
    entries are updated in place through ``.value``. Otherwise a new
    ``aml.ParamDict`` is attached to ``m`` and one ``aml.Param`` is created
    per junction.

    Parameters
    ----------
    m: wntr.aml.aml.aml.Model
    wn: wntr.network.model.WaterNetworkModel
    """
    multiplier = wn.options.hydraulic.demand_multiplier

    if hasattr(m, 'expected_demand'):
        # Params already exist on the model: only overwrite their values.
        for junction_name, junction in wn.junctions():
            demand = junction.demand_timeseries_list.at(wn.sim_time, multiplier=multiplier)
            m.expected_demand[junction_name].value = demand
        return

    # First call for this model: attach the container, then add one Param
    # per junction.
    m.expected_demand = aml.ParamDict()
    for junction_name, junction in wn.junctions():
        demand = junction.demand_timeseries_list.at(wn.sim_time, multiplier=multiplier)
        m.expected_demand[junction_name] = aml.Param(demand)
m.pdd_poly1_coeffs_b[node_name].value = b1
m.pdd_poly1_coeffs_c[node_name].value = c1
m.pdd_poly1_coeffs_d[node_name].value = d1
m.pdd_poly2_coeffs_a[node_name].value = a2
m.pdd_poly2_coeffs_b[node_name].value = b2
m.pdd_poly2_coeffs_c[node_name].value = c2
m.pdd_poly2_coeffs_d[node_name].value = d2
else:
m.pdd_poly1_coeffs_a[node_name] = aml.Param(a1)
m.pdd_poly1_coeffs_b[node_name] = aml.Param(b1)
m.pdd_poly1_coeffs_c[node_name] = aml.Param(c1)
m.pdd_poly1_coeffs_d[node_name] = aml.Param(d1)
m.pdd_poly2_coeffs_a[node_name] = aml.Param(a2)
m.pdd_poly2_coeffs_b[node_name] = aml.Param(b2)
m.pdd_poly2_coeffs_c[node_name] = aml.Param(c2)
m.pdd_poly2_coeffs_d[node_name] = aml.Param(d2)
updater.add(node, 'minimum_pressure', pdd_poly_coeffs_param.update)
updater.add(node, 'nominal_pressure', pdd_poly_coeffs_param.update)