Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Generate XDMF-files
Parameters
----------
h5filename : str
Name of hdf5-file that we want to decorate with xdmf
"""
f = h5py.File(h5filename)
keys = []
f.visit(keys.append)
assert 'mesh' in keys
# Find unique groups
datasets = {2:{}, 3:{}} # 2D and 3D datasets
for key in keys:
if isinstance(f[key], h5py.Dataset):
if not 'mesh' in key:
tstep = int(key.split("/")[-1])
ndim = len(f[key].shape)
if ndim in (2, 3):
ds = datasets[ndim]
if tstep in ds:
ds[tstep] += [key]
else:
ds[tstep] = [key]
coor = {0:'x0', 1:'x1', 2:'x2'}
for ndim, dsets in six.iteritems(datasets):
timesteps = list(dsets.keys())
if not timesteps:
continue
from __future__ import absolute_import, with_statement
import copy
import posixpath
import h5py
import numpy as np
from .base import _PhynxProperties
from .exceptions import H5Error
from .registry import registry
from .utils import simple_eval, sync
class Dataset(h5py.Dataset, _PhynxProperties):
"""
"""
def _get_acquired(self):
return self.attrs.get('acquired', self.npoints)
def _set_acquired(self, value):
self.attrs['acquired'] = int(value)
acquired = property(_get_acquired, _set_acquired)
@property
def entry(self):
try:
target = self.file['/'.join(self.parent.name.split('/')[:2])]
assert isinstance(target, registry['Entry'])
return target
:param ods: input ODS to be populated
:param data: HDF5 dataset of group
'''
import h5py
keys = data.keys()
try:
keys = sorted(list(map(int, keys)))
except ValueError:
pass
for oitem in keys:
item = str(oitem)
if item.endswith('_error_upper'):
continue
if isinstance(data[item], h5py.Dataset):
if item + '_error_upper' in data:
ods.setraw(item, uarray(data[item][()], data[item + '_error_upper'][()]))
else:
ods.setraw(item, data[item][()])
elif isinstance(data[item], h5py.Group):
convertDataset(ods.setraw(oitem, ods.same_init_ods()), data[item])
def addIfWorthy(name, obj):
    """Visitor callback: collect numeric, non-scalar HDF5 datasets.

    Wraps the content of ``obj`` in a ``model.DataArray`` and appends it to
    ``data`` (both names come from the enclosing scope, not visible here).
    Objects are ignored when they are not an ``h5py.Dataset``, when their
    dtype kind is not one of ``"biufc"`` (bool, signed/unsigned int, float,
    complex), or when they hold at most one element.

    :param name: name of the visited HDF5 object, used only for logging
    :param obj: the visited HDF5 object
    :return: always None
    """
    try:
        if not isinstance(obj, h5py.Dataset):
            return
        if obj.dtype.kind not in "biufc":
            return
        if numpy.prod(obj.shape) <= 1:
            return
        # TODO: if it's an image, open it as an image
        # TODO: try to get some metadata?
        da = model.DataArray(obj[...])
    except Exception:
        logging.info("Skipping '%s' as it doesn't seem a correct data", name)
        # Nothing was built for this object: stop here, otherwise the append
        # below would fail on the unbound local `da`.
        return
    data.append(da)
def append_if_dataset(name, obj):
    """Visitor callback: record ``name`` in ``all_names`` for datasets only.

    ``all_names`` is taken from the enclosing scope. Objects that are not an
    ``h5py.Dataset`` are ignored. Always returns None.
    """
    if not isinstance(obj, h5py.Dataset):
        return
    all_names.append(name)
f.visititems(append_if_dataset)
self.kwargs.update( {"driver": 'mpio', "comm": MPI.COMM_WORLD} )
self.h5f = h5py.File(*self.args, **self.kwargs)
return self.h5f
def __exit__(self, *args):
    """Leave the context: close the HDF5 file, then delegate to the parent.

    :param args: exception triple supplied by the ``with`` statement,
        forwarded unchanged to the parent class ``__exit__``
    :return: None
    """
    # The file must be closed before the parent runs: per the original
    # note, the parent signals the next process when running sequentially.
    handle = self.h5f
    handle.close()
    super(h5File, self).__exit__(*args)
    # Extra barrier, kept as a safety measure.
    uw.mpi.barrier()
class _PatchedDataset(h5py.Dataset):
    """``h5py.Dataset`` whose ``collective`` property is a no-op context manager."""

    @property
    def collective(self):
        """Return a fresh context manager that does nothing on enter or exit."""

        class _dummy_manager():
            """Context manager with empty enter/exit hooks."""

            def __enter__(self):
                # Nothing to set up; ``with ... as x`` binds None.
                return None

            def __exit__(self, *args):
                # Returning None means exceptions are never suppressed.
                return None

        return _dummy_manager()
def h5_require_dataset(h5f, name, *args, **kwargs):
"""
This function either uses an existing file
dataset where compatible, or creates a dataset of the
required size/type.
@discover.register(h5py.Dataset)
def discover_h5py_dataset(d):
    """Return the datashape of an ``h5py.Dataset``, mapping object to string.

    The datashape is derived from the dataset's ``shape`` and ``dtype``.
    Wherever the measure is ``datashape.object_`` (the whole measure, or
    inside a record measure via ``record_dshape_replace``) it is replaced
    by ``datashape.string``. Any other datashape is returned unchanged.
    """
    dshape = datashape.from_numpy(d.shape, d.dtype)
    dims = dshape.shape
    measure = dshape.measure

    if isrecord(measure):
        # Record measure: rebuild the record with object fields replaced.
        fields = list(record_dshape_replace(measure, datashape.object_,
                                            datashape.string))
        return DataShape(*(dims + (datashape.Record(fields),)))

    if dshape == datashape.object_:
        return DataShape(*(dims + (datashape.string,)))

    return dshape
