Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Returns:
"""
# Load hit details
dat = find_event.make_table(dat_filename)
hit = dat.iloc[hit_id]
f0 = hit['Freq']
if bw is None:
bw_mhz = np.abs(hit['FreqStart'] - hit['FreqEnd'])
else:
bw_mhz = bw * 1e-6
fil = Waterfall(fil_filename, f_start=f0 - bw_mhz / 2, f_stop=f0 + bw_mhz / 2)
t_duration = (fil.n_ints_in_file - 1) * fil.header['tsamp']
fil.plot_waterfall()
plot_event.overlay_drift(f0, f0, f0, hit['DriftRate'], t_duration, offset)
def __split_h5(self, size_limit=SIZE_LIM):
    """
    Creates a plan to select data from single coarse channels.

    :param size_limit: float, maximum size in MB that the file is allowed to be
    :return: list[DATAH5], the list contains a DATAH5 object for each of the coarse channels in
             the file
    :raises IOError: if the file named by self.filename cannot be opened
    """
    data_list = []

    # Instancing file.
    try:
        fil_file = Waterfall(self.filename)
    except Exception as err:
        # Catch Exception instead of using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are not rewritten as IOError, and
        # chain the original error so the root cause stays in the traceback.
        msg = "Error encountered when trying to open file: %s" % self.filename
        logger.error(msg)
        raise IOError(msg) from err

    # Read the 'foff' and 'fch1' entries from the file header.
    f_delt = fil_file.header['foff']
    f0 = fil_file.header['fch1']

    # Resolve the number of coarse channels, in order of precedence:
    #   1. the value set on this object,
    #   2. the 'n_coarse_chan' entry of the file header,
    #   3. the value computed by the Waterfall object itself.
    if self.n_coarse_chan is not None:
        n_coarse_chan = self.n_coarse_chan
    elif fil_file.header.get('n_coarse_chan', None) is not None:
        n_coarse_chan = fil_file.header['n_coarse_chan']
    else:
        n_coarse_chan = int(fil_file.calc_n_coarse_chan())
    # NOTE(review): the visible part of this method ends here; the loop over
    # coarse channels that fills and returns ``data_list`` is not shown.
def __make_h5_file(self,):
    """
    Converts file to h5 format. Saves the output in ``self.out_dir`` under the
    input file's base name with its extension replaced by ``.h5``. Sets the
    filename attribute of the calling DATAHandle to the (new) filename.

    :return: void
    """
    fil_file = Waterfall(self.filename)

    # Swap only the trailing extension. A plain str.replace('.fil', '.h5')
    # would also rewrite '.fil' occurring in the middle of the name
    # (e.g. 'a.filtered.fil' -> 'a.h5tered.h5') and would leave a name
    # without '.fil' unchanged.
    stem, _ext = os.path.splitext(os.path.basename(self.filename))
    new_filename = os.path.join(self.out_dir, stem + '.h5')

    fil_file.write_to_hdf5(new_filename)
    self.filename = new_filename
source_name_list,
offset=0,
plot_snr_list=False,
**kwargs):
#load in the data for each individual hit
for i in range(0, len(candidate_event_dataframe)):
candidate = candidate_event_dataframe.iloc[i]
on_source_name = candidate['Source']
f_mid = candidate['Freq']
drift_rate = candidate['DriftRate']
#calculate the length of the total cadence from the fil files' headers
first_fil = bl.Waterfall(fil_file_list[0], load_data=False)
tfirst = first_fil.header['tstart']
last_fil = bl.Waterfall(fil_file_list[-1], load_data=False)
tlast = last_fil.header['tstart']
t_elapsed = Time(tlast, format='mjd').unix - Time(tfirst, format='mjd').unix + (last_fil.n_ints_in_file -1) * last_fil.header['tsamp']
#calculate the width of the plot based on making sure the full drift is visible
bandwidth = 2.4 * abs(drift_rate)/1e6 * t_elapsed
bandwidth = np.max((bandwidth, 500./1e6))
#Get start and stop frequencies based on midpoint and bandwidth
f_start, f_stop = np.sort((f_mid - (bandwidth/2), f_mid + (bandwidth/2)))
#Print useful values
print('')
print('*************************************************')
print('*** The Parameters for This Plot Are: ****')
print('Target = ', on_source_name)
print('Bandwidth = ', round(bandwidth, 5), ' MHz')
:param t_stop: int stop integration ID
:param coarse_chan: int
:param n_coarse_chan: int
"""
self.filename = filename
self.closed = False
self.f_start = f_start
self.f_stop = f_stop
self.t_start = t_start
self.t_stop = t_stop
self.n_coarse_chan = n_coarse_chan
#Instancing file.
try:
self.fil_file = Waterfall(filename,f_start=self.f_start, f_stop=self.f_stop,t_start=self.t_start, t_stop=self.t_stop,load_data=False)
except:
logger.error("Error encountered when trying to open file %s"%filename)
raise IOError("Error encountered when trying to open file %s"%filename)
#Getting header
try:
if self.n_coarse_chan:
header = self.__make_data_header(self.fil_file.header,coarse=True)
else:
header = self.__make_data_header(self.fil_file.header)
except:
logger.debug('The fil_file.header is ' % self.fil_file.header)
raise IOError("Error accessing header from file: %s." % self.filename)
self.header = header
f_mid = middle frequency of the event, in MHz
filter_level = filter level (1,2,or 3) that produced the event
source_name_list = list of source names in the cadence, in order
bandwidth = width of the plot, incorporating drift info
kwargs: keyword args to be passed to matplotlib imshow()
'''
#prepare for plotting
matplotlib.rc('font', **font)
#set up the sub-plots
n_plots = len(fil_file_list)
fig = plt.subplots(n_plots, sharex=True, sharey=True,figsize=(10, 2*n_plots))
#read in data for the first panel
fil1 = bl.Waterfall(fil_file_list[0], f_start=f_start, f_stop=f_stop)
t0 = fil1.header['tstart']
dummy, plot_data1 = fil1.grab_data()
#rebin data to plot correctly with fewer points
dec_fac_x, dec_fac_y = 1, 1
if plot_data1.shape[0] > MAX_IMSHOW_POINTS[0]:
dec_fac_x = plot_data1.shape[0] / MAX_IMSHOW_POINTS[0]
if plot_data1.shape[1] > MAX_IMSHOW_POINTS[1]:
dec_fac_y = int(np.ceil(plot_data1.shape[1] / MAX_IMSHOW_POINTS[1]))
plot_data1 = rebin(plot_data1, dec_fac_x, dec_fac_y)
#define more plot parameters
### never used: delta_f = 0.000250
mid_f = np.abs(f_start+f_stop)/2.
subplots = []
fil_file_list,
filter_level,
source_name_list,
offset=0,
plot_snr_list=False,
**kwargs):
#load in the data for each individual hit
for i in range(0, len(candidate_event_dataframe)):
candidate = candidate_event_dataframe.iloc[i]
on_source_name = candidate['Source']
f_mid = candidate['Freq']
drift_rate = candidate['DriftRate']
#calculate the length of the total cadence from the fil files' headers
first_fil = bl.Waterfall(fil_file_list[0], load_data=False)
tfirst = first_fil.header['tstart']
last_fil = bl.Waterfall(fil_file_list[-1], load_data=False)
tlast = last_fil.header['tstart']
t_elapsed = Time(tlast, format='mjd').unix - Time(tfirst, format='mjd').unix + (last_fil.n_ints_in_file -1) * last_fil.header['tsamp']
#calculate the width of the plot based on making sure the full drift is visible
bandwidth = 2.4 * abs(drift_rate)/1e6 * t_elapsed
bandwidth = np.max((bandwidth, 500./1e6))
#Get start and stop frequencies based on midpoint and bandwidth
f_start, f_stop = np.sort((f_mid - (bandwidth/2), f_mid + (bandwidth/2)))
#Print useful values
print('')
print('*************************************************')
print('*** The Parameters for This Plot Are: ****')
def get_info(self):
    """
    Open ``self.filename`` as a Waterfall with ``load_data=False`` and hand
    back its header.

    :return: dict, header of the blimpy file
    """
    return Waterfall(self.filename, load_data=False).header