Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build the N benchmark records up front; the loop counter itself is unused.
rows = [row() for _ in range(N)]
class PYT(tables.IsDescription):
    """Row layout of the benchmark table: six columns named ``a`` to ``f``."""

    # Numeric columns.
    a = tables.Int32Col()
    b = tables.UInt8Col()
    c = tables.Float32Col()

    # String column sized to four times the length of a textual UUID.
    d = tables.StringCol(4 * len(str(uuid4())))

    # Two 32-bit time columns.
    e = tables.Time32Col()
    f = tables.Time32Col()
# Create the HDF5 file and a zlib-compressed (level 9) table laid out by PYT.
h5file = tables.open_file('/tmp/hdf5/tutorial1.h5', mode='w', title='Test file')
group = h5file.create_group('/', 'detector', 'Detector information')
table = h5file.create_table(group, 'readout', PYT, 'Readout example',
filters=tables.Filters(complevel=9, complib='zlib'))
# Row buffer: fields are assigned by column name, then committed with append().
tr = table.row
# Time the write of every generated row.
with Timer() as t:
# NOTE(review): `cache` is never read in the visible code.
cache = []
for i, row in enumerate(rows, 1):
# NOTE(review): the inner loop rebinds `i`, shadowing the outer row counter
# (which is not otherwise used in the visible code).
for i, h in enumerate(headers):
tr[h] = row[i]
tr.append()
table.flush()
h5file.close()
# Report N divided by t.elapsed (rows per unit of elapsed time), then N itself.
print('PyTables write ', float(N) / t.elapsed, N)
# Reopen the same file read-only.
h5file = tables.open_file('/tmp/hdf5/tutorial1.h5', mode='r', title='Test file')
def save_photons(
        self,
        filename,
        mode: str = 'photons',
        group_title: str = 'dye_diffusion',
        hist_bins: int = 4096,
        hist_range=(0, 50),
        **kwargs
):
    """Create an HDF5 file containing the photon header table.

    Only ``mode == 'photons'`` is handled; any other mode is a no-op.

    :param filename: path of the HDF5 file to create (opened with mode "w",
        so an existing file is overwritten); also used as the file title.
    :param mode: output flavour; only ``'photons'`` triggers a write.
    :param group_title: name of the group created directly under the root.
    :param hist_bins: accepted for interface compatibility; not used here.
    :param hist_range: accepted for interface compatibility; not used here.
    :param kwargs: accepted for interface compatibility; not used here.
    :return: None
    """
    if mode != 'photons':
        return

    filters = tables.Filters(complib='blosc', complevel=9)
    # The context manager closes the file even when table creation fails.
    with tables.open_file(
            filename, mode="w", title=filename,
            filters=filters
    ) as h5:
        h5.create_group("/", group_title)
        # The header table is created exactly once: a second create call
        # with the same name under the same group raises a NodeError.
        h5.create_table(
            '/' + group_title, 'header',
            description=chisurf.fio.photons.Header,
            filters=filters
        )
def _write_source(src: ArraySource,
                  hfile: tables.File,
                  atom: tables.Atom,
                  name: str,
                  transform: Worker,
                  n_workers: int,
                  batchrows: Optional[int] = None
                  ) -> None:
    """Create a compressed chunked array for ``src`` in ``hfile`` and fill it.

    The array is created under the file root with the shape of ``src`` minus
    its last axis, tagged with the source's ``missing`` attribute, and then
    populated by ``_write`` in batches of ``batchrows`` rows. A falsy
    ``batchrows`` falls back to ``src.native``.
    """
    leading_shape = src.shape[:-1]
    compression = tables.Filters(complevel=1, complib="blosc:lz4")
    carray = hfile.create_carray(
        hfile.root,
        name=name,
        atom=atom,
        shape=leading_shape,
        filters=compression,
    )
    carray.attrs.missing = src.missing

    if not batchrows:
        batchrows = src.native

    log.info("Writing {} to HDF5 in {}-row batches".format(name, batchrows))
    _write(src, carray, batchrows, n_workers, transform)
