Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise RuntimeError('Mismatch between length of FITS table ({0}) and length of phase array ({1})!'.format(len(hdulist[1].data),len(phases)))
# Map each output column name to a two-item list [values, format code];
# item 0 is used as the column array and item 1 as the column format below.
data_to_add = {'PULSE_PHASE':[phases,'D']}
if args.absphase:
# Optional column, only added when args.absphase is set
data_to_add['ABS_PHASE'] = [iphss-negmask*u.cycle,'K']
if args.barytime:
# Optional column holding the .mjd value of every entry in ts.table['tdb']
tdbs = np.asarray([t.mjd for t in ts.table['tdb']])
data_to_add['BARY_TIME'] = [tdbs,'D']
# Write every requested column into extension 1 of the HDU list
for key in data_to_add.keys():
if key in hdulist[1].columns.names:
log.info('Found existing %s column, overwriting...'%key)
# Overwrite values in existing Column
hdulist[1].data[key] = data_to_add[key][0]
else:
# Construct and append new column, preserving HDU header and name
log.info('Adding new %s column.'%key)
datacol = pyfits.ColDefs([pyfits.Column(name=key,
format=data_to_add[key][1], array=data_to_add[key][0])])
bt = pyfits.BinTableHDU.from_columns(
hdulist[1].columns + datacol, header=hdulist[1].header,
name=hdulist[1].name)
# Swap the rebuilt table in so that later iterations see the new column
hdulist[1] = bt
if args.outfile is None:
# Overwrite the existing file
# NOTE(review): flush() presumably needs the file to have been opened in
# update mode -- confirm where hdulist is opened.
log.info('Overwriting existing FITS file '+args.eventfile)
hdulist.flush(verbose=True, output_verify='warn')
else:
# Write to new output file
log.info('Writing output FITS file '+args.outfile)
hdulist.writeto(args.outfile,overwrite=True, checksum=True, output_verify='warn')
def simple_generic_writer(data, file_name, **kwargs):
    """
    Write a `Spectrum1DRef` to ``<file_name>.fits`` as a FITS binary table.

    The output holds a primary HDU followed by one table extension with the
    columns FLUX, WAVE, UNCERT and MASK, taken from ``data.data``,
    ``data.dispersion``, ``data.uncertainty.array`` and ``data.mask``.
    The primary header is ``data.meta['header']`` when that key exists;
    otherwise it is the header produced by ``data.wcs`` (or ``None`` when
    ``data.wcs`` is ``None``).  An existing file of the same name is
    overwritten.  Extra keyword arguments are accepted and ignored.
    """
    # Binary table: one column per spectral quantity
    table_columns = fits.ColDefs([
        fits.Column(name='FLUX', format='E', array=data.data),
        fits.Column(name='WAVE', format='E', array=data.dispersion),
        fits.Column(name='UNCERT', format='E', array=data.uncertainty.array),
        fits.Column(name='MASK', format='L', array=data.mask),
    ])
    table_hdu = fits.BinTableHDU.from_columns(table_columns)

    # Primary header: the WCS-derived fallback is always evaluated, exactly
    # as a default argument to dict.get would be.
    primary_header = data.meta.get(
        'header',
        None if data.wcs is None else data.wcs.to_header(),
    )
    primary_hdu = fits.PrimaryHDU(header=primary_header)

    # Assemble both HDUs and write them out
    hdus = fits.HDUList([primary_hdu, table_hdu])
    hdus.writeto("{}.fits".format(file_name), overwrite=True)
