def init_mp_pool(reset=False):
    """Necessary because at import time, cfg might be uninitialized"""
    global _mp_pool
    if _mp_pool and not reset:
        return _mp_pool
    cfg.CONFIG_MODIFIED = False
    if _mp_pool and reset:
        _mp_pool.terminate()
        _mp_pool = None
    cfg_contents = cfg.pack_config()
    global_lock = mp.Manager().Lock()
    mpp = cfg.PARAMS['mp_processes']
    if mpp == -1:
        try:
            mpp = int(os.environ['SLURM_JOB_CPUS_PER_NODE'])
            log.workflow('Multiprocessing: using slurm allocated '
                         'processors (N={})'.format(mpp))
        except KeyError:
            mpp = mp.cpu_count()
            log.workflow('Multiprocessing: using all available '
                         'processors (N={})'.format(mpp))
    else:
        log.workflow('Multiprocessing: using the requested number of '
                     'processors (N={})'.format(mpp))
    _mp_pool = mp.Pool(mpp, initializer=_init_pool_globals,
                       initargs=(cfg_contents, global_lock))
    return _mp_pool
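
# Usage sketch (hedged): assuming this function lives in oggm.workflow and
# cfg.initialize() has been called, the pool is created lazily on first use
# and rebuilt with reset=True after the configuration changed.
from oggm import cfg, workflow

cfg.initialize()                            # populate cfg.PARAMS / cfg.PATHS
pool = workflow.init_mp_pool()              # created on first call
cfg.PARAMS['mp_processes'] = 4              # change the configuration...
pool = workflow.init_mp_pool(reset=True)    # ...then rebuild the pool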
cfg.PATHS['cru_dir'] = os.path.join(DATA_DIR, 'cru')
cfg.PATHS['rgi_dir'] = os.path.join(DATA_DIR, 'rgi')
# Currently OGGM wants some directories to exist
# (maybe I'll change this but it can also catch errors in the user config)
utils.mkdir(cfg.PATHS['working_dir'])
utils.mkdir(cfg.PATHS['cru_dir'])
utils.mkdir(cfg.PATHS['rgi_dir'])
# Use multiprocessing?
cfg.PARAMS['use_multiprocessing'] = True
cfg.PARAMS['continue_on_error'] = True
# Other params
cfg.PARAMS['border'] = 80
cfg.PARAMS['temp_use_local_gradient'] = False
cfg.PARAMS['invert_with_sliding'] = False
# Download RGI files
rgi_dir = utils.get_rgi_dir()
rgi_shp = list(glob(os.path.join(rgi_dir, "*", rgi_reg + '_rgi50_*.shp')))
assert len(rgi_shp) == 1
rgidf = gpd.read_file(rgi_shp[0])
log.info('Number of glaciers: {}'.format(len(rgidf)))
# Download other files if needed
_ = utils.get_cru_file(var='tmp')
_ = utils.get_cru_file(var='pre')
_ = utils.get_demo_file('Hintereisferner.shp')
# Go - initialize working directories
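
# Hedged sketch of the truncated "Go" step above: in OGGM versions of this
# era (RGI v5 files) the usual entry point was workflow.init_glacier_regions
# (replaced by init_glacier_directories in later releases).
from oggm import workflow

gdirs = workflow.init_glacier_regions(rgidf)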
# Use multiprocessing?
cfg.PARAMS['use_multiprocessing'] = True
# How many grid points around the glacier?
# Make it large if you expect your glaciers to grow large
cfg.PARAMS['border'] = border
# Set to True for operational runs
cfg.PARAMS['continue_on_error'] = True
# For statistics
odf = pd.DataFrame()
if rgi_version is None:
    rgi_version = cfg.PARAMS['rgi_version']
base_dir = os.path.join(output_folder)
# Add a package version file
utils.mkdir(base_dir)
opath = os.path.join(base_dir, 'package_versions.txt')
with open(opath, 'w') as vfile:
    vfile.write(utils.show_versions(logger=log))
# Read RGI
start = time.time()
if test_rgidf is None:
    # Get the RGI file
    rgidf = gpd.read_file(utils.get_rgi_region_file(rgi_reg,
                                                    version=rgi_version))
    # We use intersects
    rgif = utils.get_rgi_intersects_region_file(rgi_reg,
                                                version=rgi_version)
glacier_mask_2d = glacier_mask_2d == 1
catchment_mask_2d = glacier_mask_2d * np.NaN
cls = gdir.read_pickle('centerlines')
# Catchment areas
cis = gdir.read_pickle('geometries')['catchment_indices']
for j, ci in enumerate(cis):
    catchment_mask_2d[tuple(ci.T)] = j
# Make everything we need flat
catchment_mask = catchment_mask_2d[glacier_mask_2d].astype(int)
topo = topo_2d[glacier_mask_2d]
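
# Minimal numpy sketch of the indexing used above: tuple(ci.T) turns an
# (N, 2) array of (row, col) pairs into the pair of index arrays that fancy
# indexing expects, and a boolean mask flattens a 2D field to 1D.
import numpy as np

mask_2d = np.full((4, 4), np.nan)
ci = np.array([[0, 1], [2, 3], [3, 0]])  # three (row, col) pairs
mask_2d[tuple(ci.T)] = 7.
assert mask_2d[0, 1] == 7. and mask_2d[3, 0] == 7.
flat = mask_2d[~np.isnan(mask_2d)]       # boolean mask -> 1D array
assert flat.shape == (3,)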
# Prepare the distributed mass-balance data
rho = cfg.PARAMS['ice_density']
dx2 = gdir.grid.dx ** 2
# Linear
def to_minimize(ela_h):
    mbmod = LinearMassBalance(ela_h[0])
    smb = mbmod.get_annual_mb(heights=topo)
    return np.sum(smb)**2
ela_h = optimization.minimize(to_minimize, [0.], method='Powell')
mbmod = LinearMassBalance(float(ela_h['x']))
lin_mb_on_z = mbmod.get_annual_mb(heights=topo) * cfg.SEC_IN_YEAR * rho
if not np.isclose(np.sum(lin_mb_on_z), 0, atol=10):
    raise RuntimeError('Spec mass-balance should be zero but is: {}'
                       .format(np.sum(lin_mb_on_z)))
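
# Toy check (hedged, assuming LinearMassBalance from oggm.core.massbalance):
# for a linear balance profile, the ELA that zeroes the glacier-wide
# specific mass balance is simply the mean glacier elevation.
import numpy as np
from scipy import optimize as optimization
from oggm.core.massbalance import LinearMassBalance

toy_topo = np.linspace(2500., 3500., 101)

def toy_to_minimize(ela_h):
    smb = LinearMassBalance(ela_h[0]).get_annual_mb(heights=toy_topo)
    return np.sum(smb)**2

res = optimization.minimize(toy_to_minimize, [3000.], method='Powell')
ela = np.ravel(res['x'])[0]
np.testing.assert_allclose(ela, toy_topo.mean(), rtol=1e-3)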
# Normal OGGM (a bit tweaked)
df = gdir.read_json('local_mustar')
cfg.initialize()
# Local paths (where to write output and where to download input)
WORKING_DIR = '/home/mowglie/disk/OGGM_Runs/TESTS'
utils.mkdir(WORKING_DIR)
cfg.PATHS['working_dir'] = WORKING_DIR
PLOTS_DIR = os.path.join(WORKING_DIR, 'plots')
utils.mkdir(PLOTS_DIR)
# Use multiprocessing?
cfg.PARAMS['use_multiprocessing'] = False
# How many grid points around the glacier?
# Make it large if you expect your glaciers to grow large
cfg.PARAMS['border'] = 100
# Set to True for operational runs
cfg.PARAMS['continue_on_error'] = False
cfg.PARAMS['auto_skip_task'] = True
# Don't use divides for now
cfg.set_divides_db()
# But we use intersects
rgi_dir = utils.get_rgi_intersects_dir()
rgi_shp = list(glob(os.path.join(rgi_dir, "*", '*intersects*' + rgi_reg + '_rgi50_*.shp')))
assert len(rgi_shp) == 1
cfg.set_intersects_db(rgi_shp[0])
# Pre-download other files which will be needed later
_ = utils.get_cru_file(var='tmp')
RGI_PATH = os.path.join(MAIN_PATH,
                        'global_data_base/RGI_inventory/All_alaska_glacier_goodformat/')
RGI_FILE = os.path.join(RGI_PATH, '01_rgi50_Alaska.shp')
DATA_INPUT = os.path.join(MAIN_PATH, 'OGGM_Alaska_run/input_data/')
if Cluster:
    SLURM_WORKDIR = os.environ["WORKDIR"]
    # Local paths (where to write output and where to download input)
    WORKING_DIR = SLURM_WORKDIR
    RGI_PATH = '/home/users/bea/oggm_run_alaska/input_data/Alaska_tidewater_andlake/'
    RGI_FILE = os.path.join(RGI_PATH, '01_rgi50_Alaska.shp')
    DATA_INPUT = '/home/users/bea/oggm_run_alaska/input_data/'
# Configuration of the run
cfg.PARAMS['rgi_version'] = "5"
cfg.PATHS['working_dir'] = WORKING_DIR
# How many grid points around the glacier?
# Make it large if you expect your glaciers to grow large
cfg.PARAMS['border'] = 80
cfg.PARAMS['optimize_inversion_params'] = True
#cfg.PARAMS['inversion_glen_a'] = 2.6e-24
# Set to True for cluster runs
if Cluster:
    cfg.PARAMS['use_multiprocessing'] = True
    cfg.PARAMS['continue_on_error'] = True
else:
    cfg.PARAMS['use_multiprocessing'] = False
    cfg.PARAMS['continue_on_error'] = False
# We use intersects
self.dt_warning = False
if fixed_dt is not None:
    min_dt = fixed_dt
    max_dt = fixed_dt
self.min_dt = min_dt
self.max_dt = max_dt
self.cfl_number = cfl_number
self.calving_m3_since_y0 = 0. # total calving since time y0
# Do we want to use shape factors?
self.sf_func = None
# Use .get to obtain default None for non-existing key
# necessary to pass some tests
# TODO: change to direct dictionary query after tests are adapted?
use_sf = cfg.PARAMS.get('use_shape_factor_for_fluxbasedmodel')
if use_sf == 'Adhikari' or use_sf == 'Nye':
    self.sf_func = utils.shape_factor_adhikari
elif use_sf == 'Huss':
    self.sf_func = utils.shape_factor_huss
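
# Hedged configuration sketch: the parameter queried above lives in
# cfg.PARAMS, so enabling e.g. Huss-type shape factors looks like this.
from oggm import cfg

cfg.initialize()
cfg.PARAMS['use_shape_factor_for_fluxbasedmodel'] = 'Huss'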
# Optim
self.slope_stag = []
self.thick_stag = []
self.section_stag = []
self.u_stag = []
self.shapefac_stag = []
self.flux_stag = []
self.trib_flux = []
for fl, trib in zip(self.fls, self._tributary_indices):
    nx = fl.nx
    # This is not staggered
self.mb_model = mb_model
# Defaults
if glen_a is None:
    glen_a = cfg.PARAMS['glen_a']
if fs is None:
    fs = cfg.PARAMS['fs']
self.glen_a = glen_a
self.fs = fs
self.glen_n = cfg.PARAMS['glen_n']
self.rho = cfg.PARAMS['ice_density']
self.check_for_boundaries = check_for_boundaries and not is_tidewater
# we keep glen_a as input, but for optimisation we stick to "fd"
self._fd = 2. / (cfg.PARAMS['glen_n']+2) * self.glen_a
self.y0 = None
self.t = None
self.reset_y0(y0)
self.fls = None
self._tributary_indices = None
self.reset_flowlines(flowlines, inplace=inplace)
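
# Quick numeric check of the "fd" relation used above, fd = 2 / (n + 2) * A,
# with Glen's n = 3 and an A value chosen only for illustration:
glen_a, glen_n = 2.4e-24, 3.
fd = 2. / (glen_n + 2) * glen_a
assert abs(fd - 9.6e-25) < 1e-37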
    depending on the length of your data. The default is to choose
    it ourselves based on the starting year.
calendar : str
    If you use an exotic calendar (e.g. 'noleap')
file_name : str
    How to name the file
filesuffix : str
    Apply a suffix to the file
"""
# overwrite as default
fpath = self.get_filepath(file_name, filesuffix=filesuffix)
if os.path.exists(fpath):
    os.remove(fpath)
zlib = cfg.PARAMS['compress_climate_netcdf']
if time_unit is None:
    # http://pandas.pydata.org/pandas-docs/stable/timeseries.html
    # #timestamp-limitations
    if time[0].year > 1800:
        time_unit = 'days since 1801-01-01 00:00:00'
    elif time[0].year >= 0:
        time_unit = ('days since {:04d}-01-01 '
                     '00:00:00'.format(time[0].year))
    else:
        raise InvalidParamsError('Time format not supported')
with ncDataset(fpath, 'w', format='NETCDF4') as nc:
    nc.ref_hgt = ref_pix_hgt
    nc.ref_pix_lon = ref_pix_lon
    nc.ref_pix_lat = ref_pix_lat
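
# Sketch of the unit-selection rule above as a hypothetical standalone
# helper (not OGGM API), mirroring the pandas timestamp limitation:
def _select_time_unit(first_year):
    if first_year > 1800:
        return 'days since 1801-01-01 00:00:00'
    elif first_year >= 0:
        return 'days since {:04d}-01-01 00:00:00'.format(first_year)
    raise ValueError('Time format not supported')

assert _select_time_unit(1901) == 'days since 1801-01-01 00:00:00'
assert _select_time_unit(850) == 'days since 0850-01-01 00:00:00'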
calendar : str
    If you use an exotic calendar (e.g. 'noleap')
"""
# Standard sanity checks
months = temp['time.month']
if (months[0] != 1) or (months[-1] != 12):
    raise ValueError('We expect the files to start in January and end in '
                     'December!')
if (np.abs(temp['lon']) > 180) or (np.abs(prcp['lon']) > 180):
    raise ValueError('We expect the longitude coordinates to be within '
                     '[-180, 180].')
# from normal years to hydrological years
sm = cfg.PARAMS['hydro_month_' + gdir.hemisphere]
prcp = prcp[sm-1:sm-13].load()
temp = temp[sm-1:sm-13].load()
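
# Sketch of the hydrological-year slicing above: with sm = 10 (hydro year
# starting in October), data[sm-1:sm-13] keeps only whole hydrological
# years, dropping the leading and trailing incomplete ones.
import numpy as np

months = np.tile(np.arange(1, 13), 2)   # two calendar years, Jan..Dec
sm = 10
hydro = months[sm-1:sm-13]              # same as months[9:-3]
assert list(hydro[:3]) == [10, 11, 12]  # starts in October...
assert hydro[-1] == 9                   # ...ends in September
assert len(hydro) == 12                 # one full hydrological year left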
# Get CRU to apply the anomaly to
fpath = gdir.get_filepath('climate_monthly')
ds_cru = xr.open_dataset(fpath)
# Add CRU clim
dscru = ds_cru.sel(time=slice(*year_range))
# compute monthly anomalies
# of temp
if scale_stddev:
    # This is a bit more arithmetic
    ts_tmp_sel = temp.sel(time=slice(*year_range))
    ts_tmp_std = ts_tmp_sel.groupby('time.month').std(dim='time')
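
# Hedged sketch (not verbatim OGGM code) of what scale_stddev typically
# does here: monthly anomalies are multiplied by the ratio of reference to
# input monthly standard deviations, so their variability matches the
# reference climatology.
import numpy as np

rng = np.random.default_rng(0)
anom = rng.normal(0., 2., size=(30, 12))     # 30 years of monthly anomalies
ref_std = np.ones(12)                        # target monthly std
scaled = anom / anom.std(axis=0) * ref_std   # rescale month by month
np.testing.assert_allclose(scaled.std(axis=0), ref_std)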