Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment: reads per-node attributes from the EPANET model and adds the node
# to graph G. The assignments of i, nodeid, nodetype and elevation are above
# this excerpt (presumably inside a per-node loop - confirm).
# Base demand is read only for junctions; every other node type gets NaN.
if nodetype == pyepanet.EN_JUNCTION:
base_demand = enData.ENgetnodevalue(i+1, pyepanet.EN_BASEDEMAND)
else:
base_demand = np.nan
# Tank geometry is read only for tanks; every other node type gets NaN.
if nodetype == pyepanet.EN_TANK:
tank_diameter = enData.ENgetnodevalue(i+1, pyepanet.EN_TANKDIAM)
tank_minlevel = enData.ENgetnodevalue(i+1, pyepanet.EN_MINLEVEL)
tank_maxlevel = enData.ENgetnodevalue(i+1, pyepanet.EN_MAXLEVEL)
else:
tank_diameter = np.nan
tank_minlevel = np.nan
tank_maxlevel = np.nan
# Optional unit conversion, driven by the flow units stored on the graph.
# Tank min/max levels are converted with the 'Elevation' quantity.
if convert_units:
elevation = convert('Elevation', G.graph['flowunits'], elevation) # m
base_demand = convert('Demand', G.graph['flowunits'], base_demand) # m3/s
tank_diameter = convert('Tank Diameter', G.graph['flowunits'], tank_diameter) # m
tank_minlevel = convert('Elevation', G.graph['flowunits'], tank_minlevel) # m
tank_maxlevel = convert('Elevation', G.graph['flowunits'], tank_maxlevel) # m
# Only tank nodes carry the tank_* attributes; other nodes get the common set.
if nodetype == pyepanet.EN_TANK:
G.add_node(nodeid, nodetype=nodetype, elevation=elevation,
tank_diameter=tank_diameter, tank_minlevel=tank_minlevel,
tank_maxlevel=tank_maxlevel, base_demand=base_demand)
else: G.add_node(nodeid, nodetype=nodetype, elevation=elevation, base_demand=base_demand)
# Start of the link loop; toolkit calls take i+1, i.e. indices 1..nLinks.
# The rest of the loop body is below this excerpt.
nLinks = enData.ENgetcount(pyepanet.EN_LINKCOUNT)
for i in range(nLinks):
linkid = enData.ENgetlinkid(i+1)
linktype = enData.ENgetlinktype(i+1)
# Fragment: configures water-quality sources on the EPANET model.
# Accept either a single water-quality object or a list of them.
if not isinstance(WQ,list):
qlist = [WQ]
else:
qlist = WQ
# NOTE(review): the loop variable reuses the name WQ, so the original
# argument is rebound to each list element in turn.
for WQ in qlist:
# The 'quality' result list is reset on every iteration.
node_dictonary['quality'] = []
if WQ.quality_type == 'CHEM':
# Set quality type and convert source qual
if WQ.source_type == 'MASS':
enData.ENsetqualtype(pyepanet.EN_CHEM, 'Chemical', 'mg/min', '')
wq_sourceQual = WQ.source_quality*60*1e6 # kg/s to mg/min
else:
enData.ENsetqualtype(pyepanet.EN_CHEM, 'Chemical', 'mg/L', '')
wq_sourceQual = convert('Concentration', flowunits, WQ.source_quality, MKS=False) # kg/m3 to mg/L
# Set source quality
for node in WQ.nodes:
# Despite its name, nodeid holds the value returned by ENgetnodeindex.
nodeid = enData.ENgetnodeindex(node)
enData.ENsetnodevalue(nodeid, pyepanet.EN_SOURCEQUAL, wq_sourceQual)
# Set source type
# Map the source-type string onto the matching pyepanet constant.
if WQ.source_type == 'CONCEN':
wq_sourceType = pyepanet.EN_CONCEN
elif WQ.source_type == 'MASS':
wq_sourceType = pyepanet.EN_MASS
elif WQ.source_type == 'FLOWPACED':
wq_sourceType = pyepanet.EN_FLOWPACED
elif WQ.source_type == 'SETPOINT':
wq_sourceType = pyepanet.EN_SETPOINT
# The body of this fallback branch is outside the excerpt.
else:
# Fragment: runs one hydraulic step, records how long it took and reads the
# node and link results. start_solve_step is assigned above this excerpt.
t = enData.ENrunH()
end_solve_step = time.time()
# Elapsed time of this step, keyed by t divided by the hydraulic timestep.
self.solve_step[t/self._wn.options.hydraulic_timestep] = end_solve_step - start_solve_step
# Results are read only when t is one of the entries of results.time.
if t in results.time:
for name in node_names:
nodeindex = enData.ENgetnodeindex(name)
head = enData.ENgetnodevalue(nodeindex, pyepanet.EN_HEAD)
demand = enData.ENgetnodevalue(nodeindex, pyepanet.EN_DEMAND)
# expected_demand is simply a copy of the demand read from the solver.
expected_demand = demand
pressure = enData.ENgetnodevalue(nodeindex, pyepanet.EN_PRESSURE)
# NOTE(review): expected_demand is converted below together with demand,
# so the trailing "already converted" remark looks stale - confirm.
if convert_units: # expected demand is already converted
head = convert('Hydraulic Head', flowunits, head) # m
demand = convert('Demand', flowunits, demand) # m3/s
expected_demand = convert('Demand', flowunits, expected_demand) # m3/s
pressure = convert('Pressure', flowunits, pressure) # Pa
node_dictonary['demand'].append(demand)
node_dictonary['expected_demand'].append(expected_demand)
node_dictonary['head'].append(head)
node_dictonary['pressure'].append(pressure)
node_dictonary['type'].append(self._get_node_type(name))
# Per-link results; where they are stored is below this excerpt.
for name in link_names:
linkindex = enData.ENgetlinkindex(name)
flow = enData.ENgetlinkvalue(linkindex, pyepanet.EN_FLOW)
velocity = enData.ENgetlinkvalue(linkindex, pyepanet.EN_VELOCITY)
if convert_units:
flow = convert('Flow', flowunits, flow) # m3/s
velocity = convert('Velocity', flowunits, velocity) # m/s
# Add every EPANET link to graph G as an edge keyed by its link id.
#
# Without edge_attribute, each link becomes an edge node1 -> node2.
# With edge_attribute (indexed by the tuple (node1, node2, linkid)), the
# absolute value becomes the edge weight and the sign picks the direction:
# positive keeps node1 -> node2, negative reverses it to node2 -> node1.
# Links whose value is missing or exactly 0 are not added to the graph.
nLinks = enData.ENgetcount(pyepanet.EN_LINKCOUNT)
for i in range(nLinks):
    # Toolkit calls take i+1, i.e. indices 1..nLinks.
    linkid = enData.ENgetlinkid(i+1)
    linktype = enData.ENgetlinktype(i+1)
    linknodes_index = enData.ENgetlinknodes(i+1)
    node1 = enData.ENgetnodeid(linknodes_index[0])
    node2 = enData.ENgetnodeid(linknodes_index[1])
    length = enData.ENgetlinkvalue(i+1, pyepanet.EN_LENGTH)
    diameter = enData.ENgetlinkvalue(i+1, pyepanet.EN_DIAMETER)
    if convert_units:
        length = convert('Length', G.graph['flowunits'], length) # m
        diameter = convert('Pipe Diameter', G.graph['flowunits'], diameter) # m
    # Identity test: "== None" is evaluated element-wise by array-like
    # containers (e.g. a pandas Series) and then fails inside "if".
    if edge_attribute is None:
        G.add_edge(node1, node2, key=linkid, linktype=linktype,
                   length=length, diameter=diameter)
    else:
        # Only a failed lookup means "no data for this link"; any other
        # error is allowed to propagate instead of being silently swallowed.
        try:
            data = edge_attribute[(node1, node2, linkid)]
        except (KeyError, IndexError):
            data = 0
        if data > 0:
            G.add_edge(node1, node2, key=linkid, linktype=linktype,
                       length=length, diameter=diameter, weight=abs(data))
        elif data < 0:
            G.add_edge(node2, node1, key=linkid, linktype=linktype,
                       length=length, diameter=diameter, weight=abs(data))
