Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_pump_controls(self):
    """
    Build the check-valve style controls for every pump in the model.

    For each pump yielded by ``self.links(Pump)`` three controls are
    created, in this order:

    1. a ``_CheckValveHeadControl`` using ``np.greater`` and ``self._Htol``
       that applies the "open" action (priority 0),
    2. a ``_CheckValveHeadControl`` using ``np.less`` and ``-self._Htol``
       that applies the "close" action (priority 3),
    3. a ``ConditionalControl`` on the pump's ``flow`` using ``np.less``
       and ``-self._Qtol`` that applies the "close" action (priority 3).

    Both actions write the pump's ``_cv_status`` attribute.

    Returns
    -------
    pump_controls : list
        The controls created, three per pump, in creation order.
    """
    pump_controls = []
    for pump_name, pump in self.links(Pump):
        close_control_action = ControlAction(pump, '_cv_status', LinkStatus.closed)
        open_control_action = ControlAction(pump, '_cv_status', LinkStatus.opened)

        # Head-based opening control.
        control = _CheckValveHeadControl(self, pump, np.greater, self._Htol, open_control_action)
        control._priority = 0
        control.name = pump.name+' opened because of cv head control'
        pump_controls.append(control)

        # Head-based closing control.
        control = _CheckValveHeadControl(self, pump, np.less, -self._Htol, close_control_action)
        control._priority = 3
        control.name = pump.name+' closed because of cv head control'
        pump_controls.append(control)

        # Flow-based closing control (flow below -Qtol).
        control = ConditionalControl((pump,'flow'),np.less, -self._Qtol, close_control_action)
        control._priority = 3
        control.name = pump.name+' closed because negative flow in pump'
        pump_controls.append(control)

    # The list was previously built but never handed back to the caller.
    return pump_controls
def _get_cv_controls(self):
    """
    Build the controls for every pipe listed in ``self._check_valves``.

    For each pipe three controls are created, in this order:

    1. a ``_CheckValveHeadControl`` using ``np.greater`` and ``self._Htol``
       that applies the "open" action (priority 0),
    2. a ``_CheckValveHeadControl`` using ``np.less`` and ``-self._Htol``
       that applies the "close" action (priority 3),
    3. a ``ConditionalControl`` on the pipe's ``flow`` using ``np.less``
       and ``-self._Qtol`` that applies the "close" action (priority 3).

    Both actions write the pipe's ``status`` attribute.

    Returns
    -------
    cv_controls : list
        The controls created, three per check-valve pipe, in creation order.
    """
    cv_controls = []
    for pipe_name in self._check_valves:
        pipe = self.get_link(pipe_name)

        close_control_action = ControlAction(pipe, 'status', LinkStatus.closed)
        open_control_action = ControlAction(pipe, 'status', LinkStatus.opened)

        # Head-based opening control.
        control = _CheckValveHeadControl(self, pipe, np.greater, self._Htol, open_control_action)
        control._priority = 0
        # Leading space added so the label reads "<pipe> opened ..." like
        # the other control names built here.
        control.name = pipe.name+' opened because of cv head control'
        cv_controls.append(control)

        # Head-based closing control.
        control = _CheckValveHeadControl(self, pipe, np.less, -self._Htol, close_control_action)
        control._priority = 3
        control.name = pipe.name+' closed because of cv head control'
        cv_controls.append(control)

        # Flow-based closing control (flow below -Qtol).
        control = ConditionalControl((pipe,'flow'),np.less, -self._Qtol, close_control_action)
        control._priority = 3
        control.name = pipe.name+' closed because negative flow in cv'
        cv_controls.append(control)

    # The list was previously built but never handed back to the caller.
    return cv_controls
