Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
current = line.split()
if current == []:
continue
link_name = current[1]
link = self.wn.get_link(link_name)
if current[5].upper() != 'TIME' and current[5].upper() != 'CLOCKTIME':
node_name = current[5]
current = [i.upper() for i in current]
current[1] = link_name # don't capitalize the link name
# Create the control action object
status = current[2].upper()
if status == 'OPEN' or status == 'OPENED' or status == 'CLOSED' or status == 'ACTIVE':
setting = LinkStatus[status].value
action_obj = wntr.network.ControlAction(link, 'status', setting)
else:
if isinstance(link, wntr.network.Pump):
action_obj = wntr.network.ControlAction(link, 'speed', float(current[2]))
elif isinstance(link, wntr.network.Valve):
if link.valve_type == 'PRV' or link.valve_type == 'PSV' or link.valve_type == 'PBV':
setting = to_si(self.flow_units, float(current[2]), HydParam.Pressure)
elif link.valve_type == 'FCV':
setting = to_si(self.flow_units, float(current[2]), HydParam.Flow)
elif link.valve_type == 'TCV':
setting = float(current[2])
elif link.valve_type == 'GPV':
setting = current[2]
else:
raise ValueError('Unrecognized valve type {0} while parsing control {1}'.format(link.valve_type, line))
action_obj = wntr.network.ControlAction(link, 'setting', setting)
else:
#print (link_name in wn._links.keys())
link = wn.get_link(link_name)
if type(current[2]) == str:
status = wntr.network.LinkStatus.str_to_status(current[2])
action_obj = wntr.network.ControlAction(link, 'status', status)
elif type(current[2]) == float or type(current[2]) == int:
if isinstance(link, wntr.network.Pump):
logger.warning('Currently, pump speed settings are only supported in the EpanetSimulator.')
continue
elif isinstance(link, wntr.network.Valve):
if link.valve_type != 'PRV':
logger.warning('Currently, valves of type '+link.valve_type+' are only supported in the EpanetSimulator.')
continue
else:
status = convert('Pressure', inp_units, float(current[2]))
action_obj = wntr.network.ControlAction(link, 'setting', status)
# Create the control object
if 'TIME' not in current and 'CLOCKTIME' not in current:
current[5] = current_copy[5]
if 'IF' in current:
node_name = current[5]
node = wn.get_node(node_name)
if current[6]=='ABOVE':
oper = np.greater
elif current[6]=='BELOW':
oper = np.less
else:
raise RuntimeError("The following control is not recognized: " + line)
### OKAY - we are adding in the elevation. This is A PROBLEM IN THE INP WRITER. Now that we know, we can fix it, but if this changes, it will affect multiple pieces, just an FYI.
if isinstance(node, wntr.network.Junction):
threshold = convert('Pressure',inp_units,float(current[7]))+node.elevation
action_obj = wntr.network.ControlAction(link, 'status', setting)
else:
if isinstance(link, wntr.network.Pump):
action_obj = wntr.network.ControlAction(link, 'speed', float(current[2]))
elif isinstance(link, wntr.network.Valve):
if link.valve_type == 'PRV' or link.valve_type == 'PSV' or link.valve_type == 'PBV':
setting = to_si(self.flow_units, float(current[2]), HydParam.Pressure)
elif link.valve_type == 'FCV':
setting = to_si(self.flow_units, float(current[2]), HydParam.Flow)
elif link.valve_type == 'TCV':
setting = float(current[2])
elif link.valve_type == 'GPV':
setting = current[2]
else:
raise ValueError('Unrecognized valve type {0} while parsing control {1}'.format(link.valve_type, line))
action_obj = wntr.network.ControlAction(link, 'setting', setting)
else:
raise RuntimeError(('Links of type {0} can only have controls that change\n'.format(type(link))+
'the link status. Control: {0}'.format(line)))
# Create the control object
control_name = ''
if 'TIME' not in current and 'CLOCKTIME' not in current:
if 'IF' in current:
node = self.wn.get_node(node_name)
if current[6] == 'ABOVE':
oper = np.greater
elif current[6] == 'BELOW':
oper = np.less
else:
raise RuntimeError("The following control is not recognized: " + line)
# OKAY - we are adding in the elevation. This is A PROBLEM
#control._priority = 2
#self.add_control(control)
# Now take care of the max level
max_head = tank.max_level+tank.elevation
for link_name in all_links:
link = self.get_link(link_name)
if isinstance(link, Pipe):
if link.cv:
if link.start_node()==tank_name:
continue
if isinstance(link, Pump):
if link.start_node()==tank_name:
continue
close_control_action = wntr.network.ControlAction(link, 'status', LinkStatus.closed)
open_control_action = wntr.network.ControlAction(link, 'status', LinkStatus.opened)
control = wntr.network.MultiConditionalControl([(tank,'head'),(tank,'prev_head')],[np.less,np.greater_equal],[max_head-self._Htol,max_head-self._Htol],open_control_action)
control._partial_step_for_tanks = False
control._priority = 0
control.name = tank.name()+link_name+'opened because head is less than max head'
tank_controls.append(control)
control = wntr.network.ConditionalControl((tank,'head'),np.greater_equal,max_head,close_control_action)
control._priority = 1
control.name = tank.name()+link_name+' closed because head is greater than max head'
tank_controls.append(control)
if link.start_node() == tank_name:
other_node_name = link.end_node()
else:
is None, it is assumed that an external control will be
used to end the leak (otherwise, the leak will not end).
"""
self._leak = True
self.leak_area = area
self.leak_discharge_coeff = discharge_coeff
if start_time is not None:
start_control_action = wntr.network.ControlAction(self, 'leak_status', True)
control = wntr.network.TimeControl(wn, start_time, 'SIM_TIME', False, start_control_action)
wn.add_control(self._leak_start_control_name, control)
if end_time is not None:
end_control_action = wntr.network.ControlAction(self, 'leak_status', False)
control = wntr.network.TimeControl(wn, end_time, 'SIM_TIME', False, end_control_action)
wn.add_control(self._leak_end_control_name, control)
def add_pump_outage(self, pump_name, start_time, end_time):
pump = self.get_link(pump_name)
opened_action_obj = wntr.network.ControlAction(pump, 'status', LinkStatus.opened)
closed_action_obj = wntr.network.ControlAction(pump, 'status', LinkStatus.closed)
control = wntr.network.TimeControl(self, end_time, 'SIM_TIME', False, opened_action_obj)
control._priority = 0
self.add_control(pump_name+'PowerOn'+str(end_time),control)
control = wntr.network.TimeControl(self, start_time, 'SIM_TIME', False, closed_action_obj)
control._priority = 3
self.add_control(pump_name+'PowerOff'+str(start_time),control)
make sure all user-defined controls for ending the leak have
been removed before using this method (see
WaterNetworkModel.remove_leak() or
WaterNetworkModel.discard_leak()).
Parameters
----------
wn: WaterNetworkModel object
t: int
end time in seconds
"""
# remove old control
wn.discard_control(self._leak_end_control_name)
# add new control
end_control_action = wntr.network.ControlAction(self, 'leak_status', False)
control = wntr.network.TimeControl(wn, t, 'SIM_TIME', False, end_control_action)
wn.add_control(self._leak_end_control_name, control)
def _get_valve_controls(self):
valve_controls = []
for valve_name, valve in self.valves():
close_control_action = wntr.network.ControlAction(valve, '_status', LinkStatus.closed)
open_control_action = wntr.network.ControlAction(valve, '_status', LinkStatus.opened)
active_control_action = wntr.network.ControlAction(valve, '_status', LinkStatus.active)
control = wntr.network._PRVControl(self, valve, self._Htol, self._Qtol, close_control_action, open_control_action, active_control_action)
control.name = valve.name()+' prv control'
valve_controls.append(control)
return valve_controls
def _get_cv_controls(self):
cv_controls = []
for pipe_name in self._check_valves:
pipe = self.get_link(pipe_name)
close_control_action = wntr.network.ControlAction(pipe, 'status', LinkStatus.closed)
open_control_action = wntr.network.ControlAction(pipe, 'status', LinkStatus.opened)
control = wntr.network._CheckValveHeadControl(self, pipe, np.greater, self._Htol, open_control_action)
control._priority = 0
control.name = pipe.name()+'opened because of cv head control'
cv_controls.append(control)
control = wntr.network._CheckValveHeadControl(self, pipe, np.less, -self._Htol, close_control_action)
control._priority = 3
control.name = pipe.name()+' closed because of cv head control'
cv_controls.append(control)
control = wntr.network.ConditionalControl((pipe,'flow'),np.less, -self._Qtol, close_control_action)
control._priority = 3
control.name = pipe.name()+' closed because negative flow in cv'
cv_controls.append(control)