# Row and header layouts of the tank table; the header row starts with ';'.
text_format = '{:10s} {:15f} {:15f} {:15f} {:15f} {:15f} {:15f} {:>10s} {:>3s}\n'
label_format = '{:10s} {:>15s} {:>15s} {:>15s} {:>15s} {:>15s} {:>15s} {:>10s}\n'
f.write(label_format.format(
    ';ID', 'Elevation', 'Initial Level', 'Minimum Level',
    'Maximum Level', 'Diameter', 'Minimum Volume', 'Volume Curve'))
for tank_name, tank in self.tanks():
    # A tank without a volume curve gets an empty curve column.
    curve_field = '' if tank.vol_curve is None else tank.vol_curve
    row = (
        tank_name,
        convert('Elevation', flowunit, tank.elevation, False),
        convert('Hydraulic Head', flowunit, tank.init_level, False),
        convert('Hydraulic Head', flowunit, tank.min_level, False),
        convert('Hydraulic Head', flowunit, tank.max_level, False),
        convert('Tank Diameter', flowunit, tank.diameter, False),
        convert('Volume', flowunit, tank.min_vol, False),
        curve_field,
        ';',
    )
    f.write(text_format.format(*row))
# Print pipe information
# Section tag, ';'-prefixed header row, then one formatted row per pipe.
f.write('[PIPES]\n')
text_format = '{:10s} {:10s} {:10s} {:15f} {:15f} {:15f} {:15f} {:>10s} {:>3s}\n'
label_format = '{:10s} {:>10s} {:>10s} {:>15s} {:>15s} {:>15s} {:>15s} {:>10s}\n'
f.write(label_format.format(
    ';ID', 'Node1', 'Node2', 'Length', 'Diameter',
    'Roughness', 'Minor Loss', 'Status'))