# Install `head` as the header of the 'AIPS AN' extension
hdulist['AIPS AN'].header = head
##################### AIPS FQ TABLE #####################################################################################################
# Convert types & columns
# bandfreq, chwidth, totbw and sideband are each reshaped to (1, nchan):
# a single table row with one entry per channel.
freqid = np.array([1])
# Frequency of channel i relative to ref_freq: ch1_freq + i*ch_spacing - ref_freq
bandfreq = np.array([ch1_freq + ch_spacing*i - ref_freq for i in range(nchan)]).reshape([1,nchan])
chwidth = np.array([ch_bw for i in range(nchan)]).reshape([1,nchan])
# NOTE(review): totbw is filled with ch_bw, exactly like chwidth -- confirm
# that the total bandwidth is meant to equal the channel width.
totbw = np.array([ch_bw for i in range(nchan)]).reshape([1,nchan])
sideband = np.array([1 for i in range(nchan)]).reshape([1,nchan])
# From here on each name is rebound to the fits.Column wrapping its array;
# the repeat count in every vector format string is nchan.
freqid = fits.Column(name="FRQSEL", format="1J", array=freqid)
bandfreq = fits.Column(name="IF FREQ", format="%dD"%(nchan), array=bandfreq, unit='HZ')
chwidth = fits.Column(name="CH WIDTH",format="%dE"%(nchan), array=chwidth, unit='HZ')
totbw = fits.Column(name="TOTAL BANDWIDTH",format="%dE"%(nchan),array=totbw, unit='HZ')
sideband = fits.Column(name="SIDEBAND",format="%dJ"%(nchan),array=sideband)
cols = fits.ColDefs([freqid, bandfreq, chwidth, totbw, sideband])
# create table
tbhdu = fits.BinTableHDU.from_columns(cols)
# header information
tbhdu.header.append(("NO_IF", nchan, "Number IFs"))
tbhdu.header.append(("EXTNAME","AIPS FQ"))
tbhdu.header.append(("EXTVER",1))
# Add the finished FQ table to the end of the HDU list
hdulist.append(tbhdu)
##################### AIPS NX TABLE #####################################################################################################
# Empty lists, presumably filled while building the NX table (not shown here)
scan_times = []
scan_time_ints = []
start_vis = []
# Set FREQID in `head`, then install it as the header of the 'AIPS AN' extension
head['FREQID'] = 1
hdulist['AIPS AN'].header = head
##################### AIPS FQ TABLE #####################################################################################################
# Convert types & columns
# bandfreq, chwidth, totbw and sideband are each reshaped to (1, nchan):
# a single table row with one entry per channel.
freqid = np.array([1])
# Frequency of channel i relative to ref_freq: ch1_freq + i*ch_spacing - ref_freq
bandfreq = np.array([ch1_freq + ch_spacing*i - ref_freq for i in range(nchan)]).reshape([1,nchan])
chwidth = np.array([ch_bw for i in range(nchan)]).reshape([1,nchan])
# NOTE(review): totbw is filled with ch_bw, exactly like chwidth -- confirm
# that the total bandwidth is meant to equal the channel width.
totbw = np.array([ch_bw for i in range(nchan)]).reshape([1,nchan])
sideband = np.array([1 for i in range(nchan)]).reshape([1,nchan])
# From here on each name is rebound to the fits.Column wrapping its array;
# the repeat count in every vector format string is nchan.
freqid = fits.Column(name="FRQSEL", format="1J", array=freqid)
bandfreq = fits.Column(name="IF FREQ", format="%dD"%(nchan), array=bandfreq, unit='HZ')
chwidth = fits.Column(name="CH WIDTH",format="%dE"%(nchan), array=chwidth, unit='HZ')
totbw = fits.Column(name="TOTAL BANDWIDTH",format="%dE"%(nchan),array=totbw, unit='HZ')
sideband = fits.Column(name="SIDEBAND",format="%dJ"%(nchan),array=sideband)
cols = fits.ColDefs([freqid, bandfreq, chwidth, totbw, sideband])
# create table
tbhdu = fits.BinTableHDU.from_columns(cols)
# header information
tbhdu.header.append(("NO_IF", nchan, "Number IFs"))
tbhdu.header.append(("EXTNAME","AIPS FQ"))
tbhdu.header.append(("EXTVER",1))
# Add the finished FQ table to the end of the HDU list
hdulist.append(tbhdu)
##################### AIPS NX TABLE #####################################################################################################
# Empty lists, presumably filled while building the NX table (not shown here)
scan_times = []
scan_time_ints = []
# Build the 21-column light-curve table; skipped unless status is 0
# (presumably "no error so far" -- confirm against the caller's convention).
if status == 0:
col1 = pyfits.Column(name='TIME',format='D',unit='BJD - 2454833',array=time)
col2 = pyfits.Column(name='TIMECORR',format='E',unit='d',array=timecorr)
col3 = pyfits.Column(name='CADENCENO',format='J',array=cadenceno)
col4 = pyfits.Column(name='SAP_FLUX',format='E',array=sap_flux)
col5 = pyfits.Column(name='SAP_FLUX_ERR',format='E',array=sap_flux_err)
col6 = pyfits.Column(name='SAP_BKG',format='E',array=sap_bkg)
col7 = pyfits.Column(name='SAP_BKG_ERR',format='E',array=sap_bkg_err)
# NOTE(review): PDCSAP_FLUX and PDCSAP_FLUX_ERR are filled from sap_flux and
# sap_flux_err, so they duplicate the SAP columns -- confirm this is intended.
col8 = pyfits.Column(name='PDCSAP_FLUX',format='E',array=sap_flux)
col9 = pyfits.Column(name='PDCSAP_FLUX_ERR',format='E',array=sap_flux_err)
col10 = pyfits.Column(name='SAP_QUALITY',format='J',array=quality)
col11 = pyfits.Column(name='PSF_CENTR1',format='E',unit='pixel',array=psf_centr1)
col12 = pyfits.Column(name='PSF_CENTR1_ERR',format='E',unit='pixel',array=psf_centr1_err)
col13 = pyfits.Column(name='PSF_CENTR2',format='E',unit='pixel',array=psf_centr2)
col14 = pyfits.Column(name='PSF_CENTR2_ERR',format='E',unit='pixel',array=psf_centr2_err)
col15 = pyfits.Column(name='MOM_CENTR1',format='E',unit='pixel',array=mom_centr1)
col16 = pyfits.Column(name='MOM_CENTR1_ERR',format='E',unit='pixel',array=mom_centr1_err)
col17 = pyfits.Column(name='MOM_CENTR2',format='E',unit='pixel',array=mom_centr2)
col18 = pyfits.Column(name='MOM_CENTR2_ERR',format='E',unit='pixel',array=mom_centr2_err)
col19 = pyfits.Column(name='POS_CORR1',format='E',unit='pixel',array=pos_corr1)
col20 = pyfits.Column(name='POS_CORR2',format='E',unit='pixel',array=pos_corr2)
col21 = pyfits.Column(name='RAW_FLUX',format='E',array=raw_flux)
cols = pyfits.ColDefs([col1,col2,col3,col4,col5,col6,col7,col8,col9,
col10,col11,col12,col13,col14,col15,col16,
col17,col18,col19,col20,col21])
hdu1 = pyfits.BinTableHDU.from_columns(cols)
# Re-assign the per-column keywords as (value, comment) pairs so that each
# one carries a human-readable description; TDISP1 is set here as well.
hdu1.header['TTYPE1'] = ('TIME','column title: data time stamps')
hdu1.header['TFORM1'] = ('D','data type: float64')
hdu1.header['TUNIT1'] = ('BJD - 2454833','column units: barycenter corrected JD')
hdu1.header['TDISP1'] = ('D12.7','column display format')
hdu1.header['TTYPE2'] = ('TIMECORR','column title: barycentric-timeslice correction')
hdu1.header['TFORM2'] = ('E','data type: float32')
# Values for the POLTYA / POLAA / POLTYB / POLAB columns: one entry per
# antenna (self.Nants_telescope entries each).
poltya = np.full((self.Nants_telescope), 'X', dtype=np.object_)
# NOTE(review): this is list + ndarray, which presumably broadcasts to an
# array of Nants_telescope copies of 90.0 rather than concatenating -- confirm.
polaa = [90.0] + np.zeros(self.Nants_telescope)
poltyb = np.full((self.Nants_telescope), 'Y', dtype=np.object_)
# Same construction as polaa; presumably an all-zero array -- confirm.
polab = [0.0] + np.zeros(self.Nants_telescope)
col1 = fits.Column(name='ANNAME', format='8A',
array=self.antenna_names)
col2 = fits.Column(name='STABXYZ', format='3D',
array=self.antenna_positions)
# convert to 1-indexed from 0-indexed indices
col3 = fits.Column(name='NOSTA', format='1J',
array=self.antenna_numbers + 1)
col4 = fits.Column(name='MNTSTA', format='1J', array=mntsta)
col5 = fits.Column(name='STAXOF', format='1E', array=staxof)
col6 = fits.Column(name='POLTYA', format='1A', array=poltya)
col7 = fits.Column(name='POLAA', format='1E', array=polaa)
# col8 = fits.Column(name='POLCALA', format='3E', array=polcala)
col9 = fits.Column(name='POLTYB', format='1A', array=poltyb)
col10 = fits.Column(name='POLAB', format='1E', array=polab)
# col11 = fits.Column(name='POLCALB', format='3E', array=polcalb)
# note ORBPARM is technically required, but we didn't put it in
# col8 and col11 (POLCALA / POLCALB) are commented out above, so they are
# absent from the column list.
cols = fits.ColDefs([col1, col2, col3, col4, col5, col6, col7, col9,
col10])
ant_hdu = fits.BinTableHDU.from_columns(cols)
ant_hdu.header['EXTNAME'] = 'AIPS AN'
ant_hdu.header['EXTVER'] = 1
# write XYZ coordinates if not already defined
ant_hdu.header['ARRAYX'] = self.telescope_location[0]
format = 'E',
array = numpy.insert(mag_err_array, 0, 0.))]
# Wrap columns_array in a table HDU named 'BINS_MAG_ERR' and append it.
# NOTE(review): pyfits.new_table is, as far as I know, a legacy constructor
# superseded by BinTableHDU.from_columns -- confirm against the installed
# PyFITS/astropy version before upgrading.
hdu = pyfits.new_table(columns_array)
hdu.name = 'BINS_MAG_ERR'
hdul.append(hdu)
# Store magnitude 1 info
# Single-column table holding bins_mag_1; the HDU name matches the column name
columns_array = [pyfits.Column(name = 'BINS_MAG_1',
format = 'E',
array = bins_mag_1)]
hdu = pyfits.new_table(columns_array)
hdu.name = 'BINS_MAG_1'
hdul.append(hdu)
# Store magnitude 2 info
# Single-column table holding bins_mag_2; the HDU name matches the column name
columns_array = [pyfits.Column(name = 'BINS_MAG_2',
format = 'E',
array = bins_mag_2)]
hdu = pyfits.new_table(columns_array)
hdu.name = 'BINS_MAG_2'
hdul.append(hdu)
logger.info('Writing look-up table to %s'%(outfile))
# NOTE(review): `clobber` is presumably the older spelling of `overwrite`
# for writeto() -- confirm it is still accepted by the installed version.
hdul.writeto(outfile, clobber = True)
def _make_target_extension(self, ext_info):
"""Create the 'TARGETTABLES' extension (i.e. extension #1)."""
# Turn the data arrays into fits columns and initialize the HDU
# Image-valued columns share one dim string "(n_cols,n_rows)" and a repeat
# count of n_rows * n_cols; eformat and jformat use the 'E' and 'J' type
# codes respectively.
coldim = '({},{})'.format(self.n_cols, self.n_rows)
eformat = '{}E'.format(self.n_rows * self.n_cols)
jformat = '{}J'.format(self.n_rows * self.n_cols)
cols = []
# Columns declared without a dim: TIME, TIMECORR, CADENCENO
cols.append(fits.Column(name='TIME', format='D', unit='BJD - 2454833',
array=self.time))
cols.append(fits.Column(name='TIMECORR', format='E', unit='D',
array=self.timecorr))
cols.append(fits.Column(name='CADENCENO', format='J',
array=self.cadenceno))
# Columns declared with dim=coldim: raw counts use jformat, the flux
# quantities use eformat
cols.append(fits.Column(name='RAW_CNTS', format=jformat,
unit='count', dim=coldim,
array=self.raw_cnts))
cols.append(fits.Column(name='FLUX', format=eformat,
unit='e-/s', dim=coldim,
array=self.flux))
cols.append(fits.Column(name='FLUX_ERR', format=eformat,
unit='e-/s', dim=coldim,
array=self.flux_err))
cols.append(fits.Column(name='FLUX_BKG', format=eformat,
unit='e-/s', dim=coldim,
array=self.flux_bkg))
cols.append(fits.Column(name='FLUX_BKG_ERR', format=eformat,
unit='e-/s', dim=coldim,
array=self.flux_bkg_err))
col7 = pyfits.Column(name='SAP_BKG_ERR', format='E', unit='e-/s',
array=sap_bkg_err)
col8 = pyfits.Column(name='PDCSAP_FLUX', format='E', unit='e-/s',
array=pdc_flux)
col9 = pyfits.Column(name='PDCSAP_FLUX_ERR', format='E', unit='e-/s',
array=pdc_flux_err)
col10 = pyfits.Column(name='SAP_QUALITY', format='J', array=quality)
col11 = pyfits.Column(name='PSF_CENTR1', format='E', unit='pixel',
array=psf_centr1)
col12 = pyfits.Column(name='PSF_CENTR1_ERR', format='E', unit='pixel',
array=psf_centr1_err)
col13 = pyfits.Column(name='PSF_CENTR2', format='E', unit='pixel',
array=psf_centr2)
col14 = pyfits.Column(name='PSF_CENTR2_ERR', format='E', unit='pixel',
array=psf_centr2_err)
col15 = pyfits.Column(name='MOM_CENTR1', format='E', unit='pixel',
array=mom_centr1)
col16 = pyfits.Column(name='MOM_CENTR1_ERR', format='E', unit='pixel',
array=mom_centr1_err)
col17 = pyfits.Column(name='MOM_CENTR2', format='E', unit='pixel',
array=mom_centr2)
col18 = pyfits.Column(name='MOM_CENTR2_ERR', format='E', unit='pixel',
array=mom_centr2_err)
col19 = pyfits.Column(name='POS_CORR1', format='E', unit='pixel',
array=pos_corr1)
col20 = pyfits.Column(name='POS_CORR2', format='E', unit='pixel',
array=pos_corr2)
col21 = pyfits.Column(name='PCA_FLUX', format='E', unit='e-/s',
array=fluxcor)
col22 = pyfits.Column(name='PCA_FLUX_NRM', format='E', array=normfluxcor)
cols = pyfits.ColDefs([col1, col2, col3, col4, col5, col6, col7, col8,
col9, col10, col11, col12, col13, col14, col15,
hdulist = pyfits.open(args.eventfile,mode='update')
else:
hdulist = pyfits.open(args.eventfile)
# Work on extension 1 of the opened file; keep handles to its header and data
event_hdu = hdulist[1]
event_hdr=event_hdu.header
event_dat=event_hdu.data
# The phase array must supply exactly one value per table row
if len(event_dat) != len(phases):
raise RuntimeError('Mismatch between length of FITS table ({0}) and length of phase array ({1})!'.format(len(event_dat),len(phases)))
if 'PULSE_PHASE' in event_hdu.columns.names:
log.info('Found existing PULSE_PHASE column, overwriting...')
# Overwrite values in existing Column
event_dat['PULSE_PHASE'] = phases
else:
# Construct and append new column, preserving HDU header and name
log.info('Adding new PULSE_PHASE column.')
phasecol = pyfits.ColDefs([pyfits.Column(name='PULSE_PHASE', format='D',
array=phases)])
bt = pyfits.BinTableHDU.from_columns( event_hdu.columns + phasecol,
header=event_hdr,name=event_hdu.name)
# Replace extension 1 with the rebuilt table that includes PULSE_PHASE
hdulist[1] = bt
if args.outfile is None:
# Overwrite the existing file
# NOTE(review): flush() presumably needs the file to have been opened in
# update mode -- confirm where hdulist is opened.
log.info('Overwriting existing FITS file '+args.eventfile)
hdulist.flush(verbose=True, output_verify='warn')
else:
# Write to new output file
log.info('Writing output FITS file '+args.outfile)
hdulist.writeto(args.outfile,overwrite=True, checksum=True, output_verify='warn')
return 0