Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
date_obs = datetime.now()
comments = data.meta.get('comments', [])
for comment in comments:
if 'date-obs' in comment.lower():
date_obs = comment.split(':')[1].strip()
if 'facility' in comment.lower():
facility_name = comment.split(':')[1].strip()
facility = get_service_class(facility_name)() if facility_name else None
wavelength_units = facility.get_wavelength_units() if facility else self.DEFAULT_WAVELENGTH_UNITS
flux_constant = facility.get_flux_constant() if facility else self.DEFAULT_FLUX_CONSTANT
spectral_axis = np.array(data['wavelength']) * wavelength_units
flux = np.array(data['flux']) * flux_constant
spectrum = Spectrum1D(flux=flux, spectral_axis=spectral_axis)
return spectrum, Time(date_obs).to_datetime()
# Fragment of a loader body (the enclosing ``def`` and the code that opens
# ``hdulist`` are not visible here).
# Builds a Spectrum1D from an open HDU list: the primary header becomes the
# metadata, extension 1 supplies the flux and WCS, extension 2 the uncertainty.
header = hdulist[0].header
meta = {'header': header}
wcs = WCS(hdulist[1].header)
data = hdulist[1].data # spectrum in the first extension
# The flux is tagged with a custom placeholder unit created via def_unit.
unit = def_unit('arbitrary units')
uncertainty = StdDevUncertainty(hdulist[2].data)
# dispersion from the WCS but convert out of logspace
# The WCS is evaluated at every pixel index along the first axis of ``data``
# and the result is used as a base-10 exponent.
dispersion = 10**wcs.all_pix2world(np.arange(data.shape[0]), 0)[0]
dispersion_unit = Unit('Angstrom')
# The HDU list is closed before the spectrum is constructed.
# NOTE(review): no try/finally is visible in this fragment, so an exception
# raised by any line above would skip this close().
hdulist.close()
return Spectrum1D(data=data * unit,
uncertainty=uncertainty,
spectral_axis=dispersion * dispersion_unit,
meta=meta,
wcs=wcs)
Raises
--------
NotImplementedError
If the format can't be read currently
"""
# Fragment of a reader body (the enclosing ``def`` is not visible and the
# fragment is cut off after the last loop below).
# Normalise a caller-supplied dispersion unit, if any, through u.Unit.
if dispersion_unit:
dispersion_unit = u.Unit(dispersion_unit)
# Read the data array and the header from ``filename`` and wrap the header
# in a FITSWCSSpectrum for inspection.
data = fits.getdata(filename)
header = fits.getheader(filename)
wcs_info = FITSWCSSpectrum(header)
# Dispatch on the number of WCS axes (and, for 2 or 3 axes, on the ctype list).
# One axis: a single spectrum with a linear 1D WCS.
if wcs_info.naxis == 1:
wcs = read_fits_wcs_linear1d(wcs_info, dispersion_unit=dispersion_unit)
return Spectrum1D(data, wcs=wcs, unit=flux_unit)
# Two MULTISPE axes: one Spectrum1D per (data row, WCS) pair, returned as a list.
elif wcs_info.naxis == 2 and \
wcs_info.affine_transform_dict['ctype'] == ["MULTISPE", "MULTISPE"]:
multi_wcs = multispec_wcs_reader(wcs_info, dispersion_unit=dispersion_unit)
multispec = []
# zip() pairs rows with WCS values in order and stops at the shorter of the two.
for spectrum_data, spectrum_wcs in zip(data, multi_wcs.values()):
multispec.append(
Spectrum1D(spectrum_data, wcs=spectrum_wcs, unit=flux_unit))
return multispec
# Three LINEAR axes: every spectrum shares the same linear 1D WCS.
elif wcs_info.naxis == 3 and \
wcs_info.affine_transform_dict['ctype'] == ["LINEAR","LINEAR","LINEAR"]:
wcs = read_fits_wcs_linear1d(wcs_info, dispersion_unit=dispersion_unit)
equispec = []
# One spectrum per index along the first data axis, taken from data[i][0].
# NOTE(review): the statement that returns ``equispec`` is not visible in
# this fragment.
for i in range(data.shape[0]):
equispec.append(
Spectrum1D(data[i][0], wcs=wcs, unit=flux_unit))
# Fragment of a loader body (the enclosing ``def`` and the code that opens
# ``hdu`` are not visible here).
# Builds a Spectrum1D from the FLUX, wavelength and ERROR columns of
# extension 1, flattened and sorted by wavelength.
header = hdu[0].header
# NOTE(review): ``name`` is not used anywhere in the visible lines.
name = header.get('FILENAME')
meta = {'header': header}
# The same unit is applied to both the flux and its uncertainty.
unit = Unit("erg/cm**2 Angstrom s")
disp_unit = Unit('Angstrom')
# Each column is flattened to 1D before the unit is attached.
data = hdu[1].data['FLUX'].flatten() * unit
dispersion = hdu[1].data['wavelength'].flatten() * disp_unit
uncertainty = StdDevUncertainty(hdu[1].data["ERROR"].flatten() * unit)
# Reorder all three arrays with the same index array so they stay aligned
# once the dispersion is sorted.
sort_idx = dispersion.argsort()
dispersion = dispersion[sort_idx]
data = data[sort_idx]
uncertainty = uncertainty[sort_idx]
return Spectrum1D(flux=data,
spectral_axis=dispersion,
uncertainty=uncertainty,
meta=meta)
pos : `~astropy.units.Quantity`
A two-element `~astropy.units.Quantity` giving the
(min, max) range of the ROI.
Returns
-------
None or `~specutils.utils.SpectralRegion`
"""
# Fragment of a function body (the enclosing ``def`` is not visible here).
# Anything that is not a Quantity cannot be turned into a region.
if not isinstance(pos, u.Quantity):
return None
# A dimensionless selection, or one whose two bounds are equal, yields no region.
elif pos.unit == u.Unit("") or \
pos[0] == pos[1]:
return None
# Bounds given in reverse order are swapped, keeping the original unit.
elif pos[0] > pos[1]:
pos = [pos[1], pos[0]] * pos.unit
# The two bounds are unpacked as the positional arguments of SpectralRegion.
return SpectralRegion(*pos)
pos : `~astropy.units.Quantity`
A two-element `~astropy.units.Quantity` giving the
(min, max) range of the ROI.
Returns
-------
None or `~specutils.utils.SpectralRegion`
"""
# Fragment of a function body (the enclosing ``def`` is not visible here).
# Anything that is not a Quantity cannot be turned into a region.
if not isinstance(pos, u.Quantity):
return None
# A dimensionless selection, or one whose two bounds are equal, yields no region.
elif pos.unit == u.Unit("") or \
pos[0] == pos[1]:
return None
# Bounds given in reverse order are swapped, keeping the original unit.
elif pos[0] > pos[1]:
pos = [pos[1], pos[0]] * pos.unit
# The two bounds are unpacked as the positional arguments of SpectralRegion.
return SpectralRegion(*pos)
def wrapper(data, spectral_axis, *args, **kwargs):
    """Call the wrapped ``func`` with a Spectrum1D built from plain inputs.

    ``data`` is converted with ``u.Quantity`` and combined with
    ``spectral_axis`` into a ``Spectrum1D``. That spectrum, followed by any
    extra positional and keyword arguments, is handed to ``func``; the bare
    values of the returned object's flux are given back.
    """
    flux = u.Quantity(data)
    wrapped_input = Spectrum1D(flux=flux, spectral_axis=spectral_axis)
    result = func(wrapped_input, *args, **kwargs)
    return result.flux.value
return wrapper
# Fragment of an if/elif chain (the opening ``if`` and the enclosing ``def``
# are not visible; the final ``raise`` is cut off mid-message).
# Two MULTISPE axes: one Spectrum1D per (data row, WCS) pair, returned as a list.
elif wcs_info.naxis == 2 and \
wcs_info.affine_transform_dict['ctype'] == ["MULTISPE", "MULTISPE"]:
multi_wcs = multispec_wcs_reader(wcs_info, dispersion_unit=dispersion_unit)
multispec = []
# zip() pairs rows with WCS values in order and stops at the shorter of the two.
for spectrum_data, spectrum_wcs in zip(data, multi_wcs.values()):
multispec.append(
Spectrum1D(spectrum_data, wcs=spectrum_wcs, unit=flux_unit))
return multispec
# Three LINEAR axes: every spectrum shares the same linear 1D WCS and is
# taken from data[i][0] for each index i along the first data axis.
elif wcs_info.naxis == 3 and \
wcs_info.affine_transform_dict['ctype'] == ["LINEAR","LINEAR","LINEAR"]:
wcs = read_fits_wcs_linear1d(wcs_info, dispersion_unit=dispersion_unit)
equispec = []
for i in range(data.shape[0]):
equispec.append(
Spectrum1D(data[i][0], wcs=wcs, unit=flux_unit))
return equispec
# Two MULTISPE axes plus one LINEAR axis: the result is a list of lists,
# outer index j over the second data axis, inner index i over the first.
elif wcs_info.naxis == 3 and \
wcs_info.affine_transform_dict['ctype'] == ["MULTISPE", "MULTISPE","LINEAR"]:
multi_wcs = multispec_wcs_reader(wcs_info, dispersion_unit=dispersion_unit)
multispec = []
for j in range(data.shape[1]):
equispec = []
# Every spectrum in this inner list uses the j-th WCS; note that the list of
# WCS values is rebuilt on each inner iteration.
for i in range(data.shape[0]):
equispec.append(
Spectrum1D(data[i][j], wcs=list(multi_wcs.values())[j], unit=flux_unit))
multispec.append(equispec)
return multispec
# Any other axis layout is rejected as unsupported.
else:
raise NotImplementedError("Either the FITS file does not represent a 1D"
A ``region`` defined as ``SpectralRegion(0.2*u.um, 0.5*u.um)``
And we calculate ``sub_spectrum = extract_region(spectrum, region)``, then the ``sub_spectrum``
spectral axis will be ``[0.2, 0.3, 0.4, 0.5] * u.um``.
If the ``region`` does not overlap with the ``spectrum`` then an empty Spectrum1D object
will be returned.
"""
# Fragment of a function body (the enclosing ``def`` is not visible and the
# code that returns ``extracted_spectrum`` is cut off).
# One extracted spectrum is collected per subregion of ``region``.
extracted_spectrum = []
for subregion in region._subregions:
# Translate the subregion bounds into pixel indices on ``spectrum``;
# either index may come back as None.
left_index, right_index = _to_edge_pixel(subregion, spectrum)
# If both indices are out of bounds then record an empty spectrum
# (same units as the input) for this subregion
if left_index is None and right_index is None:
empty_spectrum = Spectrum1D(spectral_axis=[]*spectrum.spectral_axis.unit,
flux=[]*spectrum.flux.unit)
extracted_spectrum.append(empty_spectrum)
else:
# If only one index is out of bounds then set it to
# the lower or upper extent
if left_index is None:
left_index = 0
if right_index is None:
right_index = len(spectrum.spectral_axis)
# Swap the indices if they arrive in descending order so the slice
# below is never reversed.
if left_index > right_index:
left_index, right_index = right_index, left_index
extracted_spectrum.append(spectrum[left_index:right_index])
# Fragment of a function body (the enclosing ``def`` and the construction of
# ``resampler`` and ``wave_array`` are not visible here).
# Both inputs are resampled onto the same ``wave_array``.
resampled_spectrum = resampler(spectrum, wave_array)
resampled_template = resampler(template, wave_array)
# The resampler leaves NaNs in flux bins that it does not touch.
# We replace them with zeros. This has the net effect of zero-padding
# the spectrum and/or template so they exactly match each other,
# wavelength-wise.
# The flux unit is stripped for nan_to_num and re-attached afterwards.
clean_spectrum_flux = np.nan_to_num(resampled_spectrum.flux.value) * resampled_spectrum.flux.unit
clean_template_flux = np.nan_to_num(resampled_template.flux.value) * resampled_template.flux.unit
# Rebuild each spectrum around its cleaned flux. The resampled uncertainty
# is passed through unchanged, the rest value is taken from the original
# (pre-resampling) input, and the velocity convention is fixed to 'optical'.
clean_spectrum = Spectrum1D(spectral_axis=resampled_spectrum.spectral_axis,
flux=clean_spectrum_flux,
uncertainty=resampled_spectrum.uncertainty,
velocity_convention='optical',
rest_value=spectrum.rest_value)
clean_template = Spectrum1D(spectral_axis=resampled_template.spectral_axis,
flux=clean_template_flux,
uncertainty=resampled_template.uncertainty,
velocity_convention='optical',
rest_value=template.rest_value)
return clean_spectrum, clean_template