for pipe_name, pipe in self.pipes():
    # Length and diameter go through convert(); roughness and minor loss
    # are written exactly as stored on the pipe.
    row = (
        pipe_name,
        pipe.start_node(),
        pipe.end_node(),
        convert('Length', flowunit, pipe.length, False),
        convert('Pipe Diameter', flowunit, pipe.diameter, False),
        pipe.roughness,
        pipe.minor_loss,
        LinkStatus.status_to_str(pipe.get_base_status()),
        ';',
    )
    f.write(text_format.format(*row))
# Print pump information
# Written with f.write and an explicit newline: the Python 2-only
# "print >> f, ..." statement fails at runtime on Python 3, whereas f.write
# produces the same output on both. The format strings themselves carry no
# trailing newline, so one is appended at each write.
f.write('[PUMPS]\n')
text_format = '{:10s} {:10s} {:10s} {:10s} {:10s} {:>3s}'
label_format = '{:10s} {:>10s} {:>10s} {:>10s}'
f.write(label_format.format(';ID', 'Node1', 'Node2', 'Parameters') + '\n')
for pump_name, pump in self.links(Pump):
    # The parameter column depends on how the pump is specified.
    if pump.info_type == 'HEAD':
        parameter = pump.curve.name
    elif pump.info_type == 'POWER':
        # Power is divided by 1000 before writing (presumably W -> kW; confirm).
        parameter = str(pump.power/1000.0)
    else:
        raise RuntimeError('Only head or power info is supported of pumps.')
    f.write(text_format.format(pump_name, pump.start_node(), pump.end_node(),
                               pump.info_type, parameter, ';') + '\n')