def save_arr_as_tbl(fname, tname, cols):
    """
    Given 1D arrays save (or add if file exists) a Table.

    The file is opened in append mode and is always closed before
    returning, including when table creation or the append fails.

    fname : name of new or existing file.
    tname : name of new table.
    cols : a dictionary {'colname': colval, ...}; each value must expose
           a ``dtype`` attribute.
    """
    # Create column description; positions follow the dict's iteration order
    # so they match the order in which the columns are appended below.
    descr = {
        cname: tb.Col.from_dtype(cval.dtype, pos=i)
        for i, (cname, cval) in enumerate(cols.items())
    }
    f = tb.openFile(fname, 'a')  # if doesn't exist create it
    try:
        table = f.createTable('/', tname, descr, "", tb.Filters(9))
        table.append(list(cols.values()))
        table.flush()
        # Printed while the file is still open so its description is shown.
        # A single pre-formatted string prints the same on Python 2 and 3.
        print("file with new table: %s" % f)
    finally:
        f.close()
# Override two locomotion thresholds on the processing options.
processing_options.locomotion.velocity_tip_diff = 0.5
processing_options.locomotion.velocity_body_diff = 1
# Useful to display progress: the skeletons file name with its directory and
# everything from the last '.' onwards removed.
base_name = skeletons_file.rpartition('.')[0].rpartition(os.sep)[-1]
# Initialize by getting the specs data subdivision.
wStats = wormStatsClass()
# List to save the mean features of the trajectories.
all_stats = []
progress_timer = timeCounterStr('');
# Filter used for each of the tables.
filters_tables = tables.Filters(complevel = 5, complib='zlib', shuffle=True)
# Create the motion table header: three fixed columns at positions 0-2.
motion_header = {'frame_number':tables.Int32Col(pos=0),\
'skeleton_id':tables.Int32Col(pos=1),\
'motion_modes':tables.Float32Col(pos=2)}
# Add one Float32 column per motion spec.
# NOTE(review): for ii == 0 this assigns pos=2, the same position already
# given to 'motion_modes' above - confirm whether pos=ii+3 was intended.
for ii, spec in enumerate(wStats.specs_motion):
feature = wStats.spec2tableName[spec.name]
motion_header[feature] = tables.Float32Col(pos=ii+2)
# Get the is_signed flag for motion specs and store it as an attribute.
# The is_signed flag is used by featureStat in order to subdivide the data if required.
is_signed_motion = np.zeros(len(motion_header), np.uint8);
# The flag is stored at the index given by the column's declared position.
for ii, spec in enumerate(wStats.specs_motion):
feature = wStats.spec2tableName[spec.name]
is_signed_motion[motion_header[feature]._v_pos] = spec.is_signed
# Read the first contact's file to learn the number of samples.
rec_dict['contact_id']=1
full_path = os.path.join(dirname, f_spike)
fname = full_path.format(**rec_dict)
# Raw samples are int16; they are divided by 200 on load.
sp = np.fromfile(fname, dtype=np.int16)/200.
# Choose the backing store for the (samples, contacts) array.
if memmap=="numpy":
# Create a temporary memory-mapped array.
filename = os.path.join(mkdtemp(), 'newfile.dat')
fp = np.memmap(filename, dtype='float', mode='w+',
shape=(len(sp),n_contacts))
elif memmap=="tables":
# Blosc-compressed (level 3) chunked array in a temporary HDF5 file.
atom = tables.Atom.from_dtype(sp.dtype)
shape = (len(sp), n_contacts)
filters = tables.Filters(complevel=3, complib='blosc')
filename = os.path.join(mkdtemp(), 'newfile.dat')
# NOTE(review): h5f is not closed in the visible code, and the returned
# array is a node of this file - confirm who is responsible for closing it.
h5f = tables.openFile(filename,'w')
fp = h5f.createCArray('/', "test", atom, shape, filters=filters)
else:
# Plain in-memory array.
fp = np.empty((len(sp), n_contacts), dtype='float')
# Column 0 holds the contact already loaded above.
fp[:,0]=sp
# Load the remaining contacts; contact ids are 1-based, columns 0-based.
for i in range(1,n_contacts):
rec_dict['contact_id']=i+1
fname = full_path.format(**rec_dict)
sp = np.fromfile(fname,dtype=np.int16)
fp[:,i]=sp/200.
del sp
return {'data':fp, "FS":conf_dict['FS'], "n_contacts":n_contacts}
# NOTE(review): bare except - this also catches KeyboardInterrupt/SystemExit.
except:
# If anything fails, we just create a new database...
log.warning("""Failed to open existing database at %s, or
database is corrupted. Creating a new one""", self.path)
self.results = None
# Something went wrong!
# NOTE(review): this is a truthiness test, so any falsy value of
# self.results (not only None) takes this branch - confirm that is intended.
if not self.results:
try:
# Try closing this, just in case
self.h5.close()
except:
# Deliberate best effort: failures while closing are ignored.
pass
# Compression is good -- and faster, according to the pytables docs...
f = tables.Filters(complib='blosc', complevel=5)
# Mode 'w' starts a fresh file at self.path.
self.h5 = tables.open_file(self.path, 'w', filters=f)
self.results = self.h5.create_table(
'/', 'results', cfg.data_layout.data_type)
# Index the subset_id column with a completely sorted index.
self.results.cols.subset_id.create_csindex()
# Sanity checks on the table that is now in place.
assert isinstance(self.results, tables.Table)
assert self.results.indexed
:param str archive_name: filestem for the new archive
:return None:
"""
def store_carray(archive, array, name):
    """Write ``array`` into ``archive`` as a chunked array node called ``name``.

    The node is created directly under the archive root with the array's
    dtype and shape, filled in one assignment and flushed.
    """
    element_atom = tb.Atom.from_dtype(array.dtype)
    node = archive.create_carray(
        archive.root, name, element_atom, array.shape
    )
    node[:] = array
    node.flush()
# Make sure the archive carries the .h5 extension.
if not archive_name.endswith('.h5'):
    archive_name += '.h5'

# construct container
blosc5 = tb.Filters(complevel=5, complib='blosc')
# The context manager closes the archive even if one of the writes fails,
# so a partial failure no longer leaves the file handle open.
with tb.open_file(archive_name, mode='w', title='Data for seqc.ReadArray',
                  filters=blosc5) as f:
    f.create_table(f.root, 'data', self.data)

    if self._ambiguous_genes:
        # each array is data, indices, indptr
        store_carray(f, self.genes.indices, 'indices')
        store_carray(f, self.genes.indptr, 'indptr')
        store_carray(f, self.genes.data, 'gene_data')
        store_carray(f, self.positions.data, 'positions_data')
    else:
        store_carray(f, self.genes, 'genes')
        store_carray(f, self.positions, 'positions')
# Tail of a dispatch that picks the save routine from `title` and `value`.
saveSplitIndices(value, title, group, tables_file)
elif title == "feature mapping":
saveFeatureMapping(value, title, group, tables_file)
elif value is None:
# A missing value is stored as its string form, i.e. "None".
saveString(str(value), title, group, tables_file)
elif title.endswith("set"):
# Titles ending in "set" are saved recursively under a group named after the title.
save(value, tables_file, group_title = title)
else:
# Anything not matched above is unsupported.
raise NotImplementedError(
"Saving type {} for title \"{}\" has not been implemented."
.format(type(value), title)
)
# Time the whole save.
start_time = time()
# zlib compression (level 5) is set as the file-wide default filter.
filters = tables.Filters(complib = "zlib", complevel = 5)
# Mode "w" overwrites any existing file at `path`; the with-block closes it.
with tables.open_file(path, "w", filters = filters) as tables_file:
save(data_dictionary, tables_file)
duration = time() - start_time
print("Data saved ({}).".format(formatDuration(duration)))
import tables
#---------------------------------------------------------------------------
# xpaxs imports
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
# Normal code begins
#---------------------------------------------------------------------------
# Module logger, registered under a fixed name.
logger = logging.getLogger('XPaXS.io.hdf5file')
DEBUG = False
# Module-level compression settings: zlib at the maximum level (9).
filters = tables.Filters(complib='zlib', complevel=9)
# QObject that holds a PyTables file handle together with a recursive mutex.
class XpaxsH5File(QtCore.QObject):
# Open `filename` with `mode`. If opening raises IOError and the mode is
# 'r+', the file is opened with mode 'w' instead; for any other mode the
# error is re-raised.
def __init__(self, filename, mode='r+', parent=None):
super(XpaxsH5File, self).__init__(parent)
# Recursive mutex, exposed through the `mutex` property below.
self.__mutex = QtCore.QMutex(QtCore.QMutex.Recursive)
try:
self.__h5file = tables.openFile(filename, mode)
except IOError, err:
if mode == 'r+': self.__h5file = tables.openFile(filename, 'w')
else: raise err
# Read-only accessor for the private mutex.
mutex = property(lambda self: self.__mutex)