Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
header, header_comments = get_header(hdulist, hduname)
# Closes the FITS file, further file data reads will be done via Stingray
hdulist.close()
# Prepares additional_columns
additional_columns = []
for i in range(len(columns)):
if columns[i] != column:
if len(extra_colums) == 0 or columns[i] in extra_colums:
additional_columns.append(columns[i])
# Reads fits data
logging.debug("Reading Events Fits columns's data")
fits_data = load_events_and_gtis(destination,
additional_columns=additional_columns,
gtistring=gtistring,
hduname=hduname, column=column)
event_list, events_start_time = substract_tstart_from_events(fits_data, time_offset)
# Gets PI column data from eventlist if requiered and PHA not in additional_data
if "PI" in additional_columns \
and "PI" not in fits_data.additional_data \
and "PHA" not in fits_data.additional_data:
fits_data.additional_data["PI"] = event_list.pi
dataset = DataSet.get_dataset_applying_gtis(dsId, header, header_comments,
fits_data.additional_data, [],
event_list.time, [],
event_list.gti[:, 0], event_list.gti[:, 1],
if not destination:
return None
filename = os.path.splitext(destination)[0]
file_extension = magic.from_file(destination)
logging.debug("File extension: %s" % file_extension)
if file_extension.find("FITS") == 0:
# Opening Fits
hdulist = fits.open(destination, memmap=True)
if 'EVENTS' in hdulist:
# If EVENTS extension found, consider the Fits as EVENTS Fits
fits_data = load_events_and_gtis(destination,
additional_columns=['PI', "PHA"],
gtistring=CONFIG.GTI_STRING,
hduname='EVENTS', column=CONFIG.TIME_COLUMN)
return substract_tstart_from_events(fits_data, time_offset)
elif 'RATE' in hdulist:
# If RATE extension found, consider the Fits as LIGHTCURVE Fits
# Reads the lightcurve with hendrics
outfile = lcurve_from_fits(destination, gtistring=get_hdu_string_from_hdulist(CONFIG.GTI_STRING, hdulist),
timecolumn=CONFIG.TIME_COLUMN, ratecolumn=None, ratehdu=1,
fracexp_limit=CONFIG.FRACEXP_LIMIT)[0]
return substract_tstart_from_lcurve(load_lcurve(outfile), time_offset)
else:
logging.error("Unsupported FITS type!")
logging.warn("RATE, RATE1 or COUNTS ambiguous columns found in " + str(hduname) + " HDU, found columns: " + str(hdulist[hduname].data.names))
return None
ratecolumn = list(intersection_columns)[0]
if len(hdulist[hduname].data[ratecolumn].shape) != 1 \
or not (isinstance(hdulist[hduname].data[ratecolumn][0], int) \
or isinstance(hdulist[hduname].data[ratecolumn][0], np.integer) \
or isinstance(hdulist[hduname].data[ratecolumn][0], float) \
or isinstance(hdulist[hduname].data[ratecolumn][0], np.floating)):
logging.warn("Wrong data type found for column: " + str(ratecolumn) + " in " + str(hduname) + " HDU, expected Integer or Float.")
return None
header, header_comments = get_header(hdulist, hduname)
# Reads the lightcurve with HENDRICS
outfile = lcurve_from_fits(destination, gtistring=get_hdu_string_from_hdulist(gtistring, hdulist),
timecolumn=column, ratecolumn=ratecolumn, ratehdu=1,
fracexp_limit=CONFIG.FRACEXP_LIMIT)[0]
lcurve, events_start_time = substract_tstart_from_lcurve(load_data(outfile), time_offset)
dataset = DataSet.get_lightcurve_dataset_from_stingray_lcurve(lcurve, header, header_comments,
hduname, column)
# Stores the events_start_time in time column extra
dataset.tables[hduname].columns[column].set_extra("TSTART", events_start_time)
logging.debug("Read Lightcurve fits with stingray file successfully: " + str(destination) + ", tstart: " + str(events_start_time) + ", rate: " + str(len(lcurve["counts"])))
return dataset
# Opening Fits
hdulist = fits.open(destination, memmap=True)
if 'EVENTS' in hdulist:
# If EVENTS extension found, consider the Fits as EVENTS Fits
fits_data = load_events_and_gtis(destination,
additional_columns=['PI', "PHA"],
gtistring=CONFIG.GTI_STRING,
hduname='EVENTS', column=CONFIG.TIME_COLUMN)
return substract_tstart_from_events(fits_data, time_offset)
elif 'RATE' in hdulist:
# If RATE extension found, consider the Fits as LIGHTCURVE Fits
# Reads the lightcurve with hendrics
outfile = lcurve_from_fits(destination, gtistring=get_hdu_string_from_hdulist(CONFIG.GTI_STRING, hdulist),
timecolumn=CONFIG.TIME_COLUMN, ratecolumn=None, ratehdu=1,
fracexp_limit=CONFIG.FRACEXP_LIMIT)[0]
return substract_tstart_from_lcurve(load_lcurve(outfile), time_offset)
else:
logging.error("Unsupported FITS type!")
else:
logging.error("Unknown file extension: %s" % file_extension)
return None
def load_dataset_from_intermediate_file(fname):
"""Save Stingray object to intermediate file."""
from stingray.lightcurve import Lightcurve
from stingray.events import EventList
from stingray.crossspectrum import Crossspectrum
from hendrics.io import get_file_type
from stingray.io import _retrieve_pickle_object
# This will return an EventList, a light curve, a Powerspectrum, ...
# depending on the contents of the file
try:
ftype, contents = get_file_type(fname)
except:
contents = _retrieve_pickle_object(fname)
if isinstance(contents, Lightcurve):
return DataSet.get_lightcurve_dataset_from_stingray_Lightcurve(contents)
elif isinstance(contents, EventList):
return DataSet.get_eventlist_dataset_from_stingray_Eventlist(contents)
# This also work for Powerspectrum and AveragedCrosspowerspectrum, clearly
elif isinstance(contents, Crossspectrum):
logging.error("Unsupported intermediate file type: Crossspectrum")
else:
logging.error("Unsupported intermediate file type: %s" % type(stingray_object).__name__)
if len(hdulist[hduname].data[ratecolumn].shape) != 1 \
or not (isinstance(hdulist[hduname].data[ratecolumn][0], int) \
or isinstance(hdulist[hduname].data[ratecolumn][0], np.integer) \
or isinstance(hdulist[hduname].data[ratecolumn][0], float) \
or isinstance(hdulist[hduname].data[ratecolumn][0], np.floating)):
logging.warn("Wrong data type found for column: " + str(ratecolumn) + " in " + str(hduname) + " HDU, expected Integer or Float.")
return None
header, header_comments = get_header(hdulist, hduname)
# Reads the lightcurve with HENDRICS
outfile = lcurve_from_fits(destination, gtistring=get_hdu_string_from_hdulist(gtistring, hdulist),
timecolumn=column, ratecolumn=ratecolumn, ratehdu=1,
fracexp_limit=CONFIG.FRACEXP_LIMIT)[0]
lcurve, events_start_time = substract_tstart_from_lcurve(load_data(outfile), time_offset)
dataset = DataSet.get_lightcurve_dataset_from_stingray_lcurve(lcurve, header, header_comments,
hduname, column)
# Stores the events_start_time in time column extra
dataset.tables[hduname].columns[column].set_extra("TSTART", events_start_time)
logging.debug("Read Lightcurve fits with stingray file successfully: " + str(destination) + ", tstart: " + str(events_start_time) + ", rate: " + str(len(lcurve["counts"])))
return dataset
def get_intermediate_file(filepath, target):
try:
stingray_object = DaveReader.get_stingray_object(filepath)
if stingray_object:
filename = FileUtils.get_intermediate_filename(target, filepath, HEN_FILE_EXTENSION)
if DaveReader.save_to_intermediate_file(stingray_object, filename):
return filename
except:
logging.error(ExHelper.getException('get_intermediate_file'))
return None
def save_to_intermediate_file(stingray_object, fname):
"""Save Stingray object to intermediate file."""
from stingray.lightcurve import Lightcurve
from stingray.events import EventList
from stingray.crossspectrum import Crossspectrum
from hendrics.io import save_lcurve, save_events, save_pds
if isinstance(stingray_object, Lightcurve):
save_lcurve(stingray_object, fname)
elif isinstance(stingray_object, EventList):
save_events(stingray_object, fname)
# This also work for Powerspectrum and AveragedCrosspowerspectrum, clearly
elif isinstance(stingray_object, Crossspectrum):
save_pds(stingray_object, fname)
else:
logging.error("save_to_intermediate_file: Unknown object type: %s" % type(stingray_object).__name__)
return False
return True
def save_to_intermediate_file(stingray_object, fname):
"""Save Stingray object to intermediate file."""
from stingray.lightcurve import Lightcurve
from stingray.events import EventList
from stingray.crossspectrum import Crossspectrum
from hendrics.io import save_lcurve, save_events, save_pds
if isinstance(stingray_object, Lightcurve):
save_lcurve(stingray_object, fname)
elif isinstance(stingray_object, EventList):
save_events(stingray_object, fname)
# This also work for Powerspectrum and AveragedCrosspowerspectrum, clearly
elif isinstance(stingray_object, Crossspectrum):
save_pds(stingray_object, fname)
else:
logging.error("save_to_intermediate_file: Unknown object type: %s" % type(stingray_object).__name__)
return False
return True
def save_to_intermediate_file(stingray_object, fname):
"""Save Stingray object to intermediate file."""
from stingray.lightcurve import Lightcurve
from stingray.events import EventList
from stingray.crossspectrum import Crossspectrum
from hendrics.io import save_lcurve, save_events, save_pds
if isinstance(stingray_object, Lightcurve):
save_lcurve(stingray_object, fname)
elif isinstance(stingray_object, EventList):
save_events(stingray_object, fname)
# This also work for Powerspectrum and AveragedCrosspowerspectrum, clearly
elif isinstance(stingray_object, Crossspectrum):
save_pds(stingray_object, fname)
else:
logging.error("save_to_intermediate_file: Unknown object type: %s" % type(stingray_object).__name__)
return False
return True