# Print valve information
# NOTE(review): "print >> f" is Python 2-only syntax; the tank and pipe
# sections use f.write instead. The code that uses text_format is below
# this excerpt.
print >> f, '[VALVES]'
text_format = '{:10s} {:10s} {:10s} {:10f} {:10s} {:10f} {:10f} {:>3s}'
# Fragment: stores the current node's results, collects the link results,
# advances the solver and closes the hydraulic analysis. The node loop header
# and the reads of demand/head/pressure are above this excerpt.
node_dictonary['demand'].append(demand)
node_dictonary['expected_demand'].append(expected_demand)
node_dictonary['head'].append(head)
node_dictonary['pressure'].append(pressure)
node_dictonary['type'].append(self._get_node_type(name))
# Per-link flow and velocity, optionally unit-converted, appended to the
# link result lists.
for name in link_names:
linkindex = enData.ENgetlinkindex(name)
flow = enData.ENgetlinkvalue(linkindex, pyepanet.EN_FLOW)
velocity = enData.ENgetlinkvalue(linkindex, pyepanet.EN_VELOCITY)
if convert_units:
flow = convert('Flow', flowunits, flow) # m3/s
velocity = convert('Velocity', flowunits, velocity) # m/s
link_dictonary['flowrate'].append(flow)
link_dictonary['velocity'].append(velocity)
link_dictonary['type'].append(self._get_link_type(name))
# Ask the solver for the next step; a non-positive value ends the loop.
tstep = enData.ENnextH()
if tstep <= 0:
break
# Map the solver flags to a result code: 1 for warnings, 2 for errors
# (presumably the error check runs last so 2 wins when both are set - the
# original indentation is not visible here; confirm).
if enData.Warnflag:
results.error_code = 1
if enData.Errflag:
results.error_code = 2
enData.ENcloseH()
# Keep the solver's collected error codes on the simulator object.
self.warning_list = enData.errcodelist
# Header row and one row per reservoir; text_format for the data rows is
# defined above this excerpt.
label_format = '{:10s} {:>15s} {:>10s}\n'
f.write(label_format.format(';ID', 'Head', 'Pattern'))
for reservoir_name, reservoir in self.reservoirs():
    # A reservoir without a head pattern gets an empty pattern column.
    if reservoir.head_pattern_name is None:
        pattern_field = ''
    else:
        pattern_field = reservoir.head_pattern_name
    base_head = convert('Hydraulic Head', flowunit, reservoir.base_head, False)
    f.write(text_format.format(reservoir_name, base_head, pattern_field, ';'))
# Print tank information
# Section tag, ';'-prefixed header row, then one formatted row per tank.
f.write('[TANKS]\n')
text_format = '{:10s} {:15f} {:15f} {:15f} {:15f} {:15f} {:15f} {:>10s} {:>3s}\n'
label_format = '{:10s} {:>15s} {:>15s} {:>15s} {:>15s} {:>15s} {:>15s} {:>10s}\n'
f.write(label_format.format(
    ';ID', 'Elevation', 'Initial Level', 'Minimum Level',
    'Maximum Level', 'Diameter', 'Minimum Volume', 'Volume Curve'))
for tank_name, tank in self.tanks():
    # The only per-tank variation is the curve column: empty when unset.
    if tank.vol_curve is not None:
        curve_field = tank.vol_curve
    else:
        curve_field = ''
    f.write(text_format.format(
        tank_name,
        convert('Elevation', flowunit, tank.elevation, False),
        convert('Hydraulic Head', flowunit, tank.init_level, False),
        convert('Hydraulic Head', flowunit, tank.min_level, False),
        convert('Hydraulic Head', flowunit, tank.max_level, False),
        convert('Tank Diameter', flowunit, tank.diameter, False),
        convert('Volume', flowunit, tank.min_vol, False),
        curve_field,
        ';'))
# Print pipe information
# Emits the [PIPES] tag, the header row and one row per pipe.
f.write('[PIPES]\n')
text_format = '{:10s} {:10s} {:10s} {:15f} {:15f} {:15f} {:15f} {:>10s} {:>3s}\n'
label_format = '{:10s} {:>10s} {:>10s} {:>15s} {:>15s} {:>15s} {:>15s} {:>10s}\n'
column_titles = (';ID', 'Node1', 'Node2', 'Length', 'Diameter',
                 'Roughness', 'Minor Loss', 'Status')
f.write(label_format.format(*column_titles))
for pipe_name, pipe in self.pipes():
    # Only length and diameter pass through convert(); the remaining
    # fields are written as stored on the pipe.
    f.write(text_format.format(
        pipe_name,
        pipe.start_node(),
        pipe.end_node(),
        convert('Length', flowunit, pipe.length, False),
        convert('Pipe Diameter', flowunit, pipe.diameter, False),
        pipe.roughness,
        pipe.minor_loss,
        LinkStatus.status_to_str(pipe.get_base_status()),
        ';'))
