data_view = data.ravel()
if data_view.base is not data:
    raise ValueError('view expected')
if data_view.size < cube_store.ndim * 2:
    raise ValueError('size too small')
for i in range(cube_store.ndim):
    # write the [start, stop) offsets of this chunk along dimension i
    j1 = cube_store.chunks[i] * index[i]
    j2 = j1 + cube_store.chunks[i]
    data_view[2 * i] = j1
    data_view[2 * i + 1] = j2
return data.tobytes()
store = ChunkStore(dims, shape, chunks)
store.add_lazy_array('__index_var__', '
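
# A minimal, self-contained sketch (not the project's code) of the pattern in the
# fragment above: a chunk callback computes the [start, stop) offsets of the
# requested chunk per dimension and returns them as raw bytes. `CubeStoreStub`
# and `write_index_chunk` are hypothetical stand-ins introduced for illustration.
from types import SimpleNamespace

import numpy as np

CubeStoreStub = SimpleNamespace  # assumed: only .ndim and .chunks are needed here

def write_index_chunk(cube_store, index) -> bytes:
    data = np.zeros(cube_store.ndim * 2, dtype='uint64')  # assumed buffer layout
    for i in range(cube_store.ndim):
        data[2 * i] = cube_store.chunks[i] * index[i]          # chunk start offset
        data[2 * i + 1] = data[2 * i] + cube_store.chunks[i]   # chunk stop offset
    return data.tobytes()

print(np.frombuffer(write_index_chunk(CubeStoreStub(ndim=2, chunks=(90, 90)), (1, 2)),
                    dtype='uint64'))  # -> [ 90 180 180 270]
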
def _test_script(self, script_path):
    result = self.invoke_cli(['apply',
                              '--dask', 'parallelized',
                              '--params', 'a=0.1,b=0.4',
                              '--vars', 'precipitation,soil_moisture',
                              OUTPUT_PATH,
                              script_path,
                              INPUT_PATH])
    self.assertEqual(0, result.exit_code)
    self.assertTrue(os.path.isdir(OUTPUT_PATH))
    ds = xr.open_zarr(OUTPUT_PATH)
    self.assertEqual(['output'], list(ds.data_vars))
    self.assertAlmostEqual(0.45, float(ds.output.mean()))
def test_update_corrupt_cube(self):
    self.write_cube('2019-01-01', 3)

    # Corrupt the cube by swapping dimension order so that 'time' is no
    # longer the first dimension of 'precipitation'.
    cube = xr.open_zarr(self.CUBE_PATH)
    t, y, x = cube.precipitation.shape
    new_shape = y, t, x
    t, y, x = cube.precipitation.dims
    new_dims = y, t, x
    cube['precipitation'] = xr.DataArray(cube.precipitation.values.reshape(new_shape),
                                         dims=new_dims,
                                         coords=cube.precipitation.coords)
    cube.to_zarr(self.CUBE_PATH_2)

    with self.assertRaises(ValueError) as cm:
        insert_time_slice(self.CUBE_PATH_2, 2, self.make_slice('2019-01-02T06:30'))
    self.assertEqual("dimension 'time' of variable 'precipitation' must be first dimension",
                     f"{cm.exception}")
"""
Initialise local xarray, whose dask arrays contain tasks that pull data
from the server.

The metadata contains a key "internal", which is the result of running
``serialize_zarr_ds`` on the xarray on the server. It is a dict
containing the metadata parts of the original dataset (i.e., the
keys with names like ".z*"). This can be opened by xarray as-is, and
will make a local xarray object. In ``._get_schema()``, the numpy
parts (coordinates) are fetched and the dask-array parts (variables)
have their dask graphs redefined to tasks that fetch data from the
server.
"""
import xarray as xr
super(RemoteXarray, self).__init__(url, headers, **kwargs)
self._schema = None
self._ds = xr.open_zarr(self.metadata['internal'])
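
# A sketch of the "metadata-only zarr dict" idea described in the docstring above:
# write a small dataset to an in-memory dict store, keep only the ".z*" metadata
# keys, and open the result lazily with xr.open_zarr. This illustrates the concept
# only and assumes the zarr-python 2.x key layout; it is not the intake-xarray code.
import numpy as np
import xarray as xr

ds = xr.Dataset({'a': (('x',), np.arange(10.0))}).chunk({'x': 5})
store = {}
ds.to_zarr(store)  # a plain dict works as a mutable-mapping zarr store

meta_only = {k: v for k, v in store.items()
             if k.rsplit('/', 1)[-1].startswith('.z')}
lazy = xr.open_zarr(meta_only)  # structure is available without any chunk data
print(lazy)
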
if isinstance(path, fsspec.mapping.FSMap):
    protocol = path.fs.protocol
    if isinstance(protocol, list):
        protocol = tuple(protocol)
    if protocol in {'http', 'https', 'file'} or protocol is None:
        path = path.root
        root = path
    else:
        root = path.root

if data_format == 'zarr':
    logger.debug(f'Opening zarr store: {root} - protocol: {protocol}')
    try:
        ds = xr.open_zarr(path, **zarr_kwargs)
    except Exception as e:
        logger.error(f'Failed to open zarr store with zarr_kwargs={zarr_kwargs}')
        raise e
else:
    logger.debug(f'Opening netCDF/HDF dataset: {root} - protocol: {protocol}')
    try:
        ds = xr.open_dataset(path, **cdf_kwargs)
    except Exception as e:
        logger.error(f'Failed to open netCDF/HDF dataset with cdf_kwargs={cdf_kwargs}')
        raise e

ds.attrs['intake_esm_varname'] = varname

if preprocess is None:
    return ds
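
# The fragment above stops where `preprocess` comes into play. A plausible
# continuation, sketched here as an assumption rather than the original code:
# when a callable was supplied, apply it to the freshly opened dataset.
def _apply_preprocess(ds, preprocess=None):
    if preprocess is None:
        return ds
    return preprocess(ds)

# e.g. a hypothetical preprocess hook that drops bounds variables on open:
# ds = _apply_preprocess(ds, preprocess=lambda d: d.drop_vars('time_bnds', errors='ignore'))
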
def open(self, **kwargs):
    """
    Open the dataset referenced by the instance's _predictor_file attribute and assign it to self.data.

    :param kwargs: passed to xarray.open_dataset() or xarray.open_zarr()
    """
    if self._predictor_file.endswith('.zarr'):
        self.data = xr.open_zarr(self._predictor_file, **kwargs)
    else:
        self.data = xr.open_dataset(self._predictor_file, **kwargs)
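
# A self-contained sketch of the same extension-based dispatch as a plain
# function; the example file names in the comments are hypothetical.
import xarray as xr

def open_predictor_file(path: str, **kwargs) -> xr.Dataset:
    """Route '.zarr' paths to xr.open_zarr and everything else to xr.open_dataset."""
    if path.endswith('.zarr'):
        return xr.open_zarr(path, **kwargs)
    return xr.open_dataset(path, **kwargs)

# ds = open_predictor_file('predictors.zarr', consolidated=True)
# ds = open_predictor_file('predictors.nc', chunks={'time': 24})
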
def open_dataset(self, dataset_id: str, **open_params) -> xr.Dataset:
    import s3fs
    s3, open_params = _get_s3_and_consume_params(open_params)
    return xr.open_zarr(s3fs.S3Map(root=dataset_id, s3=s3, check=False), **open_params)
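
# A standalone sketch of the same idea without the store-specific helper:
# build an S3 mapping with s3fs and hand it to xr.open_zarr. The bucket/key
# is a placeholder, not a real dataset location.
import s3fs
import xarray as xr

fs = s3fs.S3FileSystem(anon=True)  # anonymous access; assumes a public bucket
store = s3fs.S3Map(root='some-bucket/some-cube.zarr', s3=fs, check=False)
# ds = xr.open_zarr(store, consolidated=True)
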
def get_time_insert_index(store: Union[str, MutableMapping],
                          slice_time):
    try:
        cube = xr.open_zarr(store)
    except ValueError:
        # ValueError raised if cube store does not exist
        return -1
    # TODO (forman): optimise following naive search by bi-sectioning or so
    for i in range(cube.time.size):
        time = cube.time[i]
        if slice_time == time:
            raise NotImplementedError(f'time already found in {store}, this is not yet supported')
        if slice_time < time:
            return i
    return -1
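
# A sketch of the bisection optimisation hinted at by the TODO above, using
# numpy's searchsorted on the (assumed sorted) time coordinate values; this
# mirrors the semantics of the loop but is not the project's implementation.
import numpy as np

def get_time_insert_index_bisect(time_values: np.ndarray, slice_time) -> int:
    i = int(np.searchsorted(time_values, slice_time, side='left'))
    if i < len(time_values) and time_values[i] == slice_time:
        raise NotImplementedError('time already found, this is not yet supported')
    return i if i < len(time_values) else -1
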
:return: the dataset for the level at *index*.
"""
ext, level_path = self._level_paths[index]
if ext == ".link":
    with self._obs_file_system.open(level_path, "r") as fp:  # read the link target
        level_path = fp.read()
    # if level_path is a relative path, resolve it against the levels directory
    if not os.path.isabs(level_path):
        base_dir = os.path.dirname(self._dir_path)
        level_path = os.path.join(base_dir, level_path)
store = s3fs.S3Map(root=level_path, s3=self._obs_file_system, check=False)
cached_store = zarr.LRUStoreCache(store, max_size=2 ** 28)
with measure_time(tag=f"opened remote dataset {level_path} for level {index}"):
    consolidated = self._obs_file_system.exists(f'{level_path}/.zmetadata')
    return assert_cube(xr.open_zarr(cached_store, consolidated=consolidated, **zarr_kwargs), name=level_path)
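
# A generic sketch of the caching pattern used above: wrap any remote mapping in
# zarr.LRUStoreCache (zarr-python 2.x API) so repeated chunk reads are served from
# memory; max_size=2 ** 28 caps the cache at 256 MiB. The URL is a placeholder.
import fsspec
import xarray as xr
import zarr

store = fsspec.get_mapper('s3://some-bucket/some-cube.zarr', anon=True)
cached_store = zarr.LRUStoreCache(store, max_size=2 ** 28)
# ds = xr.open_zarr(cached_store, consolidated=True)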