Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Outputs:
None
*** Subbanding happens in-place ***
"""
assert (self.numchans % nsub) == 0
assert (subdm is None) or (subdm >= 0)
nchan_per_sub = self.numchans // nsub
sub_hifreqs = self.freqs[np.arange(nsub)*nchan_per_sub]
sub_lofreqs = self.freqs[(1+np.arange(nsub))*nchan_per_sub-1]
sub_ctrfreqs = 0.5*(sub_hifreqs+sub_lofreqs)
if subdm is not None:
# Compute delays
ref_delays = psr_utils.delay_from_DM(subdm-self.dm, sub_ctrfreqs)
delays = psr_utils.delay_from_DM(subdm-self.dm, self.freqs)
rel_delays = delays-ref_delays.repeat(nchan_per_sub) # Relative delay
rel_bindelays = np.round(rel_delays/self.dt).astype('int')
# Shift channels
self.shift_channels(rel_bindelays, padval)
# Subband
self.data = np.array([np.sum(sub, axis=0) for sub in \
np.vsplit(self.data, nsub)])
self.freqs = sub_ctrfreqs
self.numchans = nsub
Outputs:
None
*** Subbanding happens in-place ***
"""
assert (self.numchans % nsub) == 0
assert (subdm is None) or (subdm >= 0)
nchan_per_sub = self.numchans // nsub
sub_hifreqs = self.freqs[np.arange(nsub)*nchan_per_sub]
sub_lofreqs = self.freqs[(1+np.arange(nsub))*nchan_per_sub-1]
sub_ctrfreqs = 0.5*(sub_hifreqs+sub_lofreqs)
if subdm is not None:
# Compute delays
ref_delays = psr_utils.delay_from_DM(subdm-self.dm, sub_ctrfreqs)
delays = psr_utils.delay_from_DM(subdm-self.dm, self.freqs)
rel_delays = delays-ref_delays.repeat(nchan_per_sub) # Relative delay
rel_bindelays = np.round(rel_delays/self.dt).astype('int')
# Shift channels
self.shift_channels(rel_bindelays, padval)
# Subband
self.data = np.array([np.sum(sub, axis=0) for sub in \
np.vsplit(self.data, nsub)])
self.freqs = sub_ctrfreqs
self.numchans = nsub
scaleindep = None, zerodm = None, mask = mask,
barytime=barytime,
nsub = nsub, bandpass_corr = bandpass_corr)
data, Data_nozerodm = waterfall_array(rawdatafile, spdcand.start,
spdcand.duration, spdcand.dm, spdcand.nbins, spdcand.nsub,
spdcand.subdm, spdcand.zerodm, spdcand.downsamp,
spdcand.scaleindep, spdcand.width_bins,
spdcand.mask, maskfile, spdcand.bandpass_corr)
text_array = np.append(text_array, spdcand.sweep_duration)
text_array = np.append(text_array, data.starttime)
text_array = np.append(text_array, spdcand.bary_start_time)
text_array = np.append(text_array, man_params)
# Array to Construct the sweep
if spdcand.sweep_dm is not None:
ddm = spdcand.sweep_dm-data.dm
delays = psr_utils.delay_from_DM(ddm, data.freqs)
delays -= delays.min()
delays_nozerodm = delays
freqs_nozerodm = data.freqs
# Swept with zerodm-on
zerodm = True
#downsamp_temp = 1
data, Data_zerodm = waterfall_array(rawdatafile, spdcand.start,
spdcand.duration, spdcand.dm, spdcand.nbins, spdcand.nsub,
spdcand.subdm, zerodm, spdcand.downsamp,
spdcand.scaleindep, spdcand.width_bins,
spdcand.mask, maskfile, spdcand.bandpass_corr)
# Saving the arrays into the .spd file.
with open(temp_filename+".spd", 'wb') as f:
np.savez_compressed(f,
Data_dedisp_nozerodm = Data_dedisp_nozerodm.astype(np.float16),
Data_dedisp_zerodm = Data_dedisp_zerodm.astype(np.float16),
rawdatafile.frequencies[0], rawdatafile.frequencies[-1], rawdatafile,
loc_pulse=loc_pulse, dedisp=None, scaleindep=None, zerodm=None, mask=mask,
barytime=barytime, bandpass_corr=bandpass_corr)
data, Data_nozerodm = waterfall_array(rawdatafile, spdcand.start,
spdcand.duration, spdcand.dm, spdcand.nbins, spdcand.nsub,
spdcand.subdm, spdcand.zerodm, spdcand.downsamp,
spdcand.scaleindep, spdcand.width_bins,
spdcand.mask, maskfile, spdcand.bandpass_corr)
text_array = np.append(text_array, spdcand.sweep_duration)
text_array = np.append(text_array, data.starttime)
text_array = np.append(text_array, spdcand.bary_start_time)
text_array = np.append(text_array, man_params)
# Array to Construct the sweep
if spdcand.sweep_dm is not None:
ddm = spdcand.sweep_dm-data.dm
delays = psr_utils.delay_from_DM(ddm, data.freqs)
delays -= delays.min()
delays_nozerodm = delays
freqs_nozerodm = data.freqs
# Swept with zerodm-on
zerodm = True
#downsamp_temp = 1
data, Data_zerodm = waterfall_array(rawdatafile, spdcand.start,
spdcand.duration, spdcand.dm, spdcand.nbins, spdcand.nsub,
spdcand.subdm, zerodm, spdcand.downsamp,
spdcand.scaleindep, spdcand.width_bins,
spdcand.mask, maskfile, spdcand.bandpass_corr)
with open(temp_filename+".spd", 'wb') as f:
np.savez_compressed(f,
Data_dedisp_nozerodm = Data_dedisp_nozerodm.astype(np.float16),
Data_dedisp_zerodm = Data_dedisp_zerodm.astype(np.float16),
Data_nozerodm = Data_nozerodm.astype(np.float16),
the given DM.
Inputs:
dm: The DM (in pc/cm^3) to use.
padval: The padding value to use when shifting
channels during dedispersion. See documentation
of Spectra.shift_channels. (Default: 0)
Outputs:
None
*** Dedispersion happens in place ***
"""
assert dm >= 0
ref_delay = psr_utils.delay_from_DM(dm-self.dm, np.max(self.freqs))
delays = psr_utils.delay_from_DM(dm-self.dm, self.freqs)
rel_delays = delays-ref_delay # Relative delay
rel_bindelays = np.round(rel_delays/self.dt).astype('int')
# Shift channels
self.shift_channels(rel_bindelays, padval)
self.dm=dm