        # The file size is fine
        return xr.open_mfdataset(paths, concat_dim=concat_dim, **kwargs)

    divisor = int(sqrt(n_chunks))
    # Chunking will pretty much 'always' be 2x2, very rarely 3x3 or 4x4. 5x5
    # would imply an uncompressed single file of ~6GB! All expected grids
    # should be divisible by 2, 3 and 4.
    if not (n_lat % divisor == 0) or not (n_lon % divisor == 0):
        raise ValueError("Can't find a good chunking strategy for the given "
                         "data source. Are lat/lon coordinates divisible by "
                         "{}?".format(divisor))
    chunks = {lat: n_lat // divisor, lon: n_lon // divisor}
    return xr.open_mfdataset(paths, concat_dim=concat_dim, chunks=chunks, **kwargs)
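
# A minimal, self-contained sketch of the chunking idea above: derive an
# n x n chunk layout from a target chunk count before opening the files.
# The function name, dimension names and default grid sizes here are
# hypothetical, not taken from the snippet above.
from math import sqrt

import xarray as xr

def open_chunked(paths, n_lat=360, n_lon=720, n_chunks=4):
    divisor = int(sqrt(n_chunks))  # e.g. 4 chunks -> a 2x2 layout
    if n_lat % divisor or n_lon % divisor:
        raise ValueError("grid is not divisible by {}".format(divisor))
    chunks = {"lat": n_lat // divisor, "lon": n_lon // divisor}
    return xr.open_mfdataset(paths, concat_dim="time", combine="nested",
                             chunks=chunks)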
def open_mfdataset_glob(self):
    """
    Use xarray.open_mfdataset to read multiple netcdf files with a glob
    pattern.
    """
    pattern = os.path.join(self.data_dir, "*PropertiesRhineMeuse30min.nc")
    xarray.open_mfdataset(pattern)
def open_xarray_dataset(paths, preprocess=True, chunks=None, **kwargs) -> xr.Dataset:
    """
    Adapted version of the xarray 'open_mfdataset' function.
    """
    if isinstance(paths, str):
        paths = sorted(glob(paths))
    if not paths:
        raise IOError('no files to open')
    if not preprocess:
        return xr.open_mfdataset(paths, concat_dim='time')

    # open all datasets
    lock = xr.backends.api._default_lock(paths[0], None)
    # TODO (forman, 20160601): align with chunking from netcdf metadata attribute
    datasets = []
    engine = 'netcdf4'
    for p in paths:
        datasets.append(xr.open_dataset(p, engine=engine, decode_cf=False,
                                        chunks=chunks or {}, lock=lock, **kwargs))

    preprocessed_datasets = []
    file_objs = []
    for ds in datasets:
        pds = _preprocess_datasets(ds)
        if pds is None:
            ds._file_obj.close()
            ds_obs_1var[v].attrs = ds_obs[v].attrs
            if i:
                ds_obs_1var[v].encoding['_FillValue'] = None

    for v in ds_obs_1var:
        for attr in attrs_to_delete:
            if attr in ds_obs_1var[v].attrs:
                del ds_obs_1var[v].attrs[attr]

    if 'time' in ds_obs_1var['xv'].dims:
        ds_obs_1var['xv'] = ds_obs_1var['xv'].isel(time=0)
        ds_obs_1var['yv'] = ds_obs_1var['yv'].isel(time=0)

    print('ds_obs_1var')
    ds_obs_1var.info()

    da_train = xr.open_mfdataset(
        train_fname.format(gcm_var=gcm_var), chunks=chunks,
        combine='by_coords', data_vars='minimal')[gcm_var].pipe(resample, time_bounds)
    da_predict = xr.open_mfdataset(
        predict_fname.format(gcm_var=gcm_var), chunks=chunks,
        combine='by_coords', data_vars='minimal')[gcm_var].pipe(resample, predict_time_bounds)
    print('da_train', da_train)
    print('da_predict', da_predict)

    anoms[obs_var] = bcsd(ds_obs_1var, da_train.to_dataset(name=obs_var),
                          da_predict.to_dataset(name=obs_var),
                          var=obs_var)
    out[obs_var] = disagg(ds_obs_daily[obs_var], anoms[obs_var],
                          var=obs_var)

    for var in ['xv', 'yv']:
        if var not in out.coords:
            out[var] = ds_obs_1var[var]
def _read_dataset(input_file):
    input_file_name = os.path.basename(input_file)
    if os.path.isdir(input_file):
        if input_file_name.endswith('.zarr'):
            ds = xr.open_zarr(input_file)
        else:
            ds = xr.open_mfdataset(glob.glob(os.path.join(input_file, '**', '*.nc'),
                                             recursive=True))
    else:
        if input_file_name.endswith('.zarr.zip'):
            ds = xr.open_zarr(input_file)
        else:
            ds = xr.open_dataset(input_file)
    return ds
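
# Hypothetical calls illustrating how _read_dataset dispatches on the path;
# the file and directory names below are made up.
ds = _read_dataset('data/cube.zarr')       # directory ending in .zarr -> open_zarr
ds = _read_dataset('data/model_runs')      # other directory -> open_mfdataset on **/*.nc
ds = _read_dataset('data/cube.zarr.zip')   # zipped zarr store -> open_zarr
ds = _read_dataset('data/single_file.nc')  # plain file -> open_dataset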
def open_metcro2d(self, f):
    from glob import glob
    from numpy import array, sort
    try:
        if isinstance(f, str):
            self.metcrofnames = sort(array(glob(f)))
        else:
            self.metcrofnames = sort(array(f))
        print(self.metcrofnames)
        if self.metcrofnames.shape[0] >= 1:
            self.metcro2d = xr.open_mfdataset(self.metcrofnames.tolist(), concat_dim='TSTEP')
        self.metcrokeys = self.metcro2d.keys()
        self.get_metcro2d_dates()
        if self.grid is not None:
            self.metcro2d = self.metcro2d.assign(latitude=self.grid.LAT.squeeze())
            self.metcro2d = self.metcro2d.assign(longitude=self.grid.LON.squeeze())
            self.metcro2d = self.metcro2d.set_coords(['latitude', 'longitude'])
    except Exception:
        print('METCRO2D Files Not Found')
def open_glm_time_series(filenames, chunks=None):
    """Convenience function for combining individual 1-min GLM gridded imagery
    files into a single xarray.Dataset with a time dimension.

    Creates an index on the time dimension.

    The time dimension will be in the order in which the files are listed
    due to the behavior of combine='nested' in open_mfdataset.

    Adjusts the time_coverage_start and time_coverage_end metadata.
    """
    # Need to fix time_coverage_start and _end in concat dataset
    starts = [t for t in gen_file_times(filenames)]
    ends = [t for t in gen_file_times(filenames, time_attr='time_coverage_end')]

    d = xr.open_mfdataset(filenames, concat_dim='time', chunks=chunks, combine='nested')
    d['time'] = starts
    d = d.set_index({'time': 'time'})
    d = d.set_coords('time')

    d.attrs['time_coverage_start'] = pd.Timestamp(min(starts)).isoformat()
    d.attrs['time_coverage_end'] = pd.Timestamp(max(ends)).isoformat()

    return d
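
# Hypothetical usage of open_glm_time_series; the glob pattern is
# illustrative and assumes a directory of 1-min GLM grid files.
from glob import glob
files = sorted(glob('OR_GLM-L2-GLMF-M3_G16_*.nc'))
glm = open_glm_time_series(files, chunks={'time': 1})
print(glm.attrs['time_coverage_start'], glm.attrs['time_coverage_end'])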
            ds = ds.where(ds.month == month, drop=True)
            ds = ds.mean(dim='Time')
            ds.compute(num_workers=self.subprocessCount)
            write_netcdf(ds, climatologyFileName)
        else:
            outFileName = parentTask.get_file_name(season=season)
            self.logger.info('computing climatology {}'.format(
                os.path.basename(outFileName)))
            fileNames = []
            weights = []
            for month in constants.monthDictionary[season]:
                monthName = constants.abrevMonthNames[month - 1]
                fileNames.append(parentTask.get_file_name(season=monthName))
                weights.append(constants.daysInMonth[month - 1])
            with xarray.open_mfdataset(fileNames, concat_dim='weight',
                                       combine='nested',
                                       chunks={'nCells': chunkSize},
                                       decode_cf=False, decode_times=False,
                                       preprocess=_preprocess) as ds:
                ds.coords['weight'] = ('weight', weights)
                ds = ((ds.weight * ds).sum(dim='weight') /
                      ds.weight.sum(dim='weight'))
                ds.compute(num_workers=self.subprocessCount)
                write_netcdf(ds, outFileName)
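
# A minimal sketch of the weighted-mean trick above: concatenate monthly
# files along a synthetic 'weight' dimension, attach days-in-month as its
# coordinate, then collapse it. File names and weights here are made up.
import xarray as xr

files = ['clim_Dec.nc', 'clim_Jan.nc', 'clim_Feb.nc']  # hypothetical DJF inputs
weights = [31, 31, 28]                                 # days in each month
ds = xr.open_mfdataset(files, concat_dim='weight', combine='nested')
ds.coords['weight'] = ('weight', weights)
djf_mean = (ds.weight * ds).sum(dim='weight') / ds.weight.sum(dim='weight')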
        second_file = netcdf_temp
        keeper_data_vars = [var_name_precip, var_name_temp]
    elif netcdf_pet is not None:
        second_file = netcdf_pet
        keeper_data_vars = [var_name_precip, var_name_pet]
    else:
        message = "SPEI requires either PET or temperature to compute PET, " \
                  "but neither input file was provided."
        _logger.error(message)
        raise ValueError(message)

    # open the precipitation and secondary input NetCDFs as a single xarray Dataset
    dataset = xr.open_mfdataset([netcdf_precip, second_file])  # , chunks={'lat': 10})

    # trim out all data variables from the dataset except precipitation
    # and the secondary variable
    for var in dataset.data_vars:
        if var not in keeper_data_vars:
            dataset = dataset.drop(var)

    # get the initial year of the data
    data_start_year = int(str(dataset['time'].values[0])[0:4])

    # get the scale increment for use in later log messages
    if arguments.periodicity is compute.Periodicity.daily:
        scale_increment = 'day'
        pet_method = "Hargreaves"
    elif arguments.periodicity is compute.Periodicity.monthly:
        scale_increment = 'month'
        pet_method = "Thornthwaite"
def time_load_dataset_netcdf4(self):
    xr.open_mfdataset(self.filenames_list, engine="netcdf4").load()
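
# Note: open_mfdataset returns a lazy, dask-backed dataset, so the trailing
# .load() forces the full read into memory; the benchmark therefore times
# both the multi-file open and the complete data read.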