# NOTE(review): an unrelated advertising banner from the code-hosting page was
# injected at the top of this fragment; it has been removed. This file is an
# incomplete extract of a larger HDF5 loader module.
# Convert ndarrays to lists (they were lists before they were saved)
# NOTE(review): fragment — this is the body of a loop over metadata items;
# 'value', 'key' and 'mdata' are bound by enclosing code not visible here.
if isinstance(value, np.ndarray):
# Restore the original list representation of the saved attribute
value = list(value)
mdata[key] = value
# NOTE(review): fragment of a larger loader function. Names 'f' (open h5py
# file), 'file_name', 'file_path', 'load_summed_data', 'load_raw_data',
# 'spectrum_cut', 'data_sets', 'RawHDF5Dataset', 'DataSelection' and 'logger'
# are defined outside the visible region.
data = f['xrfmap']  # root group of the XRF map in the open HDF5 file
# Base name with everything after the first '.' stripped
# NOTE(review): split('.')[0] also truncates names like 'scan.1.h5' — confirm intended
fname = file_name.split('.')[0]
if load_summed_data and load_raw_data:
try:
# data from channel summed
# exp_data = np.array(data['detsum/counts'][:, :, 0:spectrum_cut],
# dtype=np.float32)
# exp_data = np.array(data['detsum/counts'], dtype=np.float32)
# Keep only the dataset shape; the raw counts stay on disk and are
# referenced lazily through RawHDF5Dataset instead of loading into RAM.
data_shape = data['detsum/counts'].shape
exp_data = RawHDF5Dataset(file_path, 'xrfmap/detsum/counts',
shape=data_shape)
logger.warning(f"We use spectrum range from 0 to {spectrum_cut}")
logger.info(f"Exp. data from h5 has shape of: {data_shape}")
fname_sum = f"{fname}_sum"  # key under which the summed channel is stored
DS = DataSelection(filename=fname_sum,
raw_data=exp_data)
data_sets[fname_sum] = DS
logger.info('Data of detector sum is loaded.')
except KeyError:
# 'detsum/counts' absent from the file — summed data simply not loaded
# NOTE(review): uses print() while the rest of the code uses logger — verify
print('No data is loaded for detector sum.')
if 'scalers' in data: # Scalers are always loaded if data is available
# NOTE(review): 'det_name' assigned here is overwritten by the loops below;
# the scaler-name handling that presumably used it is not in this fragment.
det_name = data['scalers/name']
temp = {}
# Determine the number of available detector channels and create the list
# of channel names. The channels are named as 'det1', 'det2', 'det3' etc.
# NOTE(review): 'data_out' comes from unseen code; substring test 'det' in nm
# would also match unrelated keys containing 'det' — confirm key naming.
xrf_det_list = [nm for nm in data_out.keys() if 'det' in nm and 'sum' not in nm]
# Replace the references to raw data by the references to HDF5 datasets.
# This should also release memory used for storage of raw data
# It is expected that 'data_out' has keys 'det_sum', 'det1', 'det2', etc.
interpath = "xrfmap"
dset = "counts"
# Data from individual detectors may or may not be present in the file
for det_name in xrf_det_list:
dset_name = f"{interpath}/{det_name}/{dset}"
# NOTE(review): 'fname' was stripped of its extension earlier in this
# fragment; opening it as an HDF5 path here may fail — verify which 'fname'
# this snippet originally referred to.
with h5py.File(fname, "r") as f:
dset_shape = f[dset_name].shape
data_out[det_name] = RawHDF5Dataset(fname, dset_name, dset_shape)
# The file is always expected to have 'detsum' dataset
dset_name = f"{interpath}/detsum/{dset}"
with h5py.File(fname, "r") as f:
dset_shape = f[dset_name].shape
data_out["det_sum"] = RawHDF5Dataset(fname, dset_name, dset_shape)
# Now fill 'data_sets' dictionary
DS = DataSelection(filename=fname_sum,
raw_data=data_out["det_sum"])
data_sets[fname_sum] = DS
logger.info("Data loading: channel sum is loaded successfully.")
for det_name in xrf_det_list:
exp_data = data_out[det_name]
# NOTE(review): 'fname_no_ext' is defined outside this fragment; 'fln' is
# unused in the visible lines — the DataSelection call that consumed it
# appears to have been cut off here.
fln = f"{fname_no_ext}_{det_name}"
# Count detector-channel groups in the HDF5 file by key substring.
# NOTE(review): 'channel_num' must be initialized (presumably to 0) in
# unseen code above; 'scaler' keys containing 'det' would be miscounted.
for v in list(data.keys()):
if 'det' in v:
channel_num = channel_num+1
channel_num = channel_num-1 # do not consider det_sum
# data from each channel
if load_each_channel and load_raw_data:
# Channels are 1-based: det1 .. det<channel_num>
for i in range(1, channel_num+1):
det_name = f"det{i}"
file_channel = f"{fname}_det{i}"  # dataset key, e.g. 'scan123_det1'
try:
# exp_data_new = np.array(data[f"{det_name}/counts"][:, :, 0:spectrum_cut],
# dtype=np.float32)
# Lazy reference to the on-disk counts; only the shape is read here.
data_shape = data[f"{det_name}/counts"].shape
exp_data_new = RawHDF5Dataset(file_path, f"xrfmap/{det_name}/counts",
shape=data_shape)
DS = DataSelection(filename=file_channel,
raw_data=exp_data_new)
data_sets[file_channel] = DS
logger.info(f"Data from detector channel {i} is loaded.")
except KeyError:
# Channel group missing from file — skip it (best-effort loading)
# NOTE(review): print() instead of logger, inconsistent with the rest
print(f"No data is loaded for {det_name}.")
if load_processed_each_channel:
for i in range(1, channel_num + 1):
det_name = f"det{i}"
file_channel = f"{fname}_det{i}"
# Load previously saved fit results for this channel, if present
if 'xrf_fit' in data[det_name] and load_fit_results:
try:
# NOTE(review): this statement is truncated in the extracted source —
# the remaining arguments of get_fit_data() are missing. '.value' is
# also a removed h5py API (use '[()]' in h5py >= 3) — verify.
fit_result = get_fit_data(data[det_name]['xrf_fit_name'].value,
# NOTE(review): lines below are a near-verbatim duplicate of an earlier
# snippet in this fragment (dataset-reference replacement + 'data_sets'
# filling) — the extraction apparently concatenated overlapping copies of the
# same function. Kept byte-identical pending reconciliation with the original.
# It is expected that 'data_out' has keys 'det_sum', 'det1', 'det2', etc.
interpath = "xrfmap"
dset = "counts"
# Data from individual detectors may or may not be present in the file
for det_name in xrf_det_list:
dset_name = f"{interpath}/{det_name}/{dset}"
with h5py.File(fname, "r") as f:
dset_shape = f[dset_name].shape
# Replace in-memory raw data with a lazy on-disk reference
data_out[det_name] = RawHDF5Dataset(fname, dset_name, dset_shape)
# The file is always expected to have 'detsum' dataset
dset_name = f"{interpath}/detsum/{dset}"
with h5py.File(fname, "r") as f:
dset_shape = f[dset_name].shape
data_out["det_sum"] = RawHDF5Dataset(fname, dset_name, dset_shape)
# Now fill 'data_sets' dictionary
DS = DataSelection(filename=fname_sum,
raw_data=data_out["det_sum"])
data_sets[fname_sum] = DS
logger.info("Data loading: channel sum is loaded successfully.")
for det_name in xrf_det_list:
exp_data = data_out[det_name]
fln = f"{fname_no_ext}_{det_name}"  # per-channel dataset key
DS = DataSelection(filename=fln,
raw_data=exp_data)
data_sets[fln] = DS
logger.info("Data loading: channel data is loaded successfully.")
# NOTE(review): truncated — the body of this 'if' is missing from the fragment
if ('pos_data' in data_out) and ('pos_names' in data_out):