Describe the uncertainty in the quantity of damaged components needed to
trigger a red tag for the building. All Fragility Groups are handled in
the same multivariate distribution. Consequently, correlation between
the damaged-proportion limits of the various component groups can be
specified. See _create_RV_red_tags() for details.
6. Injuries
Describe the uncertainty in the proportion of people in the affected
area who sustain injuries exceeding a certain level of severity. FEMA P58
uses two severity levels: injury and fatality. Both levels for all
Fragility Groups are handled in the same multivariate distribution.
Consequently, correlation between the injury expectations of the various
component groups can be specified. See _create_RV_injuries() for details.
"""
super(FEMA_P58_Assessment, self).define_random_variables()

# create the random variables -----------------------------------------
DEP = self._AIM_in['dependencies']

self._RV_dict = {}

# quantities 100
self._RV_dict.update({'QNT':
    self._create_RV_quantities(DEP['quantities'])})

# fragilities 300
s_fg_keys = sorted(self._FG_in.keys())
for c_id, c_name in enumerate(s_fg_keys):
    comp = self._FG_in[c_name]

    # each fragility group gets its own set of capacity random variables
    self._RV_dict.update({
        'FR-' + c_name:
            self._create_RV_fragilities(c_id, comp, DEP['fragilities'])})
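The red tag and injury variables described in the docstring above are sampled from a joint multivariate distribution, which is what allows correlation between component groups to be prescribed. The standalone sketch below illustrates that idea with plain numpy; the correlation value, medians, and dispersions are made-up inputs, and this is not the pelicun RandomVariable API.

import numpy as np

# assumed inputs for two component groups (illustrative values only)
rho = 0.6                              # correlation between the two groups
medians = np.array([0.25, 0.40])       # median damaged proportions
betas = np.array([0.3, 0.3])           # logarithmic standard deviations

# joint lognormal model: covariance in log space from the betas and the correlation
COV = np.outer(betas, betas) * np.array([[1.0, rho],
                                         [rho, 1.0]])

rng = np.random.default_rng(0)
samples = np.exp(rng.multivariate_normal(np.log(medians), COV, size=10_000))

# the sampled groups carry (approximately) the prescribed correlation
print(np.corrcoef(np.log(samples).T)[0, 1])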
For the sake of efficiency, only the decision variables requested in
the input file are estimated. The following consequences are handled by
this method for a HAZUS assessment:
Reconstruction time and cost
Get a cost and time estimate for each Damage State in each Performance
Group. For more information about estimating reconstruction cost and
time, see the _calc_repair_cost_and_time() method.
Injuries
The number of injuries is based on the probability of injuries of
various severity levels specified in the component data file. For more
information about estimating injuries, see _calc_non_collapse_injuries().
"""
super(HAZUS_Assessment, self).calculate_losses()

DVs = self._AIM_in['decision_variables']

# reconstruction cost and time
if DVs['rec_cost'] or DVs['rec_time']:
    # all damage is considered repairable in HAZUS
    repairable_IDs = self._ID_dict['non-collapse']

    self._ID_dict.update({'repairable': repairable_IDs})
    self._ID_dict.update({'irrepairable': []})

    # reconstruction cost and time for repairable cases
    DV_COST, DV_TIME = self._calc_repair_cost_and_time()

    if DVs['rec_cost']:
        self._DV_dict.update({'rec_cost': DV_COST})

    if DVs['rec_time']:
        self._DV_dict.update({'rec_time': DV_TIME})
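To make the injury estimate described in the docstring above concrete: the expected number of injuries at each severity level is, in essence, the number of people in the affected area multiplied by the injury probability for that level. The sketch below uses made-up numbers; it is not taken from any HAZUS component data file.

# Illustrative injury expectation; all values are assumptions for this example.
affected_population = 120.0                 # people in the affected area

# assumed probabilities of each injury severity level, given the damage state
P_injury = [0.05, 0.01, 0.002, 0.0005]      # four levels, as in the HAZUS EQ method

expected_injuries = [affected_population * p for p in P_injury]
print(expected_injuries)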
# read the type of assessment from the DL input file
with open(DL_input_path, 'r') as f:
    DL_input = json.load(f)

# check if the DL input file has information about the loss model
if 'DamageAndLoss' in DL_input:
    pass
else:
    # if the loss model is not defined, give a warning
    print('WARNING No loss model defined in the BIM file. Trying to auto-populate.')

    EDP_input_path = EDP_files[s_i]

    # and try to auto-populate the loss model using the BIM information
    DL_input, DL_input_path = auto_populate(DL_input_path, EDP_input_path,
                                            DL_method, realization_count,
                                            coupled_EDP, event_time,
                                            ground_failure)

DL_method = DL_input['DamageAndLoss']['_method']

stripe_str = '' if len(stripes) == 1 else str(stripe) + '_'

# select the assessment class that corresponds to the requested method
if DL_method == 'FEMA P58':
    A = FEMA_P58_Assessment(log_file=log_file)
elif DL_method in ['HAZUS MH EQ', 'HAZUS MH', 'HAZUS MH EQ IM']:
    A = HAZUS_Assessment(hazard='EQ', log_file=log_file)
elif DL_method == 'HAZUS MH HU':
    A = HAZUS_Assessment(hazard='HU', log_file=log_file)

# run the assessment workflow and save the outputs
A.read_inputs(DL_input_path, EDP_files[s_i], verbose=False)
A.define_random_variables()
A.define_loss_model()
A.calculate_damage()
A.calculate_losses()
A.aggregate_results()
A.save_outputs(output_path, EDP_file, DM_file, DV_file, stripe_str,
               detailed_results=detailed_results)
def main(args):

    parser = argparse.ArgumentParser()
    parser.add_argument('--filenameDL')
    parser.add_argument('--filenameEDP')
    parser.add_argument('--DL_Method', default=None)
    parser.add_argument('--Realizations', default=None)
    parser.add_argument('--outputEDP', default='EDP.csv')
    parser.add_argument('--outputDM', default='DM.csv')
    parser.add_argument('--outputDV', default='DV.csv')
    parser.add_argument('--dirnameOutput', default=None)
    parser.add_argument('--event_time', default=None)
    parser.add_argument('--detailed_results', default=True,
                        type=str2bool, nargs='?', const=True)
    parser.add_argument('--coupled_EDP', default=False,
                        type=str2bool, nargs='?', const=False)
    parser.add_argument('--log_file', default=True,
                        type=str2bool, nargs='?', const=True)
    parser.add_argument('--ground_failure', default=False,
                        type=str2bool, nargs='?', const=False)
    args = parser.parse_args(args)

    log_msg('Initializing pelicun calculation...')
    #print(args)

    run_pelicun(
        args.filenameDL, args.filenameEDP,
        args.DL_Method, args.Realizations,
        args.outputEDP, args.outputDM, args.outputDV,
        output_path = args.dirnameOutput,
        detailed_results = args.detailed_results,
        coupled_EDP = args.coupled_EDP,
        # the remaining arguments are assumed to pass the corresponding parsed options
        log_file = args.log_file,
        event_time = args.event_time,
        ground_failure = args.ground_failure)
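The --detailed_results, --coupled_EDP, --log_file, and --ground_failure options above rely on a str2bool helper that is not shown in this excerpt. A common implementation of that argparse pattern looks like the sketch below; it is an assumption, not necessarily the exact helper used here.

import argparse

def str2bool(v):
    # interpret typical textual representations of booleans for argparse
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    if v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')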
        DV_INJ_dict[i].loc[:, (FG._ID, PG_ID, d_tag)] = INJ_i

# remove the all-zero (i.e., unused) columns from DV_INJ
for i in range(self._inj_lvls):
    DV_INJ = DV_INJ_dict[i]
    DV_INJ_dict[i] = DV_INJ.loc[:, (DV_INJ != 0.0).any(axis=0)]

# sort the columns to enable index slicing later
for i in range(self._inj_lvls):
    DV_INJ_dict[i] = DV_INJ_dict[i].sort_index(axis=1, ascending=True)

return DV_INJ_dict
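The two loops above rely on a pandas idiom: boolean column selection to drop all-zero columns, and sort_index to order the MultiIndex columns so that later .loc slicing works reliably. A small standalone example of both operations with illustrative data:

import pandas as pd

df = pd.DataFrame(
    [[0.0, 1.5, 0.0], [0.0, 2.0, 3.0]],
    columns=pd.MultiIndex.from_tuples([(2, 'B'), (1, 'A'), (1, 'C')]))

# keep only the columns that contain at least one non-zero value
df = df.loc[:, (df != 0.0).any(axis=0)]

# sort the (MultiIndex) columns so that slicing by level works reliably
df = df.sort_index(axis=1, ascending=True)
print(df.columns.tolist())   # [(1, 'A'), (1, 'C')]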
class HAZUS_Assessment(Assessment):
    """
    An Assessment class that implements the damage and loss assessment method
    following the HAZUS Technical Manual and the HAZUS software.

    Parameters
    ----------
    hazard: {'EQ', 'HU'}
        Identifies the type of hazard. EQ corresponds to earthquake, HU
        corresponds to hurricane.
        default: 'EQ'.
    inj_lvls: int
        Defines the discretization used to describe the severity of injuries.
        The HAZUS earthquake methodology uses 4 levels.
        default: 4
    """
    def __init__(self, hazard='EQ', inj_lvls=4):
        super(HAZUS_Assessment, self).__init__()

        # constants for the HAZUS methodology
        self._inj_lvls = inj_lvls
        self._hazard = hazard
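A minimal instantiation example following the signature above; the keyword values simply spell out the documented options.

# instantiate assessments for the two supported hazards (documented parameters only)
A_eq = HAZUS_Assessment(hazard='EQ', inj_lvls=4)   # earthquake
A_hu = HAZUS_Assessment(hazard='HU')               # hurricane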
    COV_mod = np.outer(sig_mod, sig_mod) * demand_RV.corr
else:
    COV_mod = np.sqrt(demand_RV.COV**2. + self.beta_tot**2.)

# redefine the random variable
demand_RV = RandomVariable(
    ID=200,
    dimension_tags=demand_RV.dimension_tags,
    distribution_kind=demand_RV.distribution_kind,
    theta=demand_RV.theta,
    COV=COV_mod)

return demand_RV
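The else branch above inflates the dispersion of the demand distribution by combining it with the additional uncertainty beta_tot through a square-root-sum-of-squares rule. A quick numeric illustration with arbitrary values:

import numpy as np

beta_demand = 0.35      # dispersion estimated from the EDP samples (assumed value)
beta_additional = 0.25  # additional modeling / ground motion uncertainty (assumed value)

# SRSS combination of the two logarithmic standard deviations
beta_total = np.sqrt(beta_demand**2. + beta_additional**2.)
print(beta_total)       # about 0.43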
class FEMA_P58_Assessment(Assessment):
    """
    An Assessment class that implements the loss assessment method in FEMA P58.
    """
    def __init__(self, inj_lvls=2):
        super(FEMA_P58_Assessment, self).__init__()

        # constants for the FEMA-P58 methodology
        self._inj_lvls = inj_lvls
        self._hazard = 'EQ'
        self._assessment_type = 'P58'

    def read_inputs(self, path_DL_input, path_EDP_input, verbose=False):
        """
        Read and process the input files to describe the loss assessment task.

        Parameters