# Excerpt: registers four controls describing a power outage on one pump.
# The enclosing definition (which supplies pump_name, start_time and end_time)
# is not visible in this excerpt.
pump = self.get_link(pump_name)
# Actions that clear / set the pump's `_power_outage` flag.
end_power_outage_action = ControlAction(pump, '_power_outage', False)
start_power_outage_action = ControlAction(pump, '_power_outage', True)
# Time control built from end_time that carries the "outage ended" action.
control = TimeControl(self, end_time, 'SIM_TIME', False, end_power_outage_action)
control._priority = 0
self.add_control(pump_name+'PowerOn'+str(end_time),control)
# Time control built from start_time that carries the "outage started" action.
control = TimeControl(self, start_time, 'SIM_TIME', False, start_power_outage_action)
control._priority = 3
self.add_control(pump_name+'PowerOff'+str(start_time),control)
# Actions that write the pump's `status` attribute.
opened_action_obj = ControlAction(pump, 'status', LinkStatus.opened)
closed_action_obj = ControlAction(pump, 'status', LinkStatus.closed)
# Close action conditioned on `_power_outage` compared (np.equal) with True.
control = _MultiConditionalControl([(pump,'_power_outage')],[np.equal],[True], closed_action_obj)
control._priority = 3
# NOTE(review): this name is suffixed with end_time while 'PowerOff' above uses
# start_time (and 'PowerOnStatus' below uses start_time) - confirm the suffixes
# are intentional.
self.add_control(pump_name+'PowerOffStatus'+str(end_time),control)
# Open action conditioned on `_prev_power_outage` == True and `_power_outage` == False.
control = _MultiConditionalControl([(pump,'_prev_power_outage'),(pump,'_power_outage')],[np.equal,np.equal],[True,False],opened_action_obj)
control._priority = 0
self.add_control(pump_name+'PowerOnStatus'+str(start_time),control)
def _get_pump_controls(self):
    """
    Collect pump-related controls for the model.

    First, every action of every control in ``self.controls()`` is
    inspected; for each action that targets ``base_speed`` a companion
    control of the same type, with the same condition and priority, is
    created whose action sets the target's ``status`` to
    ``LinkStatus.Open``.

    Second, for each pump in ``self.pumps()`` internal open/close actions
    and the matching open/close conditions for the pump's ``pump_type``
    ('HEAD' or 'POWER') are prepared.

    NOTE(review): the visible excerpt stops after the conditions are
    selected; the code that consumes them and any return statement are
    not shown here.

    Raises
    ------
    ValueError
        If a ``base_speed`` action targets something that is not a Pump,
        or if a pump has an unrecognized ``pump_type``.
    """
    pump_controls = []

    # Pass 1: companion "open" controls for base_speed changes.
    for control_name, control in self.controls():
        for action in control.actions():
            target_obj, target_attr = action.target()
            if target_attr != 'base_speed':
                continue
            if not isinstance(target_obj, Pump):
                raise ValueError('base_speed can only be changed on pumps; ' + str(control))
            status_action = ControlAction(target_obj, 'status', LinkStatus.Open)
            companion = type(control)(control.condition, status_action, priority=control.priority)
            pump_controls.append(companion)

    # Pass 2: per-pump internal actions and conditions.
    # Local names are kept as-is because the part of the loop body that
    # follows this excerpt presumably refers to them.
    for pump_name, pump in self.pumps():
        close_control_action = _InternalControlAction(pump, '_internal_status', LinkStatus.Closed, 'status')
        open_control_action = _InternalControlAction(pump, '_internal_status', LinkStatus.Open, 'status')

        if pump.pump_type == 'HEAD':
            close_condition = _CloseHeadPumpCondition(self, pump)
            open_condition = _OpenHeadPumpCondition(self, pump)
        elif pump.pump_type == 'POWER':
            close_condition = _ClosePowerPumpCondition(self, pump)
            open_condition = _OpenPowerPumpCondition(self, pump)
        else:
            raise ValueError('Unrecognized pump pump_type: {0}'.format(pump.pump_type))