def customContextMenu(self, event):
    """Populate the context menu for the current selection.

    A separator is added whenever the menu already has entries. An extra
    action is appended when at least one selected node is a dataset.

    :param silx.gui.hdf5.Hdf5ContextMenuEvent event: Event
        containing expected information to populate the context menu
    """
    selected_nodes = event.source().selectedH5Nodes()
    menu = event.menu()

    # any() stops at the first dataset, like an explicit loop with break.
    has_dataset = any(node.ntype is h5py.Dataset for node in selected_nodes)

    if not menu.isEmpty():
        menu.addSeparator()

    if not has_dataset:
        return

    menu.addAction(qt.QAction("Do something on the datasets", event.source()))
def print_group_or_dset(group_or_dset_key, group_or_dset, table_data, success_thresholds=None, level=1):
    """Recursively print an HDF5 group or dataset and record it in a table.

    Groups are printed as a header line and get a nested ``OrderedDict`` in
    ``table_data``; their members are handled recursively one level deeper.
    Datasets are read in full. When ``success_thresholds`` has an entry for
    the dataset key, one line is printed and stored per threshold, holding
    the mean of ``values < threshold``. Otherwise the raw value is printed
    and stored, as a plain Python scalar when it has exactly one element.

    :param group_or_dset_key: name under which the object is reported
    :param group_or_dset: an ``h5py.Group`` or ``h5py.Dataset`` (subclasses
        such as ``h5py.File`` are accepted)
    :param table_data: mapping that is filled in place
    :param success_thresholds: optional mapping from dataset key to an
        iterable of thresholds; it is forwarded to nested groups
    :param level: nesting depth, used as the number of leading tabs
    :raises TypeError: if ``group_or_dset`` is neither a group nor a dataset
    """
    indent = '\t' * level

    if isinstance(group_or_dset, h5py.Group):
        print(indent + group_or_dset_key + ':')
        children = OrderedDict()
        table_data[group_or_dset_key] = children
        for key, value in group_or_dset.items():
            # Forward the thresholds: otherwise they would silently stop
            # applying to every dataset below the top level.
            print_group_or_dset(key, value, children,
                                success_thresholds=success_thresholds,
                                level=level + 1)
    elif isinstance(group_or_dset, h5py.Dataset):
        # Read the dataset once instead of once per use.
        values = group_or_dset[()]
        if success_thresholds and group_or_dset_key in success_thresholds:
            for threshold in success_thresholds[group_or_dset_key]:
                label = group_or_dset_key + ' < ' + str(threshold)
                rate = (values < threshold).mean()
                print(indent + label + ':', rate)
                table_data[label] = rate
        else:
            print(indent + group_or_dset_key + ':', values)
            # .item() replaces np.asscalar, which newer NumPy no longer has.
            table_data[group_or_dset_key] = values.item() if values.size == 1 else values
    else:
        raise TypeError(
            "expected an h5py.Group or h5py.Dataset for %r, got %s"
            % (group_or_dset_key, type(group_or_dset).__name__))
objref = hdf.attrs['objref']
else:
objref = None
# if this HDF group has an objref that points to an already reconstructed
# object, simply return this object again
if objref is not None and objref in memo:
obj = memo[objref]
if __debug__:
debug('HDF5', "Use tracked object %s (%i)" % (type(obj), objref))
return obj
#
# Actual data
#
if isinstance(hdf, h5py.Dataset):
if __debug__:
debug('HDF5', "Load from HDF5 dataset [%s]" % hdf.name)
if 'is_scalar' in hdf.attrs:
# extract the scalar from the 0D array
obj = hdf[()]
# and coerce it back into the native Python type if necessary
if issubclass(type(obj), np.generic):
obj = np.asscalar(obj)
elif 'is_numpy_scalar' in hdf.attrs:
# extract the scalar from the 0D array as is
obj = hdf[()]
else:
obj = _hdf_to_ndarray(hdf)
else:
# check if we have a class instance definition here