Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_action_on_true(self, action, prefix=' THEN'):
"""Add a "then" action from an IfThenElseControl"""
if isinstance(action, ControlAction):
fmt = '{} {} {} {} = {}'
attr = action._attribute
val_si = action._repr_value()
if attr.lower() in ['demand']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Demand))
elif attr.lower() in ['head', 'level']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.HydraulicHead))
elif attr.lower() in ['flow']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Flow))
elif attr.lower() in ['pressure']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Pressure))
elif attr.lower() in ['setting']:
if isinstance(action._target_obj_ref, Valve):
if action._target_obj_ref.valve_type.upper() in ['PRV', 'PBV', 'PSV']:
value = from_si(self.inp_units, val_si, HydParam.Pressure)
elif action._target_obj_ref.valve_type.upper() in ['FCV']:
value = from_si(self.inp_units, val_si, HydParam.Flow)
else:
value = val_si
else:
value = val_si
value = '{:.6g}'.format(value)
else: # status
value = val_si
clause = fmt.format(prefix, action._target_obj_ref.__class__.__name__,
action._target_obj_ref.name, action._attribute,
value)
def _write_valves(self, f, wn):
f.write('[VALVES]\n'.encode('ascii'))
f.write(_VALVE_LABEL.format(';ID', 'Node1', 'Node2', 'Diameter', 'Type', 'Setting', 'Minor Loss').encode('ascii'))
lnames = list(wn._valves.keys())
lnames.sort()
for valve_name in lnames:
valve = wn._valves[valve_name]
E = {'name': valve_name,
'node1': valve.start_node,
'node2': valve.end_node,
'diam': from_si(self.flow_units, valve.diameter, HydParam.PipeDiameter),
'vtype': valve.valve_type,
'set': valve._base_setting,
'mloss': valve.minor_loss,
'com': ';'}
valve_type = valve.valve_type
if valve_type in ['PRV', 'PSV', 'PBV']:
valve_set = from_si(self.flow_units, valve._base_setting, HydParam.Pressure)
elif valve_type == 'FCV':
valve_set = from_si(self.flow_units, valve._base_setting, HydParam.Flow)
elif valve_type == 'TCV':
valve_set = valve._base_setting
elif valve_type == 'GPV':
valve_set = valve._base_setting
E['set'] = valve_set
f.write(_VALVE_ENTRY.format(**E).encode('ascii'))
f.write('\n'.encode('ascii'))
attr = condition._source_attr
val_si = condition._repr_value(attr, condition._threshold)
if attr.lower() in ['demand']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Demand))
elif attr.lower() in ['head', 'level']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.HydraulicHead))
elif attr.lower() in ['flow']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Flow))
elif attr.lower() in ['pressure']:
value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Pressure))
elif attr.lower() in ['setting']:
if isinstance(condition._source_obj, Valve):
if condition._source_obj.valve_type.upper() in ['PRV', 'PBV', 'PSV']:
value = from_si(self.inp_units, val_si, HydParam.Pressure)
elif condition._source_obj.valve_type.upper() in ['FCV']:
value = from_si(self.inp_units, val_si, HydParam.Flow)
else:
value = val_si
else:
value = val_si
value = '{:.6g}'.format(value)
else: # status
value = val_si
clause = fmt.format(prefix, condition._source_obj.__class__.__name__,
condition._source_obj.name, condition._source_attr,
condition._relation.symbol, value)
self.add_if(clause)
else:
raise ValueError('Unknown ControlCondition for EPANET Rules')
from_si(self.flow_units,
tank.bulk_rxn_coeff,
QualParam.BulkReactionCoeff,
mass_units=self.mass_units,
reaction_order=wn.options.quality.bulk_rxn_order)).encode('ascii'))
for pipe_name, pipe in wn.links(Pipe):
if pipe.bulk_rxn_coeff is not None:
f.write(entry_float.format('BULK',pipe_name,
from_si(self.flow_units,
pipe.bulk_rxn_coeff,
QualParam.BulkReactionCoeff,
mass_units=self.mass_units,
reaction_order=wn.options.quality.bulk_rxn_order)).encode('ascii'))
if pipe.wall_rxn_coeff is not None:
f.write(entry_float.format('WALL',pipe_name,
from_si(self.flow_units,
pipe.wall_rxn_coeff,
QualParam.WallReactionCoeff,
mass_units=self.mass_units,
reaction_order=wn.options.quality.wall_rxn_order)).encode('ascii'))
f.write('\n'.encode('ascii'))
if current[0].upper() == 'GLOBAL':
if current[1].upper() == 'PRICE':
self.wn.options.energy.global_price = from_si(self.flow_units, float(current[2]), HydParam.Energy)
elif current[1].upper() == 'PATTERN':
self.wn.options.energy.global_pattern = current[2]
elif current[1].upper() in ['EFFIC', 'EFFICIENCY']:
self.wn.options.energy.global_efficiency = float(current[2])
else:
logger.warning('Unknown entry in ENERGY section: %s', line)
elif current[0].upper() == 'DEMAND':
self.wn.options.energy.demand_charge = float(current[2])
elif current[0].upper() == 'PUMP':
pump_name = current[1]
pump = self.wn._pumps[pump_name]
if current[2].upper() == 'PRICE':
pump.energy_price = from_si(self.flow_units, float(current[2]), HydParam.Energy)
elif current[2].upper() == 'PATTERN':
pump.energy_pattern = current[2]
elif current[2].upper() in ['EFFIC', 'EFFICIENCY']:
curve_name = current[3]
curve_points = []
for point in self.curves[curve_name]:
x = to_si(self.flow_units, point[0], HydParam.Flow)
y = point[1]
curve_points.append((x, y))
self.wn.add_curve(curve_name, 'EFFICIENCY', curve_points)
curve = self.wn.get_curve(curve_name)
pump.efficiency = curve
else:
logger.warning('Unknown entry in ENERGY section: %s', line)
else:
logger.warning('Unknown entry in ENERGY section: %s', line)
f.write(_CURVE_LABEL.format(';ID', 'X-Value', 'Y-Value').encode('ascii'))
curves = list(wn._curves.keys())
curves.sort()
for curve_name in curves:
curve = wn.get_curve(curve_name)
if curve.curve_type == 'VOLUME':
f.write(';VOLUME: {}\n'.format(curve_name).encode('ascii'))
for point in curve.points:
x = from_si(self.flow_units, point[0], HydParam.Length)
y = from_si(self.flow_units, point[1], HydParam.Volume)
f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode('ascii'))
elif curve.curve_type == 'HEAD':
f.write(';PUMP: {}\n'.format(curve_name).encode('ascii'))
for point in curve.points:
x = from_si(self.flow_units, point[0], HydParam.Flow)
y = from_si(self.flow_units, point[1], HydParam.HydraulicHead)
f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode('ascii'))
elif curve.curve_type == 'EFFICIENCY':
f.write(';EFFICIENCY: {}\n'.format(curve_name).encode('ascii'))
for point in curve.points:
x = from_si(self.flow_units, point[0], HydParam.Flow)
y = point[1]
f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode('ascii'))
elif curve.curve_type == 'HEADLOSS':
f.write(';HEADLOSS: {}\n'.format(curve_name).encode('ascii'))
for point in curve.points:
x = from_si(self.flow_units, point[0], HydParam.Flow)
y = from_si(self.flow_units, point[1], HydParam.HeadLoss)
f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode('ascii'))
else:
f.write(';UNKNOWN: {}\n'.format(curve_name).encode('ascii'))
for point in curve.points:
def _write_quality(self, f, wn):
f.write('[QUALITY]\n'.encode('ascii'))
entry = '{:10s} {:10s}\n'
label = '{:10s} {:10s}\n'
nnodes = list(wn._nodes.keys())
nnodes.sort()
for node_name in nnodes:
node = wn._nodes[node_name]
if node.initial_quality:
if wn.options.quality.mode == 'CHEMICAL':
quality = from_si(self.flow_units, node.initial_quality, QualParam.Concentration, mass_units=self.mass_units)
elif wn.options.quality.mode == 'AGE':
quality = from_si(self.flow_units, node.initial_quality, QualParam.WaterAge)
else:
quality = node.initial_quality
f.write(entry.format(node_name, str(quality)).encode('ascii'))
f.write('\n'.encode('ascii'))
def _write_tanks(self, f, wn):
f.write('[TANKS]\n'.encode('ascii'))
f.write(_TANK_LABEL.format(';ID', 'Elevation', 'Init Level', 'Min Level', 'Max Level',
'Diameter', 'Min Volume', 'Volume Curve').encode('ascii'))
nnames = list(wn._tanks.keys())
nnames.sort()
for tank_name in nnames:
tank = wn._tanks[tank_name]
E = {'name': tank_name,
'elev': from_si(self.flow_units, tank.elevation, HydParam.Elevation),
'initlev': from_si(self.flow_units, tank.init_level, HydParam.HydraulicHead),
'minlev': from_si(self.flow_units, tank.min_level, HydParam.HydraulicHead),
'maxlev': from_si(self.flow_units, tank.max_level, HydParam.HydraulicHead),
'diam': from_si(self.flow_units, tank.diameter, HydParam.TankDiameter),
'minvol': from_si(self.flow_units, tank.min_vol, HydParam.Volume),
'curve': '',
'com': ';'}
if tank.vol_curve is not None:
E['curve'] = tank.vol_curve.name
f.write(_TANK_ENTRY.format(**E).encode('ascii'))
f.write('\n'.encode('ascii'))
def get_setting(control, control_name):
value = control._control_action._value
attribute = control._control_action._attribute.lower()
if attribute == 'status':
setting = LinkStatus(value).name
elif attribute == 'speed':
setting = str(value)
elif attribute == 'setting' and isinstance(control._control_action._target_obj_ref, Valve):
valve = control._control_action._target_obj_ref
valve_type = valve.valve_type
if valve_type == 'PRV' or valve_type == 'PSV' or valve_type == 'PBV':
setting = str(from_si(self.flow_units, value, HydParam.Pressure))
elif valve_type == 'FCV':
setting = str(from_si(self.flow_units, value, HydParam.Flow))
elif valve_type == 'TCV':
setting = str(value)
elif valve_type == 'GPV':
setting = value
else:
raise ValueError('Valve type not recognized' + str(valve_type))
elif attribute == 'setting':
setting = value
else:
setting = None
logger.warning('Could not write control '+str(control_name)+' - skipping')
return setting