# Excerpt: builds max-head controls on the links in all_links for one tank.
# The enclosing definition (which supplies all_links, tank, tank_name,
# max_head and tank_controls) is not visible, and the original indentation
# has been lost, so the nesting described below is inferred - TODO confirm.
for link_name in all_links:
link = self.get_link(link_name)
# link_has_cv is tested further down to decide whether an "open" control is added.
link_has_cv = False
# Pipes with `cv` set: the visible code `continue`s when the pipe starts at
# this tank, otherwise it flags the link as having a check valve.
if isinstance(link, Pipe):
if link.cv:
if link.start_node==tank_name:
continue
else:
link_has_cv = True
# Pumps get the same skip-or-flag handling as cv pipes.
if isinstance(link, Pump):
if link.start_node==tank_name:
continue
else:
link_has_cv = True
# Actions that write the link's `status` attribute.
close_control_action = ControlAction(link, 'status', LinkStatus.closed)
open_control_action = ControlAction(link, 'status', LinkStatus.opened)
# Close action conditioned on tank head >= max_head (np.greater_equal).
control = ConditionalControl((tank,'head'),np.greater_equal,max_head,close_control_action)
control._priority = 1
control.name = link_name+' closed because tank '+tank.name+' head is greater than max head'
tank_controls.append(control)
if not link_has_cv:
# Open action conditioned on: head < max_head - Htol, _prev_head >= max_head - Htol,
# and sim_time > 0.0.
control = _MultiConditionalControl([(tank,'head'),(tank,'_prev_head'),(self,'sim_time')],[np.less,np.greater_equal,np.greater],[max_head-self._Htol,max_head-self._Htol,0.0],open_control_action)
control._partial_step_for_tanks = False
control._priority = 0
# NOTE(review): unlike the ' closed because tank ' name above, this string has
# no space between link_name and 'opened' - confirm whether that is intended.
control.name = link_name+'opened because tank '+tank.name+' head is less than max head'
tank_controls.append(control)
# Pick the node at the other end of the link from the tank; the rest of this
# branch is cut off in the excerpt.
if link.start_node == tank_name:
other_node_name = link.end_node
def __init__(self, target_obj, attribute, value):
    """
    Record the target object, attribute name and value for this action.

    Parameters
    ----------
    target_obj : object
        Object the action refers to. Must not be None.
    attribute : str
        Name of an attribute that already exists on ``target_obj``.
    value : any
        Value stored for the action.

    Raises
    ------
    ValueError
        If ``target_obj`` is None, or if ``target_obj`` has no attribute
        named ``attribute``.
    """
    super(ControlAction, self).__init__()

    # Validate the target before storing anything on the instance.
    if target_obj is None:
        raise ValueError('target_obj is None in ControlAction::__init__. A valid target_obj is needed.')

    attribute_exists = hasattr(target_obj, attribute)
    if not attribute_exists:
        raise ValueError('attribute given in ControlAction::__init__ is not valid for target_obj')

    self._target_obj, self._attribute, self._value = target_obj, attribute, value
