import numpy as np
import zarr
from zarr.attrs import Attributes
from zarr.storage import init_group, contains_group
import numcodecs

# Make the Pickle and MsgPack codecs available through zarr's codec registry.
zarr.codecs.codec_registry[numcodecs.Pickle.codec_id] = numcodecs.Pickle
zarr.codecs.codec_registry[numcodecs.MsgPack.codec_id] = numcodecs.MsgPack

from progressivis.core.config import get_option
from .base import StorageEngine, Group, Attribute, Dataset
# As of April 21st, 2017, zarr lacks two important features: fancy indexing and
# Boolean indexing. Both are scheduled for inclusion in future zarr releases.

# Register the zarr classes as virtual subclasses of the storage abstract base classes.
Group.register(zarr.Group)
Attribute.register(Attributes)
Dataset.register(zarr.Array)
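# Quick illustration (not from the original module) of the two indexing styles
# mentioned in the comment above, shown with plain numpy; at the time of that
# comment, zarr.Array supported neither form directly.
_a = np.arange(10)
_a[[1, 3, 5]]  # fancy (integer-array) indexing
_a[_a > 5]     # Boolean-mask indexing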
class ZARRGroup(zarr.Group):
    def __init__(self, store, **kwds):
        super(ZARRGroup, self).__init__(store, **kwds)

    def create_dataset(self, name, shape=None, dtype=None, data=None, maxshape=None, **kwds):
        _ = maxshape  # zarr arrays are resizable, so maxshape is ignored
        if kwds.get('compression') is None:
            # No compressor requested: apply the configured filter chain and
            # append a MsgPack filter.
            kwds.update(get_option('storage.zarr.filter', {}))
            filters = kwds.get('filters', [])
            filters.append(numcodecs.MsgPack())
            kwds['filters'] = filters
        if dtype == np.dtype('O') and kwds.get('object_codec') is None:
            # Object (e.g. string) data needs an explicit object codec.
            kwds.update({'object_codec': numcodecs.VLenUTF8()})
        return super(ZARRGroup, self).create_dataset(name, shape=shape, dtype=dtype, data=data, **kwds)
    def require_dataset(self, name, shape, dtype=None, exact=False, **kwds):
        if kwds.get('compression') is None:
            # Truncated in the original; presumably mirrors create_dataset's filter setup.
            kwds.update(get_option('storage.zarr.filter', {}))
        return super(ZARRGroup, self).require_dataset(name, shape, dtype=dtype, exact=exact, **kwds)
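# Usage sketch (not part of the original module): exercising ZARRGroup on an
# in-memory store. It assumes progressivis is importable (for get_option) and a
# zarr 2.x version that provides MemoryStore; the dataset names are illustrative.
_store = zarr.MemoryStore()
init_group(_store)
_root = ZARRGroup(_store)
# Numeric dataset: no compressor requested, so the configured filter chain plus
# a MsgPack filter is attached.
_values = _root.create_dataset('values', shape=(8,), dtype=np.float64)
# Object-dtype dataset: a VLenUTF8 object codec is supplied automatically.
_labels = _root.create_dataset('labels', shape=(4,), dtype=np.dtype('O'))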
# Constructor of a zarr-backed simulation store (shown here as a fragment);
# Model, EncodingDict and _get_var_info come from the surrounding package.
def __init__(
    self,
    dataset: xr.Dataset,
    model: Model,
    zobject: Optional[Union[zarr.Group, MutableMapping, str]] = None,
    encoding: Optional[EncodingDict] = None,
    batch_dim: Optional[str] = None,
    lock: Optional[Any] = None,
):
    self.dataset = dataset
    self.model = model
    self.in_memory = False
    self.consolidated = False
    # zobject may be an existing zarr group, None (use a fresh in-memory store),
    # or anything zarr.group accepts as a store (a MutableMapping or a path).
    if isinstance(zobject, zarr.Group):
        self.zgroup = zobject
    elif zobject is None:
        self.zgroup = zarr.group(store=zarr.MemoryStore())
        self.in_memory = True
    else:
        self.zgroup = zarr.group(store=zobject)
    # Output variables grouped by clock, and the steps at which each is saved.
    self.output_vars = dataset.xsimlab.output_vars_by_clock
    self.output_save_steps = dataset.xsimlab.get_output_save_steps()
    if encoding is None:
        encoding = {}
    self.var_info = _get_var_info(dataset, model, encoding)
    self.batch_dim = batch_dim
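# Illustration (not from the original source) of how the last two branches above
# resolve: zarr.group accepts an in-memory store, any MutableMapping, or a
# filesystem path; 'out.zarr' is just an example path.
_g_mem = zarr.group(store=zarr.MemoryStore())  # in-memory store (the zobject=None case)
_g_map = zarr.group(store={})                  # any MutableMapping can back a group
_g_dir = zarr.group(store='out.zarr')          # a path string creates a DirectoryStore on disk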
def set_default(self, f):
    # Install f as the current zarr group, closing any previously opened one first.
    assert isinstance(f, zarr.Group)
    if self._zarr is not None:
        self.close()
    self._zarr = f