Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rev_complex_arr(self):
for rev_dtype in [complex64,complex128]:
# Don't do separate even/odd tests for complex
inarr = ar(self.in_c2c_rev,dtype=rev_dtype)
outexp = ar(self.out_c2c_rev,dtype=rev_dtype)
_test_ifft(self,inarr,outexp,self.tdict[rev_dtype])
# Random---we don't do that in 'reverse' tests, since both
# directions are already tested in forward, and if we just passed
# in arrays in the other order we'd only get exceptions
#
# Check that exceptions are raised. Need input and
# output arrays; just reuse inarr and outexp (values won't
# matter, we're just checking exceptions).
_test_raise_excep_ifft(self,inarr,outexp)
def ourcopy(other):
"""
A convenience function to return an exact copy of a pycbc.type instance.
"""
if not isinstance(other,pycbc.types.Array):
raise TypeError("ourcopy() can only be used to duplicate PyCBC types")
if isinstance(other,pycbc.types.TimeSeries):
return pycbc.types.TimeSeries(initial_array=other._data,dtype=other.dtype,
delta_t=other._delta_t,epoch=other._epoch,copy=True)
if isinstance(other,pycbc.types.FrequencySeries):
return pycbc.types.FrequencySeries(initial_array=other._data,dtype=other.dtype,
delta_f=other._delta_f,epoch=other._epoch,copy=True)
if isinstance(other,pycbc.types.Array):
return pycbc.types.Array(initial_array=other._data,dtype=other.dtype,copy=True)
self.alist = [5+1j,3+3j,1+5j]
if self.okind == 'real':
self.b = self.type([10,8,6],dtype=self.odtype,**self.kwds)
self.blist = [10,8,6]
else:
self.b = self.type([10+6j,8+4j,6+2j],dtype=self.odtype,**self.kwds)
self.blist = [10+6j,8+4j,6+2j]
# And the scalar to test on
if self.okind == 'real':
self.scalar = 5
else:
self.scalar = 5+2j
# The weights used in the weighted inner product test are always an Array,
# regardless of the types whose inner product is being tested.
self.w = Array([1, 2, 1],dtype=self.dtype)
# All the answers are stored here to make it easier to read in the actual tests.
# Again, it makes a difference whether they are complex or real valued, so there
# are four sets of possible answers, depending on the dtypes.
if self.kind == 'real' and self.okind == 'real':
self.cumsum=self.type([5,8,9],dtype=self.dtype,**self.kwds)
self.mul = self.type([50, 24, 6],dtype=self.result_dtype,**self.kwds)
self.mul_s = self.type([25, 15, 5],dtype=self.result_dtype,**self.kwds)
self.add = self.type([15, 11, 7],dtype=self.result_dtype,**self.kwds)
self.add_s = self.type([10, 8, 6],dtype=self.result_dtype,**self.kwds)
#self.div = [.5, 3./8., 1./6.]
self.div = self.type([.5, 0.375, .16666666666666666667],dtype=self.result_dtype,**self.kwds)
#self.div_s = [1., 3./5., 1./5.]
def test_fwd_real_arr(self):
for fwd_dtype in [float32,float64]:
# Even input
inarr = ar(self.in_r2c_e,dtype=fwd_dtype)
outexp = ar(self.out_r2c_e,dtype=_other_kind[fwd_dtype])
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Odd input
inarr = ar(self.in_r2c_o,dtype=fwd_dtype)
outexp = ar(self.out_r2c_o,dtype=_other_kind[fwd_dtype])
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Random
rand_inarr = ar(zeros(self.rand_len_r,dtype=fwd_dtype))
rand_outarr = ar(zeros(self.rand_len_c,dtype=_other_kind[fwd_dtype]))
_test_random(self,rand_inarr,rand_outarr,self.tdict[fwd_dtype])
# Clean these up since they could be big:
del rand_inarr
del rand_outarr
# Check that exceptions are raised. Need input and
# output arrays; just reuse inarr and outexp (values won't
# matter, we're just checking exceptions).
_test_raise_excep_fft(self,inarr,outexp)
"""
# don't waste time trying to optimize a single FFT
pycbc.fft.fftw.set_measure_level(0)
if high_freq_cutoff:
strain = resample_to_delta_t(strain, 0.5 / high_freq_cutoff,
method='ldas')
else:
strain = strain.copy()
# taper strain
corrupt_length = int(corrupt_time * strain.sample_rate)
w = numpy.arange(corrupt_length) / float(corrupt_length)
strain[0:corrupt_length] *= pycbc.types.Array(w, dtype=strain.dtype)
strain[(len(strain) - corrupt_length):] *= \
pycbc.types.Array(w[::-1], dtype=strain.dtype)
if output_intermediates:
strain.save_to_wav('strain_conditioned.wav')
# zero-pad strain to a power-of-2 length
strain_pad_length = next_power_of_2(len(strain))
pad_start = int(strain_pad_length / 2 - len(strain) / 2)
pad_end = pad_start + len(strain)
pad_epoch = strain.start_time - pad_start / float(strain.sample_rate)
strain_pad = pycbc.types.TimeSeries(
pycbc.types.zeros(strain_pad_length, dtype=strain.dtype),
delta_t=strain.delta_t, copy=False, epoch=pad_epoch)
strain_pad[pad_start:pad_end] = strain[:]
# estimate the PSD
psd = pycbc.psd.welch(strain[corrupt_length:(len(strain)-corrupt_length)],
output_intermediates : {bool, False}
Save intermediate time series for debugging.
"""
# don't waste time trying to optimize a single FFT
pycbc.fft.fftw.set_measure_level(0)
if high_freq_cutoff:
strain = resample_to_delta_t(strain, 0.5 / high_freq_cutoff,
method='ldas')
else:
strain = strain.copy()
# taper strain
corrupt_length = int(corrupt_time * strain.sample_rate)
w = numpy.arange(corrupt_length) / float(corrupt_length)
strain[0:corrupt_length] *= pycbc.types.Array(w, dtype=strain.dtype)
strain[(len(strain) - corrupt_length):] *= \
pycbc.types.Array(w[::-1], dtype=strain.dtype)
if output_intermediates:
strain.save_to_wav('strain_conditioned.wav')
# zero-pad strain to a power-of-2 length
strain_pad_length = next_power_of_2(len(strain))
pad_start = int(strain_pad_length / 2 - len(strain) / 2)
pad_end = pad_start + len(strain)
pad_epoch = strain.start_time - pad_start / float(strain.sample_rate)
strain_pad = pycbc.types.TimeSeries(
pycbc.types.zeros(strain_pad_length, dtype=strain.dtype),
delta_t=strain.delta_t, copy=False, epoch=pad_epoch)
strain_pad[pad_start:pad_end] = strain[:]
The beta parameter to use for the Kaiser window. See
``scipy.signal.kaiser`` for details. Default is 8.
side : {'left', 'right'}
The side to apply the taper to. If ``'left'`` (``'right'``), the taper
will roll up (down) between ``start`` and ``end``, with all values
before ``start`` (after ``end``) set to zero. Default is ``'left'``.
Returns
-------
FrequencySeries
The tapered frequency series.
"""
out = out.copy()
width = end - start
winlen = 2 * int(width / out.delta_f)
window = Array(signal.get_window(('kaiser', beta), winlen))
kmin = int(start / out.delta_f)
kmax = kmin + winlen//2
if side == 'left':
out[kmin:kmax] *= window[:winlen//2]
out[:kmin] *= 0.
elif side == 'right':
out[kmin:kmax] *= window[winlen//2:]
out[kmax:] *= 0.
else:
raise ValueError("unrecognized side argument {}".format(side))
return out
def _check_fft_args(invec, outvec):
if not isinstance(invec,_Array):
raise TypeError("Input is not a PyCBC Array")
if not isinstance(outvec,_Array):
raise TypeError("Output is not a PyCBC Array")
if isinstance(invec,_TimeSeries) and not isinstance(
outvec,_FrequencySeries):
raise TypeError(
"When input is TimeSeries output must be FrequencySeries")
if isinstance(outvec,_TimeSeries) and not isinstance(
invec,_FrequencySeries):
raise TypeError(
"When output is TimeSeries input must be FrequencySeries")
if isinstance(invec,_FrequencySeries) and not isinstance(
outvec,_TimeSeries):
raise TypeError(
"When input is FrequencySeries output must be TimeSeries")
bank_match = tmplt_bank_matches[i]
if (abs(bank_match) > 0.99):
# Not much point calculating bank_chisquared if the bank template
# is very close to the filter template. Can also hit numerical
# error due to approximations made in this calculation.
# The value of 2 is the expected addition to the chisq for this
# template
bank_chisq += 2.
continue
bank_norm = sqrt((1 - bank_match*bank_match.conj()).real)
bank_SNR = bank_snrs[i] * (bank_norms[i] / bank_norm)
tmplt_SNR = tmplt_snr * (bank_match.conj() * tmplt_norm / bank_norm)
bank_SNR = Array(bank_SNR, copy=False)
tmplt_SNR = Array(tmplt_SNR, copy=False)
bank_chisq += (bank_SNR - tmplt_SNR).squared_norm()
if indices is not None:
return bank_chisq
else:
return TimeSeries(bank_chisq, delta_t=tmplt_snr.delta_t,
epoch=tmplt_snr.start_time, copy=False)