# Excerpt: builds the controls for one valve and returns the accumulated list.
# The statement that binds `valve` and the creation of `valve_controls` are
# not visible in this excerpt.
# Every valve gets a "new setting" control.
control = _ValveNewSettingControl(self, valve)
control.name = valve.name + ' new setting for valve control'
valve_controls.append(control)
# PRV valves additionally get a _PRVControl built from closed/opened/active
# actions on the valve's `_status`, plus self._Htol and self._Qtol.
if valve.valve_type == 'PRV':
close_control_action = ControlAction(valve, '_status', LinkStatus.Closed)
open_control_action = ControlAction(valve, '_status', LinkStatus.Opened)
active_control_action = ControlAction(valve, '_status', LinkStatus.Active)
control = _PRVControl(self, valve, self._Htol, self._Qtol, close_control_action, open_control_action, active_control_action)
control.name = valve.name+' prv control'
valve_controls.append(control)
elif valve.valve_type == 'FCV':
# FCV valves get an _FCVControl built from opened/active actions and self._Htol.
open_control_action = ControlAction(valve, '_status', LinkStatus.Opened)
active_control_action = ControlAction(valve, '_status', LinkStatus.Active)
control = _FCVControl(self, valve, self._Htol, open_control_action,
active_control_action)
control.name = valve.name + ' FCV control'
valve_controls.append(control)
# Other valve types only receive the "new setting" control above.
return valve_controls
def _get_valve_controls(self):
    """
    Build the controls for every valve in the model.

    Each valve yielded by ``self.links(Valve)`` receives a
    ``_ValveNewSettingControl``. In addition:

    * 'PRV' valves receive a ``_PRVControl`` built from closed, opened
      and active actions on the valve's ``_status``;
    * 'FCV' valves receive a ``_FCVControl`` built from opened and
      active actions on the valve's ``_status``.

    Valves of any other type only get the new-setting control.

    Returns
    -------
    valve_controls : list
        The controls created, in creation order.
    """
    valve_controls = []

    for _name, valve in self.links(Valve):
        # Control present for every valve, regardless of type.
        setting_control = _ValveNewSettingControl(self, valve)
        setting_control.name = valve.name + ' new setting for valve control'
        valve_controls.append(setting_control)

        if valve.valve_type == 'PRV':
            to_closed = ControlAction(valve, '_status', LinkStatus.Closed)
            to_opened = ControlAction(valve, '_status', LinkStatus.Opened)
            to_active = ControlAction(valve, '_status', LinkStatus.Active)
            prv_control = _PRVControl(
                self, valve, self._Htol, self._Qtol,
                to_closed, to_opened, to_active,
            )
            prv_control.name = valve.name+' prv control'
            valve_controls.append(prv_control)
        elif valve.valve_type == 'FCV':
            to_opened = ControlAction(valve, '_status', LinkStatus.Opened)
            to_active = ControlAction(valve, '_status', LinkStatus.Active)
            fcv_control = _FCVControl(self, valve, self._Htol, to_opened, to_active)
            fcv_control.name = valve.name + ' FCV control'
            valve_controls.append(fcv_control)

    return valve_controls
None, it is assumed that an external control will be used
to start the leak (otherwise, the leak will not start).
end_time : int
Time at which the leak is fixed in seconds. If the end_time
is None, it is assumed that an external control will be
used to end the leak (otherwise, the leak will not end).
"""
# Excerpt: body of a method whose signature is not visible here; it supplies
# wn, area, discharge_coeff, start_time and end_time.
# Imported locally rather than at module level (reason not visible in this excerpt).
from wntr.network.controls import ControlAction, Control
# Mark this object as having a leak and record the leak parameters.
self._leak = True
self.leak_area = area
self.leak_discharge_coeff = discharge_coeff
# Only when a start time is given: register a time control on wn that sets
# leak_status to True.
if start_time is not None:
start_control_action = ControlAction(self, 'leak_status', True)
control = Control._time_control(wn, start_time, 'SIM_TIME', False, start_control_action)
wn.add_control(self._leak_start_control_name, control)
# Only when an end time is given: register a time control on wn that sets
# leak_status to False.
if end_time is not None:
end_control_action = ControlAction(self, 'leak_status', False)
control = Control._time_control(wn, end_time, 'SIM_TIME', False, end_control_action)
wn.add_control(self._leak_end_control_name, control)
been removed before using this method (see
WaterNetworkModel.remove_leak() or
WaterNetworkModel.discard_leak()).
Parameters
----------
wn: wntr WaterNetworkModel
Water network model
t: int
end time in seconds
"""
# Excerpt: body of a method whose signature is not visible here; it supplies wn and t.
# Discard whatever control is currently registered under the leak-end name.
wn._discard_control(self._leak_end_control_name)
# Register a new time control, built from t, that sets leak_status to False,
# under the same name.
end_control_action = ControlAction(self, 'leak_status', False)
control = TimeControl(wn, t, 'SIM_TIME', False, end_control_action)
wn.add_control(self._leak_end_control_name, control)