Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if current[6] == 'ABOVE':
oper = np.greater
elif current[6] == 'BELOW':
oper = np.less
else:
raise RuntimeError("The following control is not recognized: " + line)
# OKAY - we are adding in the elevation. This is A PROBLEM
# IN THE INP WRITER. Now that we know, we can fix it, but
# if this changes, it will affect multiple pieces, just an
# FYI.
if isinstance(node, wntr.network.Junction):
threshold = to_si(self.flow_units,
float(current[7]), HydParam.Pressure)# + node.elevation
control_obj = wntr.network.ConditionalControl((node, 'pressure'), oper, threshold, action_obj)
elif isinstance(node, wntr.network.Tank):
threshold = to_si(self.flow_units,
float(current[7]), HydParam.HydraulicHead)# + node.elevation
control_obj = wntr.network.ConditionalControl((node, 'level'), oper, threshold, action_obj)
else:
raise RuntimeError("The following control is not recognized: " + line)
control_name = ''
for i in range(len(current)-1):
control_name = control_name + '/' + current[i]
control_name = control_name + '/' + str(round(threshold, 2))
else:
if len(current) == 6: # at time
if ':' in current[5]:
run_at_time = int(_str_time_to_sec(current[5]))
else:
run_at_time = int(float(current[5])*3600)
control_obj = wntr.network.TimeControl(self.wn, run_at_time, 'SIM_TIME', False, action_obj)
control_name = ''
else:
if len(current) != 7:
raise RuntimeError('The [VALVES] section of an INP file must have 6 or 7 entries.')
valve_type = current[4].upper()
if valve_type in ['PRV', 'PSV', 'PBV']:
valve_set = to_si(self.flow_units, float(current[5]), HydParam.Pressure)
elif valve_type == 'FCV':
valve_set = to_si(self.flow_units, float(current[5]), HydParam.Flow)
elif valve_type == 'TCV':
valve_set = float(current[5])
elif valve_type == 'GPV':
curve_name = current[5]
curve_points = []
for point in self.curves[curve_name]:
x = to_si(self.flow_units, point[0], HydParam.Flow)
y = to_si(self.flow_units, point[1], HydParam.HeadLoss)
curve_points.append((x, y))
self.wn.add_curve(curve_name, 'HEADLOSS', curve_points)
valve_set = curve_name
else:
raise RuntimeError('VALVE type "%s" unrecognized' % valve_type)
self.wn.add_valve(current[0],
current[1],
current[2],
to_si(self.flow_units, float(current[3]), HydParam.PipeDiameter),
current[4].upper(),
float(current[6]),
valve_set)
current[1] = link_name # don't capitalize the link name
# Create the control action object
status = current[2].upper()
if status == 'OPEN' or status == 'OPENED' or status == 'CLOSED' or status == 'ACTIVE':
setting = LinkStatus[status].value
action_obj = wntr.network.ControlAction(link, 'status', setting)
else:
if isinstance(link, wntr.network.Pump):
action_obj = wntr.network.ControlAction(link, 'speed', float(current[2]))
elif isinstance(link, wntr.network.Valve):
if link.valve_type == 'PRV' or link.valve_type == 'PSV' or link.valve_type == 'PBV':
setting = to_si(self.flow_units, float(current[2]), HydParam.Pressure)
elif link.valve_type == 'FCV':
setting = to_si(self.flow_units, float(current[2]), HydParam.Flow)
elif link.valve_type == 'TCV':
setting = float(current[2])
elif link.valve_type == 'GPV':
setting = current[2]
else:
raise ValueError('Unrecognized valve type {0} while parsing control {1}'.format(link.valve_type, line))
action_obj = wntr.network.ControlAction(link, 'setting', setting)
else:
raise RuntimeError(('Links of type {0} can only have controls that change\n'.format(type(link))+
'the link status. Control: {0}'.format(line)))
# Create the control object
control_name = ''
if 'TIME' not in current and 'CLOCKTIME' not in current:
if 'IF' in current:
node = self.wn.get_node(node_name)
def _write_energy(self, f, wn):
    """Write the ``[ENERGY]`` section of ``wn`` to ``f``.

    Every line is ASCII-encoded before it is written, so ``f`` must accept
    bytes. Global options whose value is ``None`` are omitted. Per-pump
    entries are emitted in sorted pump-name order, and each of a pump's
    efficiency, price and pattern lines is written only when the
    corresponding attribute is not ``None``.

    Parameters
    ----------
    f : binary writable stream
        Destination; only its ``write`` method is used.
    wn : network model
        Read through ``wn.options.energy`` and ``wn._pumps``.
    """
    energy = wn.options.energy
    f.write('[ENERGY]\n'.encode('ascii'))

    if energy.global_price is not None:
        global_price = to_si(self.flow_units, energy.global_price, HydParam.Energy)
        f.write('GLOBAL PRICE {:.4f}\n'.format(global_price).encode('ascii'))
    if energy.global_pattern is not None:
        f.write('GLOBAL PATTERN {:s}\n'.format(energy.global_pattern).encode('ascii'))
    if energy.global_efficiency is not None:
        f.write('GLOBAL EFFIC {:.4f}\n'.format(energy.global_efficiency).encode('ascii'))
    if energy.demand_charge is not None:
        f.write('DEMAND CHARGE {:.4f}\n'.format(energy.demand_charge).encode('ascii'))

    # Sorted order keeps the written file deterministic across runs.
    for pump_name in sorted(wn._pumps.keys()):
        pump = wn._pumps[pump_name]
        if pump.efficiency is not None:
            f.write('PUMP {:10s} EFFIC {:s}\n'.format(pump_name, pump.efficiency.name).encode('ascii'))
        if pump.energy_price is not None:
            # The converted price is numeric: it must use a float format
            # (as GLOBAL PRICE does); '{:s}' raises ValueError for a float.
            pump_price = to_si(self.flow_units, pump.energy_price, HydParam.Energy)
            f.write('PUMP {:10s} PRICE {:.4f}\n'.format(pump_name, pump_price).encode('ascii'))
        if pump.energy_pattern is not None:
            f.write('PUMP {:10s} PATTERN {:s}\n'.format(pump_name, pump.energy_pattern).encode('ascii'))
