fname = file_name.split('.')[0]
if load_summed_data and load_raw_data:
    try:
        # Data from the summed channel. The array was previously loaded
        # eagerly, e.g. np.array(data['detsum/counts'], dtype=np.float32);
        # a lazy reference is created instead so the full 3D array is not
        # read into memory up front.
        data_shape = data['detsum/counts'].shape
        exp_data = RawHDF5Dataset(file_path, 'xrfmap/detsum/counts',
                                  shape=data_shape)
        logger.warning(f"Using spectrum range from 0 to {spectrum_cut}")
        logger.info(f"Experimental data in the HDF5 file has shape {data_shape}")
        fname_sum = f"{fname}_sum"
        DS = DataSelection(filename=fname_sum,
                           raw_data=exp_data)
        data_sets[fname_sum] = DS
        logger.info('Data of detector sum is loaded.')
    except KeyError:
        logger.info('No data is loaded for detector sum.')
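
# The RawHDF5Dataset created above acts as a lazy reference: it records where
# the counts live instead of reading the whole 3D array. A minimal sketch of
# such a reference class follows; this is an illustrative assumption, not the
# actual pyxrf implementation, which may carry additional state.
import h5py


class LazyHDF5Ref:
    """Hypothetical stand-in for RawHDF5Dataset: file path, dataset name, shape."""

    def __init__(self, abs_path, dset_name, shape):
        self.abs_path = abs_path
        self.dset_name = dset_name
        self.shape = shape

    def read(self, selection=Ellipsis):
        # Open the file only for the duration of the read.
        with h5py.File(self.abs_path, "r") as f:
            return f[self.dset_name][selection]
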
if 'scalers' in data:  # Scalers are always loaded if data is available
    det_name = data['scalers/name']
    temp = {}
    for i, n in enumerate(det_name):
        if not isinstance(n, str):
            n = n.decode()  # names may be stored as byte strings
        temp[n] = data['scalers/val'][:, :, i]
    img_dict[f"{fname}_scaler"] = temp
# also dump other data from suitcase if required
logger.info('No fitting from MAPS can be loaded.')
try:
    fit_data = f['xrfmap/detsum']
    fit_v_pyxrf = fit_data['xrf_fit'][:]
    fit_n_pyxrf = fit_data['xrf_fit_name'][()]  # '.value' was removed in h5py 3.0
    logger.info(f"Fitted emission lines (pyxrf): {fit_n_pyxrf}")
except KeyError:
    logger.info('No fitting from pyxrf can be loaded.')
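
# For reference: 'xrf_fit' holds one 2D map per fitted emission line and
# 'xrf_fit_name' lists the matching names, often as byte strings. A hedged
# sketch of pairing them into a dict, assuming the line index is the leading
# axis (the helper name is illustrative, not pyxrf API):
def fit_arrays_to_dict(fit_names, fit_maps):
    result = {}
    for name, fit_map in zip(fit_names, fit_maps):
        if not isinstance(name, str):
            name = name.decode()  # HDF5 strings may arrive as bytes
        result[name] = fit_map
    return result
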
# Reorient the array so the map axes come first (see the shape demo below).
exp_data = np.rot90(exp_data.T, 1)
logger.info(f"File: {fname} with total counts {np.sum(exp_data)}")
DS = DataSelection(filename=fname,
                   raw_data=exp_data)
data_sets.update({fname: DS})
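
# Shape demo for the reorientation above (illustrative; assumes the file
# stores the array as (spectrum, ny, nx)):
import numpy as np

demo = np.zeros((4096, 5, 7))        # (spectrum, ny, nx)
demo = np.rot90(demo.T, 1)           # transpose -> (7, 5, 4096), rot90 swaps the map axes
assert demo.shape == (5, 7, 4096)    # (ny, nx, spectrum), as used downstream
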
# save roi and fit into dict
temp_roi = {}
temp_fit = {}
temp_scaler = {}
temp_pos = {}
for i, name in enumerate(roi_channel):
    temp_roi[name] = np.flipud(roi_val[i, :, :])
img_dict[f"{fname}_roi"] = temp_roi
if fit_val is not None:
    for i, name in enumerate(roi_channel):
        # assumed to mirror the ROI handling above; flipud keeps orientation consistent
        temp_fit[name] = np.flipud(fit_val[i, :, :])
    img_dict[f"{fname}_fit"] = temp_fit
# The file is always expected to have 'detsum' dataset
dset_name = f"{interpath}/detsum/{dset}"
with h5py.File(fname, "r") as f:
    dset_shape = f[dset_name].shape
data_out["det_sum"] = RawHDF5Dataset(fname, dset_name, dset_shape)
# Now fill 'data_sets' dictionary
DS = DataSelection(filename=fname_sum,
                   raw_data=data_out["det_sum"])
data_sets[fname_sum] = DS
logger.info("Data loading: channel sum is loaded successfully.")
for det_name in xrf_det_list:
    exp_data = data_out[det_name]
    fln = f"{fname_no_ext}_{det_name}"
    DS = DataSelection(filename=fln,
                       raw_data=exp_data)
    data_sets[fln] = DS
logger.info("Data loading: channel data is loaded successfully.")
if ('pos_data' in data_out) and ('pos_names' in data_out):
    if 'x_pos' in data_out['pos_names'] and 'y_pos' in data_out['pos_names']:
        p_dict = {}
        for v in ['x_pos', 'y_pos']:
            ind = data_out['pos_names'].index(v)
            p_dict[v] = data_out['pos_data'][ind, :, :]
        img_dict['positions'] = p_dict
        logger.info("Data loading: positions data are loaded successfully.")
scaler_tmp = {}
for i, v in enumerate(data_out['scaler_names']):
    scaler_tmp[v] = data_out['scaler_data'][:, :, i]
img_dict[f"{fname_no_ext}_scaler"] = scaler_tmp  # key pattern assumed, mirroring the channel entries
# Data from individual detectors may or may not be present in the file
for det_name in xrf_det_list:
    dset_name = f"{interpath}/{det_name}/{dset}"
    with h5py.File(fname, "r") as f:
        dset_shape = f[dset_name].shape
    data_out[det_name] = RawHDF5Dataset(fname, dset_name, dset_shape)
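
# Because per-detector datasets may be absent, a guarded variant of the loop
# above can skip missing channels instead of relying on KeyError handling
# later. Illustrative sketch, not the pyxrf code itself:
with h5py.File(fname, "r") as f:
    for det_name in xrf_det_list:
        dset_name = f"{interpath}/{det_name}/{dset}"
        if dset_name in f:  # h5py groups support membership tests on paths
            data_out[det_name] = RawHDF5Dataset(fname, dset_name, f[dset_name].shape)
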
channel_num = channel_num - 1  # do not consider det_sum
# data from each channel
if load_each_channel and load_raw_data:
    for i in range(1, channel_num + 1):
        det_name = f"det{i}"
        file_channel = f"{fname}_det{i}"
        try:
            # Previously loaded eagerly, e.g. np.array(data[f"{det_name}/counts"]
            # [:, :, 0:spectrum_cut], dtype=np.float32); a lazy reference is
            # created instead.
            data_shape = data[f"{det_name}/counts"].shape
            exp_data_new = RawHDF5Dataset(file_path, f"xrfmap/{det_name}/counts",
                                          shape=data_shape)
            DS = DataSelection(filename=file_channel,
                               raw_data=exp_data_new)
            data_sets[file_channel] = DS
            logger.info(f"Data from detector channel {i} is loaded.")
        except KeyError:
            logger.info(f"No data is loaded for {det_name}.")
if load_processed_each_channel:
    for i in range(1, channel_num + 1):
        det_name = f"det{i}"
        file_channel = f"{fname}_det{i}"
        if 'xrf_fit' in data[det_name] and load_fit_results:
            try:
                fit_result = get_fit_data(data[det_name]['xrf_fit_name'][()],
                                          data[det_name]['xrf_fit'][()])
                img_dict.update({f"{file_channel}_fit": fit_result})
                # also include scaler data