Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# As far as can be told from the unittest module documentation, the
# 'assertRaises' tests do not permit a custom message. So more
# comments than usual here, to help diagnose and test failures.
#
# The 'other_args' argument is needed to pass additional keywords to
# the constructors of some types (T/F series); we cannot simply copy since
# the whole point is to vary the input/output in some way that should cause
# an exception.
if other_args is None:
other_args = {}
tc = test_case
with tc.context:
outty = type(outarr)
outzer = pycbc.types.zeros(len(outarr))
# If we give an output array that is wrong only in length, raise ValueError:
out_badlen = outty(pycbc.types.zeros(len(outarr)+1),dtype=outarr.dtype,**other_args)
args = [inarr, out_badlen]
tc.assertRaises(ValueError,pycbc.fft.ifft,*args)
# If we give an output array that has the wrong precision, raise ValueError:
out_badprec = outty(outzer,dtype=_other_prec[dtype(outarr).type],**other_args)
args = [inarr,out_badprec]
tc.assertRaises(ValueError,pycbc.fft.ifft,*args)
# If we give an output array that has the wrong kind (real or complex) but
# correct precision, then raise a ValueError. Here we must adjust the kind
# of the *input* array, not output. But that makes it hard, because the 'other_args'
# parameter will be wrong for that. Very hacky, but oh well...
new_args = other_args.copy()
if new_args != {}:
try:
delta = new_args.pop('delta_t')
new_args.update({'delta_f' : delta})
except KeyError:
def _test_raise_excep_ifft(test_case,inarr,outarr,other_args=None):
# As far as can be told from the unittest module documentation, the
# 'assertRaises' tests do not permit a custom message. So more
# comments than usual here, to help diagnose and test failures.
#
# The 'other_args' argument is needed to pass additional keywords to
# the constructors of some types (T/F series); we cannot simply copy since
# the whole point is to vary the input/output in some way that should cause
# an exception.
if other_args is None:
other_args = {}
tc = test_case
with tc.context:
outty = type(outarr)
outzer = pycbc.types.zeros(len(outarr))
# If we give an output array that is wrong only in length, raise ValueError:
out_badlen = outty(pycbc.types.zeros(len(outarr)+1),dtype=outarr.dtype,**other_args)
args = [inarr, out_badlen]
tc.assertRaises(ValueError,pycbc.fft.ifft,*args)
# If we give an output array that has the wrong precision, raise ValueError:
out_badprec = outty(outzer,dtype=_other_prec[dtype(outarr).type],**other_args)
args = [inarr,out_badprec]
tc.assertRaises(ValueError,pycbc.fft.ifft,*args)
# If we give an output array that has the wrong kind (real or complex) but
# correct precision, then raise a ValueError. Here we must adjust the kind
# of the *input* array, not output. But that makes it hard, because the 'other_args'
# parameter will be wrong for that. Very hacky, but oh well...
new_args = other_args.copy()
if new_args != {}:
try:
delta = new_args.pop('delta_t')
def _fftw_setup(fftobj):
n = _np.asarray([fftobj.size], dtype=_np.int32)
inembed = _np.asarray([len(fftobj.invec)], dtype=_np.int32)
onembed = _np.asarray([len(fftobj.outvec)], dtype=_np.int32)
nthreads = _scheme.mgr.state.num_threads
if not _fftw_threaded_set:
set_threads_backend()
if nthreads != _fftw_current_nthreads:
_fftw_plan_with_nthreads(nthreads)
mlvl = get_measure_level()
aligned = fftobj.invec.data.isaligned and fftobj.outvec.data.isaligned
flags = get_flag(mlvl, aligned)
plan_func = _plan_funcs_dict[ (str(fftobj.invec.dtype), str(fftobj.outvec.dtype)) ]
tmpin = zeros(len(fftobj.invec), dtype = fftobj.invec.dtype)
tmpout = zeros(len(fftobj.outvec), dtype = fftobj.outvec.dtype)
# C2C, forward
if fftobj.forward and (fftobj.outvec.dtype in [complex64, complex128]):
plan = plan_func(1, n.ctypes.data, fftobj.nbatch,
tmpin.ptr, inembed.ctypes.data, 1, fftobj.idist,
tmpout.ptr, onembed.ctypes.data, 1, fftobj.odist,
FFTW_FORWARD, flags)
# C2C, backward
elif not fftobj.forward and (fftobj.invec.dtype in [complex64, complex128]):
plan = plan_func(1, n.ctypes.data, fftobj.nbatch,
tmpin.ptr, inembed.ctypes.data, 1, fftobj.idist,
tmpout.ptr, onembed.ctypes.data, 1, fftobj.odist,
FFTW_BACKWARD, flags)
# R2C or C2R (hence no direction argument for plan creation)
else:
plan = plan_func(1, n.ctypes.data, fftobj.nbatch,
vectilde = FrequencySeries(zeros(n, dtype=complex_same_precision_as(vec)),
delta_f=1.0,copy=False)
if len(vectilde) < len(vec):
cplen = len(vectilde)
else:
cplen = len(vec)
vectilde[0:cplen] = vec[0:cplen]
delta_f = vec.delta_f
if isinstance(vec,TimeSeries):
vec_pad = TimeSeries(zeros(N),delta_t=vec.delta_t,
dtype=real_same_precision_as(vec))
vec_pad[0:len(vec)] = vec
delta_f = 1.0/(vec.delta_t*N)
vectilde = FrequencySeries(zeros(n),delta_f=1.0,
dtype=complex_same_precision_as(vec))
fft(vec_pad,vectilde)
vectilde = FrequencySeries(vectilde * DYN_RANGE_FAC,delta_f=delta_f,dtype=complex64)
return vectilde
duration = opt.gps_end_time - opt.gps_start_time
pdf = 1. / 128
plen = int(opt.sample_rate / pdf) / 2 + 1
if opt.fake_strain_from_file:
logging.info("Reading ASD from file")
strain_psd = pycbc.psd.from_txt(opt.fake_strain_from_file, plen, pdf,
opt.low_frequency_cutoff, is_asd_file=True)
elif opt.fake_strain != 'zeroNoise':
logging.info("Making PSD for strain")
strain_psd = pycbc.psd.from_string(opt.fake_strain, plen, pdf,
opt.low_frequency_cutoff)
if opt.fake_strain == 'zeroNoise':
logging.info("Making zero-noise time series")
strain = TimeSeries(pycbc.types.zeros(duration * 16384),
delta_t=1. / 16384,
epoch=opt.gps_start_time)
else:
logging.info("Making colored noise")
from pycbc.noise.reproduceable import colored_noise
lowfreq = opt.low_frequency_cutoff / 2.
strain = colored_noise(strain_psd, opt.gps_start_time,
opt.gps_end_time,
seed=opt.fake_strain_seed,
low_frequency_cutoff=lowfreq)
if not opt.channel_name and (opt.injection_file \
or opt.sgburst_injection_file):
raise ValueError('Please provide channel names with the format '
'ifo:channel (e.g. H1:CALIB-STRAIN) to inject '
'simulated signals into fake strain')
corrupt_length = int(corrupt_time * strain.sample_rate)
w = numpy.arange(corrupt_length) / float(corrupt_length)
strain[0:corrupt_length] *= pycbc.types.Array(w, dtype=strain.dtype)
strain[(len(strain) - corrupt_length):] *= \
pycbc.types.Array(w[::-1], dtype=strain.dtype)
if output_intermediates:
strain.save_to_wav('strain_conditioned.wav')
# zero-pad strain to a power-of-2 length
strain_pad_length = next_power_of_2(len(strain))
pad_start = int(strain_pad_length / 2 - len(strain) / 2)
pad_end = pad_start + len(strain)
pad_epoch = strain.start_time - pad_start / float(strain.sample_rate)
strain_pad = pycbc.types.TimeSeries(
pycbc.types.zeros(strain_pad_length, dtype=strain.dtype),
delta_t=strain.delta_t, copy=False, epoch=pad_epoch)
strain_pad[pad_start:pad_end] = strain[:]
# estimate the PSD
psd = pycbc.psd.welch(strain[corrupt_length:(len(strain)-corrupt_length)],
seg_len=int(psd_duration * strain.sample_rate),
seg_stride=int(psd_stride * strain.sample_rate),
avg_method=psd_avg_method,
require_exact_data_fit=False)
psd = pycbc.psd.interpolate(psd, 1. / strain_pad.duration)
psd = pycbc.psd.inverse_spectrum_truncation(
psd, int(psd_duration * strain.sample_rate),
low_frequency_cutoff=low_freq_cutoff,
trunc_method='hann')
kmin = int(low_freq_cutoff / psd.delta_f)
psd[0:kmin] = numpy.inf
94.403/3.024 * eta*eta - 7.75/3.24 * eta*eta*eta)
FTl6 = -8.56/1.05
FTa7 = -(162.85/5.04 - 214.745/1.728 * eta - 193.385/3.024 * eta*eta) \
* lal.PI
dETaN = 2 * -eta/2.0
dETa1 = 2 * -(3.0/4.0 + 1.0/12.0 * eta)
dETa2 = 3 * -(27.0/8.0 - 19.0/8.0 * eta + 1./24.0 * eta*eta)
dETa3 = 4 * -(67.5/6.4 - (344.45/5.76 - 20.5/9.6 * lal.PI*lal.PI) *
eta + 15.5/9.6 * eta*eta + 3.5/518.4 * eta*eta*eta)
amp0 = -4. * mass1 * mass2 / (1.0e+06 * distance * lal.PC_SI ) * \
lal.MRSUN_SI * lal.MTSUN_SI * sqrt(lal.PI/12.0)
htildeP = FrequencySeries(zeros(n,dtype=complex128), delta_f=delta_f, copy=False)
htildeC = FrequencySeries(zeros(n,dtype=complex128), delta_f=delta_f, copy=False)
spintaylorf2_kernel(htildeP.data[kmin:kmax], htildeC.data[kmin:kmax],
kmin, phase_order, amplitude_order, delta_f, piM, pfaN,
pfa2, pfa3, pfa4, pfa5, pfl5,
pfa6, pfl6, pfa7, FTaN, FTa2,
FTa3, FTa4, FTa5, FTa6,
FTl6, FTa7, dETaN, dETa1, dETa2, dETa3,
amp0, tC, phi0,
kappa, prec_fac0, alpha_ref, zeta_ref,
dtdv2, dtdv3, dtdv4, dtdv5,
RE_SBfac0, RE_SBfac1, RE_SBfac2, RE_SBfac3, RE_SBfac4,
IM_SBfac0, IM_SBfac1, IM_SBfac2, IM_SBfac3, IM_SBfac4,
psiJ_P, psiJ_C, gamma0)
return htildeP, htildeC
----------
length : int
The length of noise to generate in samples.
delta_t : float
The time step of the noise.
psd : FrequencySeries
The noise weighting to color the noise.
seed : {0, int}
The seed to generate the noise.
Returns
--------
noise : TimeSeries
A TimeSeries containing gaussian noise colored by the given psd.
"""
noise_ts = TimeSeries(zeros(length), delta_t=delta_t)
if seed is None:
seed = numpy.random.randint(2**32)
randomness = lal.gsl_rng("ranlux", seed)
N = int (1.0 / delta_t / psd.delta_f)
n = N//2+1
stride = N//2
if n > len(psd):
raise ValueError("PSD not compatible with requested delta_t")
psd = (psd[0:n]).lal()
psd.data.data[n-1] = 0
The minimum snr to return when filtering
"""
self.tlen = tlen
self.delta_f = delta_f
self.dtype = dtype
self.snr_threshold = snr_threshold
self.flow = low_frequency_cutoff
self.fhigh = high_frequency_cutoff
self.matched_filter_and_cluster = \
self.full_matched_filter_and_cluster
self.snr_plus_mem = zeros(self.tlen, dtype=self.dtype)
self.corr_plus_mem = zeros(self.tlen, dtype=self.dtype)
self.snr_cross_mem = zeros(self.tlen, dtype=self.dtype)
self.corr_cross_mem = zeros(self.tlen, dtype=self.dtype)
self.snr_mem = zeros(self.tlen, dtype=self.dtype)
self.cached_hplus_hcross_correlation = None
self.cached_hplus_hcross_hplus = None
self.cached_hplus_hcross_hcross = None
self.cached_hplus_hcross_psd = None