valve_set = float(current[5])
elif valve_type == 'GPV':
curve_name = current[5]
curve_points = []
for point in self.curves[curve_name]:
x = to_si(self.flow_units, point[0], HydParam.Flow)
y = to_si(self.flow_units, point[1], HydParam.HeadLoss)
curve_points.append((x, y))
self.wn.add_curve(curve_name, 'HEADLOSS', curve_points)
valve_set = curve_name
else:
raise RuntimeError('VALVE type "%s" unrecognized' % valve_type)
self.wn.add_valve(current[0],
current[1],
current[2],
to_si(self.flow_units, float(current[3]), HydParam.PipeDiameter),
current[4].upper(),
float(current[6]),
valve_set)
if attr.lower() in ['demand']:
value = to_si(self.inp_units, value, HydParam.Demand)
elif attr.lower() in ['head']:
value = to_si(self.inp_units, value, HydParam.HydraulicHead)
elif attr.lower() in ['level']:
value = to_si(self.inp_units, value, HydParam.HydraulicHead)
elif attr.lower() in ['flow']:
value = to_si(self.inp_units, value, HydParam.Flow)
elif attr.lower() in ['pressure']:
value = to_si(self.inp_units, value, HydParam.Pressure)
elif attr.lower() in ['setting']:
link = model.get_link(words[2])
if link.valve_type.upper() in ['PRV', 'PBV', 'PSV']:
value = to_si(self.inp_units, value, HydParam.Pressure)
elif link.valve_type.upper() in ['FCV']:
value = to_si(self.inp_units, value, HydParam.Flow)
if words[1].upper() in ['NODE', 'JUNCTION', 'RESERVOIR', 'TANK']:
condition = ValueCondition(model.get_node(words[2]), words[3].lower(), words[4].lower(), value)
elif words[1].upper() in ['LINK', 'PIPE', 'PUMP', 'VALVE']:
condition = ValueCondition(model.get_link(words[2]), words[3].lower(), words[4].lower(), value)
else:
### FIXME: raise error
pass
if words[0].upper() == 'IF':
condition_list.append(condition)
elif words[0].upper() == 'AND':
condition_list.append(condition)
elif words[0].upper() == 'OR':
if len(condition_list) > 0:
other = condition_list[-1]
condition_list.remove(other)
else:
if current[7].upper() == 'CV':
self.wn.add_pipe(current[0],
current[1],
current[2],
to_si(self.flow_units, float(current[3]), HydParam.Length),
to_si(self.flow_units, float(current[4]), HydParam.PipeDiameter),
float(current[5]),
float(current[6]),
LinkStatus.Open,
True)
else:
self.wn.add_pipe(current[0],
current[1],
current[2],
to_si(self.flow_units, float(current[3]), HydParam.Length),
to_si(self.flow_units, float(current[4]), HydParam.PipeDiameter),
float(current[5]),
float(current[6]),
LinkStatus[current[7].upper()])
continue
# assert len(current) == 3, ('INP file option in [REACTIONS] block '
# 'not recognized: ' + line)
key1 = current[0].upper()
key2 = current[1].upper()
val3 = float(current[2])
if key1 == 'ORDER':
if key2 == 'BULK':
self.wn.options.quality.bulk_rxn_order = int(float(current[2]))
elif key2 == 'WALL':
self.wn.options.quality.wall_rxn_order = int(float(current[2]))
elif key2 == 'TANK':
self.wn.options.quality.tank_rxn_order = int(float(current[2]))
elif key1 == 'GLOBAL':
if key2 == 'BULK':
self.wn.options.quality.bulk_rxn_coeff = to_si(self.flow_units, val3, BulkReactionCoeff,
mass_units=self.mass_units,
reaction_order=self.wn.options.quality.bulk_rxn_order)
elif key2 == 'WALL':
self.wn.options.quality.wall_rxn_coeff = to_si(self.flow_units, val3, WallReactionCoeff,
mass_units=self.mass_units,
reaction_order=self.wn.options.quality.wall_rxn_order)
elif key1 == 'BULK':
pipe = self.wn.get_link(current[1])
pipe.bulk_rxn_coeff = to_si(self.flow_units, val3, BulkReactionCoeff,
mass_units=self.mass_units,
reaction_order=self.wn.options.quality.bulk_rxn_order)
elif key1 == 'WALL':
pipe = self.wn.get_link(current[1])
pipe.wall_rxn_coeff = to_si(self.flow_units, val3, WallReactionCoeff,
mass_units=self.mass_units,
reaction_order=self.wn.options.quality.wall_rxn_order)
def create_curve(curve_name):
    """Return the curve called ``curve_name`` from ``self.wn``.

    If the name is not yet in ``self.wn.curve_name_list``, the raw points
    stored in ``self.curves[curve_name]`` are converted with ``to_si`` and
    registered as a 'HEAD' curve first. ``self`` is not a parameter here;
    it is resolved from the enclosing scope.
    """
    if curve_name not in self.wn.curve_name_list:
        # First component converts as Flow, second as HydraulicHead.
        converted = [
            (to_si(self.flow_units, raw[0], HydParam.Flow),
             to_si(self.flow_units, raw[1], HydParam.HydraulicHead))
            for raw in self.curves[curve_name]
        ]
        self.wn.add_curve(curve_name, 'HEAD', converted)
    return self.wn.get_curve(curve_name)
for point in self.curves[curve_name]:
x = to_si(self.flow_units, point[0], HydParam.Length)
y = to_si(self.flow_units, point[1], HydParam.Volume)
curve_points.append((x, y))
self.wn.add_curve(curve_name, 'VOLUME', curve_points)
curve = self.wn.get_curve(curve_name)
elif len(current) == 7:
curve = None
else:
raise RuntimeError('Tank entry format not recognized.')
self.wn.add_tank(current[0],
to_si(self.flow_units, float(current[1]), HydParam.Elevation),
to_si(self.flow_units, float(current[2]), HydParam.Length),
to_si(self.flow_units, float(current[3]), HydParam.Length),
to_si(self.flow_units, float(current[4]), HydParam.Length),
to_si(self.flow_units, float(current[5]), HydParam.TankDiameter),
to_si(self.flow_units, float(current[6]), HydParam.Volume),
curve)