Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Try to see how many freq derivs we have
fs = [x.F0]
for ii in range(1, 20): # hopefully 20 is an upper limit!
attrib = "F%d"%ii
if hasattr(x, attrib):
fs.append(getattr(x, attrib))
else:
break
epoch = x.PEPOCH
T = (x.FINISH - x.START) * 86400.0
else:
print("I don't recognize the file type for", filenm)
sys.exit()
newts = epoch + num.arange(int(T/10.0+0.5), dtype=num.float)/8640.0
time = num.concatenate((time, newts))
newps = 1.0 / pu.calc_freq(newts, epoch, *fs)
period = num.concatenate((period, newps))
print("%13.7f (%0.1f sec): " % (epoch, T), fs)
def inject(infile, outfn, prof, period, dm, nbitsout=None,
block_size=BLOCKSIZE, pulsar_only=False, inplace=False):
if isinstance(infile, filterbank.FilterbankFile):
fil = infile
elif inplace:
fil = filterbank.FilterbankFile(infile, 'readwrite')
else:
fil = filterbank.FilterbankFile(infile, 'read')
print("Injecting pulsar signal into: %s" % fil.filename)
if False:
delays = psr_utils.delay_from_DM(dm, fil.frequencies)
delays -= delays[np.argmax(fil.frequencies)]
get_phases = lambda times: (times-delays)/period % 1
else:
get_phases = lambda times: times/period % 1
# Create the output filterbank file
if nbitsout is None:
nbitsout = fil.nbits
if inplace:
warnings.warn("Injecting pulsar signal *in-place*")
outfil = fil
else:
# Start an output file
print("Creating out file: %s" % outfn)
outfil = filterbank.create_filterbank_file(outfn, fil.header, \
nbits=nbitsout, mode='append')
def get_phasedelays(dm, freqs, period):
"""Return phase delays corresponding to a particular DM.
Inputs:
dm: DM (in pc cm-3)
freqs: The list of frequencies (in MHz)
period: The profiles period (in seconds)
Outputs:
phasedelays: The corresponding phase delays.
"""
# Prepare delays
timedelays = psr_utils.delay_from_DM(dm, freqs)
# Reference all delays to highest frequency channel, which remains
# unchanged
# TODO: Do we really want to refer to high freq?
timedelays -= timedelays[np.argmax(freqs)]
phasedelays = timedelays/period
return phasedelays
# Make period labels be non-scientific notation
ax.get_xaxis().set_major_formatter(plt.FormatStrFormatter("%g"))
# Plot magnetic field lines
Bs_to_plot = [8, 10, 12, 14]
for logB in Bs_to_plot:
plt.plot(plims, pu.pdot_from_B(plims, 10.0**logB), '-', color=grey)
if logB==14:
y = 4e-10
x = pu.pdot_from_B(y, 10.0**logB)
elif logB==8:
x = 0.05
y = 1.1 * pu.pdot_from_B(x, 10.0**logB)
else:
x = 1.1 * plims[0]
y = 1.1 * pu.pdot_from_B(x, 10.0**logB)
plt.text(x, y, "$10^{%d}$ G"%logB, color=greytext,
rotation=np.degrees(np.arctan(-1.0 * dpdpd)))
# Plot Edot lines
Edots_to_plot = [31, 34, 37, 40]
for logEdot in Edots_to_plot[::-1]:
plt.plot(plims, pu.pdot_from_edot(plims, 10.0**logEdot), '-', color=grey)
if logEdot > 31:
y = 5e-10
x = 0.6 * (y * 4e45 * np.pi * np.pi / 10.0**logEdot)**(1.0/3.0)
else:
y = 1e-21
x = 0.6 * (y * 4e45 * np.pi * np.pi / 10.0**logEdot)**(1.0/3.0)
plt.text(x, y, "$10^{%d}$ erg/s"%logEdot, color=greytext,
rotation=np.degrees(np.arctan(3.0 * dpdpd)))
def subband_smear(DM, subDM, subBW, fctr):
"""
subband_smear(DM, subDM, subBW, fctr):
Return the smearing in ms caused by subbanding at DM='DM' given
subbands of bandwidth 'subBW' (MHz) at DM='subDM'. All values
are computed at the frequency fctr in MHz.
"""
return 1000.0 * pu.dm_smear(num.fabs(DM-subDM), subBW, fctr)
nbinlim = np.int(duration/data.dt)
img = ax_im.imshow(data.data[..., :nbinlim], aspect='auto', \
cmap=matplotlib.cm.cmap_d[cmap_str], \
interpolation='nearest', origin='upper', \
extent=(data.starttime, data.starttime+ nbinlim*data.dt, \
data.freqs.min(), data.freqs.max()))
if show_cb:
cb = ax_im.get_figure().colorbar(img)
cb.set_label("Scaled signal intensity (arbitrary units)")
#plt.axis('tight')
# Sweeping it up
for ii, sweep_dm in enumerate(sweep_dms):
ddm = sweep_dm-data.dm
delays = psr_utils.delay_from_DM(ddm, data.freqs)
delays -= delays.min()
if sweep_posns is None:
sweep_posn = 0.0
elif len(sweep_posns) == 1:
sweep_posn = sweep_posns[0]
else:
sweep_posn = sweep_posns[ii]
sweepstart = data.dt*data.numspectra*sweep_posn+data.starttime
sty = SWEEP_STYLES[ii%len(SWEEP_STYLES)]
ax_im.plot(delays+sweepstart, data.freqs, sty, lw=4, alpha=0.5)
# Dressing it up
ax_im.xaxis.get_major_formatter().set_useOffset(False)
ax_im.set_xlabel("Time")
ax_im.set_ylabel("Observing frequency (MHz)")
# A list of profiles, one for each channel
profiles = []
if dm <= 0:
warnings.warn("DM will not be applied because it is 0 (or smaller?!)")
do_delay = False
do_smear = False
do_scatter = False
if do_delay:
phasedelays = get_phasedelays(dm, freqs, period)
else:
phasedelays = np.zeros(nfreqs)
# Prepare for smear campaign
smeartimes = psr_utils.dm_smear(dm, abs(chan_width), freqs) # In seconds
smearphases = smeartimes/period
# Prepare to scatter
scattertimes = psr_utils.pulse_broadening(dm, freqs)*1e-3 # In seconds
scatterphases = scattertimes/period
if DEBUG:
for ichan, (freq, smear, scatt, delay) in \
enumerate(zip(freqs, smearphases, scatterphases, phasedelays)):
print(" Chan #%d - Freq: %.3f MHz -- " \
"Smearing, scattering, delay (all in phase): " \
"%g, %g, %g" % (ichan, freq, smear, scatt, delay))
oldprogress = 0
sys.stdout.write(" %3.0f %%\r" % oldprogress)
sys.stdout.flush()
# ylim = None
# Note: the offset has _not_ been measured for the 2048-lag mode
if self.tepoch > 0.0: self.tepoch += 0.039450/86400.0
if self.bestprof: self.bestprof.epochf += 0.039450/86400.0
(self.topo_pow, tmp) = struct.unpack(swapchar+"f"*2, infile.read(2*4))
(self.topo_p1, self.topo_p2, self.topo_p3) = struct.unpack(swapchar+"d"*3, \
infile.read(3*8))
(self.bary_pow, tmp) = struct.unpack(swapchar+"f"*2, infile.read(2*4))
(self.bary_p1, self.bary_p2, self.bary_p3) = struct.unpack(swapchar+"d"*3, \
infile.read(3*8))
(self.fold_pow, tmp) = struct.unpack(swapchar+"f"*2, infile.read(2*4))
(self.fold_p1, self.fold_p2, self.fold_p3) = struct.unpack(swapchar+"d"*3, \
infile.read(3*8))
# Save current p, pd, pdd
# NOTE: Fold values are actually frequencies!
self.curr_p1, self.curr_p2, self.curr_p3 = \
psr_utils.p_to_f(self.fold_p1, self.fold_p2, self.fold_p3)
self.pdelays_bins = Num.zeros(self.npart, dtype='d')
(self.orb_p, self.orb_e, self.orb_x, self.orb_w, self.orb_t, self.orb_pd, \
self.orb_wd) = struct.unpack(swapchar+"d"*7, infile.read(7*8))
self.dms = Num.asarray(struct.unpack(swapchar+"d"*self.numdms, \
infile.read(self.numdms*8)))
if self.numdms==1:
self.dms = self.dms[0]
self.periods = Num.asarray(struct.unpack(swapchar+"d"*self.numperiods, \
infile.read(self.numperiods*8)))
self.pdots = Num.asarray(struct.unpack(swapchar+"d"*self.numpdots, \
infile.read(self.numpdots*8)))
self.numprofs = self.nsub*self.npart
if (swapchar=='<'): # little endian
self.profs = Num.zeros((self.npart, self.nsub, self.proflen), dtype='d')
for ii in range(self.npart):
for jj in range(self.nsub):
The values 'median' and 'mean' refer to the
median and mean of the channel. The value
'rotate' takes values from one end of the
channel and shifts them to the other.
Outputs:
None
*** Shifting happens in-place ***
"""
assert self.numchans == len(bins)
for ii in range(self.numchans):
chan = self.get_chan(ii)
# Use 'chan[:]' so update happens in-place
# this way the change effects self.data
chan[:] = psr_utils.rotate(chan, bins[ii])
if padval!='rotate':
# Get padding value
if padval=='mean':
pad = np.mean(chan)
elif padval=='median':
pad = np.median(chan)
else:
pad = padval
# Replace rotated values with padval
if bins[ii]>0:
chan[-bins[ii]:] = pad
elif bins[ii]<0:
chan[:-bins[ii]] = pad