Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# inject(): open the input filterbank file, choose how sample times map to
# pulse phase, and pick the output file handle.
# NOTE(review): this excerpt has lost its indentation and stops after the
# in-place branch; the rest of the function is not visible here.
def inject(infile, outfn, prof, period, dm, nbitsout=None,
block_size=BLOCKSIZE, pulsar_only=False, inplace=False):
# Accept an already-open FilterbankFile as-is; otherwise treat `infile` as a
# path and open it 'readwrite' for in-place injection or 'read' otherwise.
if isinstance(infile, filterbank.FilterbankFile):
fil = infile
elif inplace:
fil = filterbank.FilterbankFile(infile, 'readwrite')
else:
fil = filterbank.FilterbankFile(infile, 'read')
print("Injecting pulsar signal into: %s" % fil.filename)
# Hard-coded `if False`: the branch below never runs. It would subtract
# per-channel delays (taken relative to the highest-frequency channel) from
# the times before computing phase.
# NOTE(review): presumably disabled because dispersion is applied to the
# profile elsewhere -- confirm before removing the dead branch.
if False:
delays = psr_utils.delay_from_DM(dm, fil.frequencies)
delays -= delays[np.argmax(fil.frequencies)]
get_phases = lambda times: (times-delays)/period % 1
else:
# Active definition: phase is time divided by period, wrapped by `% 1`.
get_phases = lambda times: times/period % 1
# Create the output filterbank file
# When no output bit depth is requested, reuse the input file's.
if nbitsout is None:
nbitsout = fil.nbits
# In-place mode: warn, then write into the same object that was opened above.
if inplace:
warnings.warn("Injecting pulsar signal *in-place*")
outfil = fil
# main(): open the input filterbank file, obtain the pulse profile (loaded
# from disk or built from von Mises components), and derive the output name.
# NOTE(review): `args` is not a parameter -- presumably a module-level parsed
# command line; confirm. Indentation is missing and the excerpt is truncated.
def main():
fn = args.infile
# Open for read/write only when the signal will be injected in place.
if args.inplace:
fil = filterbank.FilterbankFile(fn, mode='readwrite')
else:
fil = filterbank.FilterbankFile(fn, mode='read')
# A saved profile is loaded after the warning below; otherwise a new profile
# is built from args.vonmises.
if args.inprof is not None:
warnings.warn("Saved profiles already may be tuned to a particular " \
"DM, period and filterbank file (freq, nchans, " \
"tsamp, etc).")
prof = load_profile(args.inprof)
else:
prof = make_profile(args.vonmises)
# NOTE(review): with indentation lost it is unclear whether the DM
# application, scaling and optional save below belong to the `else` branch
# only (the warning above suggests so) -- confirm against the original file.
prof = apply_dm(prof, args.period, args.dm, \
fil.foff, fil.frequencies, fil.tsamp)
scale_profile(prof, args.scale_name, args.scale_cfgstrs, fil)
if args.outprof is not None:
save_profile(prof, args.outprof)
# args.outname is a %-format template filled in from the file header.
outfn = args.outname % fil.header
"Check lo/hi frequencies." % new_nchans)
# Fragment from the middle of an extraction routine: report what will be
# extracted, create the output filterbank file, then copy the selected
# spectra/channels block by block.
# NOTE(review): the enclosing `def` and the definitions of new_nspec,
# startbin, endbin, lochan, hichan and new_nchans are outside this excerpt.
print("Will extract")
print(" %d bins (%d to %d incl.)" % (new_nspec, startbin, endbin-1))
print(" (Original num bins: %d)" % fil.nspec)
print(" %d channels (%d to %d incl.)" % (new_nchans, lochan, hichan-1))
print(" (Original num chans: %d)" % fil.nchans)
# Create output file
outfn = options.outname % fil.header
print("Creating out file: %s" % outfn)
# Work on a deep copy so the input file's header is not modified; only the
# channel count and the first-channel frequency are overridden.
outhdr = copy.deepcopy(fil.header)
outhdr['nchans'] = new_nchans
outhdr['fch1'] = fil.frequencies[lochan]
# Create the file with the new header, then reopen it in 'write' mode.
filterbank.create_filterbank_file(outfn, outhdr, nbits=fil.nbits)
outfil = filterbank.FilterbankFile(outfn, mode='write')
# Write data
sys.stdout.write(" %3.0f %%\r" % 0)
sys.stdout.flush()
# Number of full blocks, and the count of spectra left over after them.
# NOTE(review): int(a/b) truncates via a float under true division; `//`
# would express the intent directly. The use of `remainder` is not visible
# in this excerpt.
nblocks = int(new_nspec/options.block_size)
remainder = new_nspec % options.block_size
oldprogress = -1
for iblock in np.arange(nblocks):
# Bin range of this block, offset by the first requested bin.
lobin = iblock*options.block_size + startbin
hibin = lobin+options.block_size
spectra = fil.get_spectra(lobin, hibin)
spectra = spectra[:,lochan:hichan] # restrict channels
outfil.append_spectra(spectra)
# Integer percent complete; the display is refreshed only when it increases.
# NOTE(review): the ratio needs true division to be meaningful -- confirm
# the target Python version or a `from __future__ import division`.
progress = int(100.0*((hibin-startbin)/new_nspec))
if progress > oldprogress:
sys.stdout.write(" %3.0f %%\r" % progress)
# inject(): open the input filterbank file, choose how sample times map to
# pulse phase, and pick the output file handle.
# NOTE(review): this excerpt has lost its indentation and stops at the start
# of the non-in-place branch; the rest of the function is not visible here.
def inject(infile, outfn, prof, period, dm, nbitsout=None,
block_size=BLOCKSIZE, pulsar_only=False, inplace=False):
# Accept an already-open FilterbankFile as-is; otherwise treat `infile` as a
# path and open it 'readwrite' for in-place injection or 'read' otherwise.
if isinstance(infile, filterbank.FilterbankFile):
fil = infile
elif inplace:
fil = filterbank.FilterbankFile(infile, 'readwrite')
else:
fil = filterbank.FilterbankFile(infile, 'read')
print("Injecting pulsar signal into: %s" % fil.filename)
# Hard-coded `if False`: the branch below never runs. It would subtract
# per-channel delays (taken relative to the highest-frequency channel) from
# the times before computing phase.
# NOTE(review): presumably disabled because dispersion is applied to the
# profile elsewhere -- confirm before removing the dead branch.
if False:
delays = psr_utils.delay_from_DM(dm, fil.frequencies)
delays -= delays[np.argmax(fil.frequencies)]
get_phases = lambda times: (times-delays)/period % 1
else:
# Active definition: phase is time divided by period, wrapped by `% 1`.
get_phases = lambda times: times/period % 1
# Create the output filterbank file
# When no output bit depth is requested, reuse the input file's.
if nbitsout is None:
nbitsout = fil.nbits
# In-place mode: warn, then write into the same object that was opened above.
if inplace:
warnings.warn("Injecting pulsar signal *in-place*")
outfil = fil
else:
# Start an output file
# main(): open a filterbank file and convert the requested start/end times
# into a range of spectrum (time-bin) indices to extract.
# NOTE(review): `args` and `options` are not parameters -- presumably
# module-level parsed command-line results; confirm. Indentation is missing
# and the excerpt is truncated.
def main():
infn = args[0]
print("Reading filterbank file (%s)" % infn)
fil = filterbank.FilterbankFile(infn)
# No start time given: begin at the first spectrum; otherwise round the
# time to the nearest sample index.
if options.start_time is None:
startbin = 0
else:
startbin = int(np.round(options.start_time/fil.tsamp))
# No end time given: run to the end of the file. Otherwise the +1 makes
# `endbin` exclusive, so the bin at end_time is counted in new_nspec.
if options.end_time is None:
endbin = fil.nspec
else:
endbin = int(np.round(options.end_time/fil.tsamp))+1
new_nspec = endbin-startbin
# Reject an empty or inverted time range.
if new_nspec <= 0:
raise ValueError("Bad number of spectra to be written (%d). " \
"Check start/end times." % new_nspec)
# Determine lo/hi channels to write to file
# If high frequencies come first in spectra 'hichan' refers to
# main(): open the input filterbank file, obtain the pulse profile (loaded
# from disk or built from von Mises components), derive the output name and
# start a figure for previewing the profile.
# NOTE(review): `args` is not a parameter -- presumably a module-level parsed
# command line; confirm. Indentation is missing and the excerpt is truncated.
def main():
fn = args.infile
# Open for read/write only when the signal will be injected in place.
if args.inplace:
fil = filterbank.FilterbankFile(fn, mode='readwrite')
else:
fil = filterbank.FilterbankFile(fn, mode='read')
# A saved profile is loaded after the warning below; otherwise a new profile
# is built from args.vonmises.
if args.inprof is not None:
warnings.warn("Saved profiles already may be tuned to a particular " \
"DM, period and filterbank file (freq, nchans, " \
"tsamp, etc).")
prof = load_profile(args.inprof)
else:
prof = make_profile(args.vonmises)
# NOTE(review): with indentation lost it is unclear whether the DM
# application, scaling and optional save below belong to the `else` branch
# only (the warning above suggests so) -- confirm against the original file.
prof = apply_dm(prof, args.period, args.dm, \
fil.foff, fil.frequencies, fil.tsamp)
scale_profile(prof, args.scale_name, args.scale_cfgstrs, fil)
if args.outprof is not None:
save_profile(prof, args.outprof)
# args.outname is a %-format template filled in from the file header.
outfn = args.outname % fil.header
print("Showing plot of profile to be injected...")
# Start a new figure; the plotting calls that follow are not visible here.
plt.figure()
def main():
fn = args[0]
if fn.endswith(".fil"):
# Filterbank file
filetype = "filterbank"
rawdatafile = filterbank.FilterbankFile(fn)
elif fn.endswith(".fits"):
# PSRFITS file
filetype = "psrfits"
rawdatafile = psrfits.PsrfitsFile(fn)
else:
raise ValueError("Cannot recognize data file type from "
"extension. (Only '.fits' and '.fil' "
"are supported.)")
data, bins, nbins, start = waterfall(rawdatafile, options.start, \
options.duration, dm=options.dm,\
nbins=options.nbins, nsub=options.nsub,\
subdm=options.subdm, zerodm=options.zerodm, \
downsamp=options.downsamp, \
scaleindep=options.scaleindep, \
width_bins=options.width_bins, mask=options.mask, \