Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _write_rules(self, f, wn):
    """
    Write the [RULES] section to an open INP file.

    Controls are visited in sorted name order. Only controls that are
    instances of IfThenElseControl are written; every other control is
    skipped here.

    Parameters
    ----------
    f: binary file-like object
        Output stream; each line is ASCII-encoded before it is written.
    wn: WaterNetworkModel object
        Model whose controls are exported.
    """
    f.write('[RULES]\n'.encode('ascii'))

    for control_name in sorted(wn._controls.keys()):
        control = wn.get_control(control_name)
        if not isinstance(control, wntr.network.controls.IfThenElseControl):
            continue

        # NOTE(review): 'blah' looks like a placeholder rule ID that
        # from_if_then_else() is expected to replace - confirm.
        rule = _EpanetRule('blah', self.flow_units, self.mass_units)
        rule.from_if_then_else(control)

        line = '{}\n'.format(str(rule))
        f.write(line.encode('ascii'))

    # A blank line terminates the section.
    f.write('\n'.encode('ascii'))
# Seed NumPy's global random generator so that any step below that relies on
# it gives the same result on every run
np.random.seed(12343)
# Create a water network model
inp_file = 'networks/Net3.inp'
wn = wntr.network.WaterNetworkModel(inp_file)
# Define the earthquake
# The node coordinates are scaled by a factor of 1000 first.
# NOTE(review): presumably this puts the network in the same units (meters)
# as the epicenter and depth below - confirm.
wn = wntr.morph.node.scale_node_coordinates(wn, 1000)
epicenter = (32000,15000) # x,y location
magnitude = 7 # Richter scale
depth = 10000 # m, shallow depth
earthquake = wntr.scenario.Earthquake(epicenter, magnitude, depth)
# Compute PGA
# R is the distance to the epicenter computed for elements of type Pipe; the
# attenuation model converts R into peak ground acceleration, which is then
# plotted as a link attribute (nodes hidden with node_size=0).
R = earthquake.distance_to_epicenter(wn, element_type=wntr.network.Pipe)
pga = earthquake.pga_attenuation_model(R)
wntr.graphics.plot_network(wn, link_attribute=pga,title='Peak Ground Acceleration (g)',
node_size=0, link_width=2)
# Define fragility curve
# Two damage states are registered, each with a 'Default' lognormal
# distribution (shape 0.5; scale 0.3 for 'Minor', 0.7 for 'Major').
# NOTE(review): the second argument (1, 2) is presumably the state's
# priority - confirm against FragilityCurve.add_state.
FC = wntr.scenario.FragilityCurve()
FC.add_state('Minor', 1, {'Default': lognorm(0.5,scale=0.3)})
FC.add_state('Major', 2, {'Default': lognorm(0.5,scale=0.7)})
wntr.graphics.plot_fragility_curve(FC, xlabel='Peak Ground Acceleration (g)')
# Draw damage state for each pipe
# The PGA values are passed through the fragility curve's CDF, a damage state
# is sampled from the result, and the sampled states are mapped through the
# curve's priority map into `val`.
pipe_PEDS = FC.cdf_probability(pga)
pipe_damage_state = FC.sample_damage_state(pipe_PEDS)
pipe_damage_state_map = FC.get_priority_map()
val = pipe_damage_state.map(pipe_damage_state_map)
# Three-color map; NOTE(review): presumably one color per outcome
# (no damage / Minor / Major) for plotting `val` - confirm.
custom_cmp = wntr.graphics.custom_colormap(3, ['grey', 'royalblue', 'darkorange'])
controls = list(wn._controls.keys())
controls.sort()
for text in controls:
all_control = wn.get_control(text)
if isinstance(all_control, wntr.network.TimeControl):
entry = 'Link {link} {setting} AT {compare} {time:g}\n'
vals = {'link': all_control._control_action._target_obj_ref.name,
'setting': get_setting(all_control, text),
'compare': 'TIME',
'time': all_control._run_at_time / 3600.0}
if vals['setting'] is None:
continue
if all_control._daily_flag:
vals['compare'] = 'CLOCKTIME'
f.write(entry.format(**vals).encode('ascii'))
elif isinstance(all_control, wntr.network.ConditionalControl):
entry = 'Link {link} {setting} IF Node {node} {compare} {thresh}\n'
vals = {'link': all_control._control_action._target_obj_ref.name,
'setting': get_setting(all_control, text),
'node': all_control._source_obj.name,
'compare': 'above',
'thresh': 0.0}
if vals['setting'] is None:
continue
if all_control._operation in [np.less, np.less_equal, Comparison.le, Comparison.lt]:
vals['compare'] = 'below'
threshold = all_control._threshold
if isinstance(all_control._source_obj, Tank):
vals['thresh'] = from_si(self.flow_units, threshold, HydParam.HydraulicHead)
elif isinstance(all_control._source_obj, Junction):
vals['thresh'] = from_si(self.flow_units, threshold, HydParam.Pressure)
else:
make sure all user-defined controls for ending the leak have
been removed before using this method (see
WaterNetworkModel.remove_leak() or
WaterNetworkModel.discard_leak()).
Parameters
----------
wn: WaterNetworkModel object
t: int
end time in seconds
"""
# remove old control
wn.discard_control(self._leak_end_control_name)
# add new control
end_control_action = wntr.network.ControlAction(self, 'leak_status', False)
control = wntr.network.TimeControl(wn, t, 'SIM_TIME', False, end_control_action)
wn.add_control(self._leak_end_control_name, control)
raise ValueError('Valve type not recognized' + str(valve_type))
elif attribute == 'setting':
setting = value
else:
setting = None
logger.warning('Could not write control '+str(control_name)+' - skipping')
return setting
f.write('[CONTROLS]\n'.encode('ascii'))
# Time controls and conditional controls only
controls = list(wn._controls.keys())
controls.sort()
for text in controls:
all_control = wn.get_control(text)
if isinstance(all_control, wntr.network.TimeControl):
entry = 'Link {link} {setting} AT {compare} {time:g}\n'
vals = {'link': all_control._control_action._target_obj_ref.name,
'setting': get_setting(all_control, text),
'compare': 'TIME',
'time': all_control._run_at_time / 3600.0}
if vals['setting'] is None:
continue
if all_control._daily_flag:
vals['compare'] = 'CLOCKTIME'
f.write(entry.format(**vals).encode('ascii'))
elif isinstance(all_control, wntr.network.ConditionalControl):
entry = 'Link {link} {setting} IF Node {node} {compare} {thresh}\n'
vals = {'link': all_control._control_action._target_obj_ref.name,
'setting': get_setting(all_control, text),
'node': all_control._source_obj.name,
'compare': 'above',
def water_quality_metrics(wn):
    """
    Run chemical, age and trace water quality simulations on a model.

    The model is modified in place: a binary pattern named 'SourcePattern'
    and two SETPOINT sources are added, and the quality mode is switched
    before each EPANET run.

    Parameters
    ----------
    wn: WaterNetworkModel object
        Model to simulate.
    """
    # Simulate hydraulics and water quality
    sim = wntr.sim.EpanetSimulator(wn)

    # Chemical scenario: two setpoint sources share one on/off pattern.
    wn.options.quality.mode = 'CHEMICAL'
    pattern_name = 'SourcePattern'
    source_pattern = wntr.network.elements.Pattern.binary_pattern(
        pattern_name,
        start_time=2*3600,
        end_time=15*3600,
        duration=wn.options.time.duration,
        step_size=wn.options.time.pattern_timestep,
    )
    wn.add_pattern(pattern_name, source_pattern)
    for source_name, node_name in (('Source1', '121'), ('Source2', '123')):
        wn.add_source(source_name, node_name, 'SETPOINT', 1000, pattern_name)
    results_CHEM = sim.run_sim()

    # Water age scenario
    wn.options.quality.mode = 'AGE'
    results_AGE = sim.run_sim()

    # Trace scenario, traced from node '111'
    wn.options.quality.mode = 'TRACE'
    wn.options.quality.trace_node = '111'
    results_TRACE = sim.run_sim()

    # Plot chem scenario
    # Row of the node quality results labelled 5*3600.
    # NOTE(review): the value is not used in the visible code; presumably
    # it feeds a plot that follows - confirm.
    CHEM_at_5hr = results_CHEM.node['quality'].loc[5*3600, :]
# Modify the water network model: 48 hour simulation with 30 minute
# hydraulic and report timesteps (values are in seconds)
wn.options.time.duration = 48*3600
wn.options.time.hydraulic_timestep = 1800
wn.options.time.report_timestep = 1800

# Set nominal pressures
for name, node in wn.junctions():
    node.nominal_pressure = 15

# Define failure probability for each pipe, based on pipe diameter. Failure
# probability must sum to 1. Net3 has a few pipes with diameter = 99 inches,
# to exclude these from the set of feasible leak locations, use
# query_link_attribute
pipe_diameters = wn.query_link_attribute('diameter', np.less_equal,
                                         0.9144,  # 36 inches = 0.9144 m
                                         link_type=wntr.network.Pipe)
# Normalize by the total so the probabilities sum to 1
failure_probability = pipe_diameters/pipe_diameters.sum()

# Pickle the network model and reload it for each realization.
# The context manager closes the file even if pickle.dump raises.
with open('wn.pickle', 'wb') as f:
    pickle.dump(wn, f)

# Run 5 realizations
results = {}  # Initialize dictionary to store results
np.random.seed(67823)  # Set random seed
for i in range(5):
# Select the number of leaks, random value between 1 and 5
N = np.random.randint(1,5+1)
# Select N unique pipes based on failure probability