# Print pump information
# NOTE(review): "print >> f" is Python 2-only syntax, while the tank and pipe
# sections use f.write. These format strings carry no trailing newline because
# the print statement appends one. The per-pump loop is below this excerpt.
print >> f, '[PUMPS]'
text_format = '{:10s} {:10s} {:10s} {:10s} {:10s} {:>3s}'
label_format = '{:10s} {:>10s} {:>10s} {:>10s}'
print >> f, label_format.format(';ID', 'Node1', 'Node2', 'Parameters')
# Fragment: times one hydraulic solver step and reads the node and link
# results for it. The enclosing loop is outside this excerpt.
start_solve_step = time.time()
t = enData.ENrunH()
end_solve_step = time.time()
# Wall-clock duration of this step, keyed by t divided by the hydraulic timestep.
self.solve_step[t/self._wn.options.hydraulic_timestep] = end_solve_step - start_solve_step
# Results are read only when t is one of the entries of results.time.
if t in results.time:
for name in node_names:
nodeindex = enData.ENgetnodeindex(name)
head = enData.ENgetnodevalue(nodeindex, pyepanet.EN_HEAD)
demand = enData.ENgetnodevalue(nodeindex, pyepanet.EN_DEMAND)
# expected_demand is simply a copy of the demand read from the solver.
expected_demand = demand
pressure = enData.ENgetnodevalue(nodeindex, pyepanet.EN_PRESSURE)
# NOTE(review): expected_demand is converted below together with demand,
# so the trailing "already converted" remark looks stale - confirm.
if convert_units: # expected demand is already converted
head = convert('Hydraulic Head', flowunits, head) # m
demand = convert('Demand', flowunits, demand) # m3/s
expected_demand = convert('Demand', flowunits, expected_demand) # m3/s
pressure = convert('Pressure', flowunits, pressure) # Pa
node_dictonary['demand'].append(demand)
node_dictonary['expected_demand'].append(expected_demand)
node_dictonary['head'].append(head)
node_dictonary['pressure'].append(pressure)
node_dictonary['type'].append(self._get_node_type(name))
# Per-link results; the rest of this loop is below the excerpt.
for name in link_names:
linkindex = enData.ENgetlinkindex(name)
flow = enData.ENgetlinkvalue(linkindex, pyepanet.EN_FLOW)
velocity = enData.ENgetlinkvalue(linkindex, pyepanet.EN_VELOCITY)
if convert_units:
flow = convert('Flow', flowunits, flow) # m3/s
# Fragment: finishes reading per-node attributes and adds the node to graph G.
# The condition that leads to this NaN default, and the assignments of i,
# nodeid, nodetype and elevation, are above this excerpt.
base_demand = np.nan
# Tank geometry is read only for tanks; every other node type gets NaN.
if nodetype == pyepanet.EN_TANK:
tank_diameter = enData.ENgetnodevalue(i+1, pyepanet.EN_TANKDIAM)
tank_minlevel = enData.ENgetnodevalue(i+1, pyepanet.EN_MINLEVEL)
tank_maxlevel = enData.ENgetnodevalue(i+1, pyepanet.EN_MAXLEVEL)
else:
tank_diameter = np.nan
tank_minlevel = np.nan
tank_maxlevel = np.nan
# Optional unit conversion, driven by the flow units stored on the graph.
# Tank min/max levels are converted with the 'Elevation' quantity.
if convert_units:
elevation = convert('Elevation', G.graph['flowunits'], elevation) # m
base_demand = convert('Demand', G.graph['flowunits'], base_demand) # m3/s
tank_diameter = convert('Tank Diameter', G.graph['flowunits'], tank_diameter) # m
tank_minlevel = convert('Elevation', G.graph['flowunits'], tank_minlevel) # m
tank_maxlevel = convert('Elevation', G.graph['flowunits'], tank_maxlevel) # m
# Only tank nodes carry the tank_* attributes; other nodes get the common set.
if nodetype == pyepanet.EN_TANK:
G.add_node(nodeid, nodetype=nodetype, elevation=elevation,
tank_diameter=tank_diameter, tank_minlevel=tank_minlevel,
tank_maxlevel=tank_maxlevel, base_demand=base_demand)
else: G.add_node(nodeid, nodetype=nodetype, elevation=elevation, base_demand=base_demand)
# Start of the link loop; toolkit calls take i+1, i.e. indices 1..nLinks.
# The rest of the loop body is below this excerpt.
nLinks = enData.ENgetcount(pyepanet.EN_LINKCOUNT)
for i in range(nLinks):
linkid = enData.ENgetlinkid(i+1)
linktype = enData.ENgetlinktype(i+1)
linknodes_index = enData.ENgetlinknodes(i+1)
node1 = enData.ENgetnodeid(linknodes_index[0])
node2 = enData.ENgetnodeid(linknodes_index[1])