import os
import time
import time as ttime
from collections import OrderedDict

import h5py
import numpy as np
import six

# Helpers such as ScanMetadataXRF, RawHDF5Dataset, DataSelection, PreFitStatus,
# get_Z, get_energy, trim_escape_peak, and the module-level 'logger' come from
# the surrounding pyxrf modules and are assumed to be importable here.


def read_MAPS(working_directory,
              file_name, channel_num=1):
    # data_dict = OrderedDict()
    data_sets = OrderedDict()
    img_dict = OrderedDict()

    # Empty container for metadata
    mdata = ScanMetadataXRF()

    # cut off bad point on the last position of the spectrum
    # bad_point_cut = 0

    fit_val = None
    fit_v_pyxrf = None

    file_path = os.path.join(working_directory, file_name)
    print('file path is {}'.format(file_path))
    with h5py.File(file_path, 'r+') as f:
        data = f['MAPS']
        fname = file_name.split('.')[0]
        # for 2D MAP

        # Scan metadata: iterate over the attribute items (the enclosing loop
        # is restored here; 'data.attrs' is an assumption, the original may
        # read a dedicated metadata group instead).
        for key, value in data.attrs.items():
            # Convert ndarrays to lists (they were lists before they were saved)
            if isinstance(value, np.ndarray):
                value = list(value)
            mdata[key] = value

        # NSLS-II style files keep the maps under the 'xrfmap' group
        data = f['xrfmap']
        fname = file_name.split('.')[0]
        if load_summed_data and load_raw_data:
            try:
                # data from channel summed
                # exp_data = np.array(data['detsum/counts'][:, :, 0:spectrum_cut],
                #                     dtype=np.float32)
                # exp_data = np.array(data['detsum/counts'], dtype=np.float32)
                data_shape = data['detsum/counts'].shape
                exp_data = RawHDF5Dataset(file_path, 'xrfmap/detsum/counts',
                                          shape=data_shape)
                logger.warning(f"Using spectrum range from 0 to {spectrum_cut}")
                logger.info(f"Exp. data from h5 has shape: {data_shape}")
                fname_sum = f"{fname}_sum"
                DS = DataSelection(filename=fname_sum,
                                   raw_data=exp_data)
                data_sets[fname_sum] = DS
                logger.info('Data of detector sum is loaded.')
            except KeyError:
                print('No data is loaded for detector sum.')

        if 'scalers' in data:  # Scalers are always loaded if data is available
            det_name = data['scalers/name']
            temp = {}
            for i, n in enumerate(det_name):
                if not isinstance(n, six.string_types):
                    n = n.decode()
                # Index the dataset directly; Dataset.value was removed in h5py 3.x
                temp[n] = data['scalers/val'][:, :, i]
            img_dict[f"{fname}_scaler"] = temp

        # also dump other data from suitcase if required
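

# A minimal sketch of the lazy dataset reference used above (assumption: the
# real RawHDF5Dataset in pyxrf has more behavior; only the constructor call
# visible above is mirrored here). Keeping the file path, the in-file dataset
# name, and the shape lets large XRF maps stay on disk until they are needed.
class RawHDF5Dataset:
    def __init__(self, abs_path, dset_name, shape=None):
        self.abs_path = os.path.abspath(abs_path)  # file that holds the data
        self.dset_name = dset_name                 # e.g. 'xrfmap/detsum/counts'
        self.shape = shape                         # known without reading the data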
    def create_EC_list(self, element_list):
        temp_dict = OrderedDict()
        for e in element_list:
            if e == "":
                pass
            elif '-' in e:  # pileup peaks
                e1, e2 = e.split('-')
                energy = float(get_energy(e1)) + float(get_energy(e2))
                ps = PreFitStatus(z=get_Z(e),
                                  energy=str(energy), norm=1)
                temp_dict[e] = ps
            else:
                ename = e.split('_')[0]
                ps = PreFitStatus(z=get_Z(ename),
                                  energy=get_energy(e),
                                  norm=1)
                temp_dict[e] = ps
        self.EC.add_to_dict(temp_dict)
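
        # Worked example of the pileup branch above: for "Si_Ka1-Si_Ka1" both
        # photons are absorbed within one shaping time and recorded as a single
        # event, so the peak sits at the sum of the two line energies:
        # get_energy("Si_Ka1") + get_energy("Si_Ka1") ~= 1.74 + 1.74 = 3.48 keV.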

    # (method-body fragment: 'self' is the spectrum-fitting model object)
    # add escape peak
    if param_dict['non_fitting_values']['escape_ratio'] > 0:
        pre_dict['escape'] = trim_escape_peak(self.data,
                                              param_dict, len(self.y0))

    temp_dict = OrderedDict()
    for e in six.iterkeys(pre_dict):
        if e in ['background', 'escape']:
            spectrum = pre_dict[e]
            # Summing the spectrum is not a true area: the channel interval is
            # treated as 1 instead of the energy step. This is acceptable only
            # because the areas of background and escape are not used elsewhere.
            area = np.sum(spectrum)
            ps = PreFitStatus(z=get_Z(e), energy=get_energy(e),
                              area=float(area), spectrum=spectrum,
                              maxv=float(np.around(np.max(spectrum), self.max_area_dig)),
                              norm=-1, lbd_stat=False)
            temp_dict[e] = ps
        elif '-' in e:  # pileup peaks
            e1, e2 = e.split('-')
            energy = float(get_energy(e1)) + float(get_energy(e2))
            spectrum = pre_dict[e]
            area = area_dict[e]
            ps = PreFitStatus(z=get_Z(e), energy=str(energy),
                              area=area, spectrum=spectrum,
                              maxv=np.around(np.max(spectrum), self.max_area_dig),
                              norm=-1, lbd_stat=False)
            temp_dict[e] = ps
        else:
            ename = e.split('_')[0]
            for k, v in six.iteritems(param_dict):
                if ename in k and 'area' in k:
                    spectrum = pre_dict[e]
                    area = area_dict[e]
                elif ename == 'compton' and k == 'compton_amplitude':
                    spectrum = pre_dict[e]
                    area = area_dict[e]
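
# Illustration of the caveat noted above: np.sum(spectrum) counts channels,
# not energy, so it differs from the physical integral by the energy step.
# (Hypothetical numbers; the real step comes from the energy calibration.)
demo_spectrum = np.ones(100)           # flat spectrum over 100 channels
e_step = 0.01                          # assumed bin width, keV/channel
print(np.sum(demo_spectrum))           # 100.0 -- the channel sum stored above
print(np.sum(demo_spectrum) * e_step)  # 1.0   -- area integrated over energy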

    # Resolve the calibration file name against the working directory unless it
    # is already an absolute path, then try to load it.
    if os.path.isabs(fln):
        f = fln
    else:
        f = os.path.join(working_directory, fln)
    try:
        param_quant_analysis.load_entry(f)
        quant_norm = True
        logger.info(f"Quantitative calibration is loaded successfully from file '{f}'")
    except Exception as ex:
        logger.error(f"Error occurred while loading quantitative calibration from file '{f}': {ex}")
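    # Quick check of the resolution above (hypothetical file names):
    #   fln = "calib_35kev.json"       -> <working_directory>/calib_35kev.json
    #   fln = "/data/calib_35kev.json" -> used as given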

    t0 = time.time()
    prefix_fname = file_name.split('.')[0]
    if fit_channel_sum is True:
        if data_from == 'NSLS-II':
            img_dict, data_sets, mdata = read_hdf_APS(working_directory, file_name,
                                                      spectrum_cut=spectrum_cut,
                                                      load_each_channel=False)
        elif data_from == '2IDE-APS':
            img_dict, data_sets, mdata = read_MAPS(working_directory,
                                                   file_name, channel_num=1)
        else:
            print(f"Unknown data source: {data_from}")
        try:
            data_all_sum = data_sets[prefix_fname + '_sum'].raw_data
        except KeyError:
            data_all_sum = data_sets[prefix_fname].raw_data
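
    # Key convention assumed here: the readers above register the summed channel
    # under "<file stem>_sum" (see fname_sum in the loading code earlier), so for
    # a hypothetical "scan2D_1234.h5" the lookup is data_sets["scan2D_1234_sum"];
    # files that only carry the bare stem fall back through the KeyError branch.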

    # load param file
    if not os.path.isabs(param_file_name):
        param_path = os.path.join(working_directory, param_file_name)
    else:
        param_path = param_file_name  # already absolute; use as given

    # (fragment: extracting metadata from a databroker header; 'path' addresses
    # one field in the start document and 'key' is the metadata name)
    if path[-1] == 'time':
        # Time stamps become NeXus (ISO 8601) strings; "_utc" keys are rendered
        # in UTC, all others in local time.
        if key.endswith("_utc"):
            value = convert_time_to_nexus_string(ttime.gmtime(value))
        else:
            value = convert_time_to_nexus_string(ttime.localtime(value))
    mdata[key] = value
    break  # stop at the first location that yielded a value

    stop_document = hdr.stop
    if stop_document:
        if "time" in stop_document:
            t = stop_document["time"]
            mdata["scan_time_stop"] = convert_time_to_nexus_string(ttime.localtime(t))
            mdata["scan_time_stop_utc"] = convert_time_to_nexus_string(ttime.gmtime(t))
        if "exit_status" in stop_document:
            mdata["scan_exit_status"] = stop_document["exit_status"]
    else:
        mdata["scan_exit_status"] = "incomplete"

    # Add full beamline name (if available, otherwise don't create the entry).
    # Also, don't overwrite the existing name if it was read from the start document.
    if "scan_instrument_id" in mdata and "scan_instrument_name" not in mdata:
        instruments = {
            "srx": "Submicron Resolution X-ray Spectroscopy",
            "hxn": "Hard X-ray Nanoprobe",
            "tes": "Tender Energy X-ray Absorption Spectroscopy",
            "xfm": "X-ray Fluorescence Microprobe"