Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_fwd_real_ts(self):
for fwd_dtype in [float32,float64]:
delta_t = self.delta
# Even input
inarr = ts(self.in_r2c_e,dtype=fwd_dtype,delta_t=delta_t,epoch=self.epoch)
delta_f = 1.0/(inarr.delta_t * len(inarr))
outexp = fs(self.out_r2c_e,dtype=_other_kind[fwd_dtype],delta_f=delta_f,epoch=self.epoch)
outexp *= delta_t
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Odd input
inarr = ts(self.in_r2c_o,dtype=fwd_dtype,delta_t=delta_t,epoch=self.epoch)
delta_f = 1.0/(inarr.delta_t * len(inarr))
outexp = fs(self.out_r2c_o,dtype=_other_kind[fwd_dtype],delta_f=delta_f,epoch=self.epoch)
outexp *= delta_t
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Random
rand_inarr = ts(zeros(self.rand_len_r,dtype=fwd_dtype),epoch=self.epoch,delta_t=delta_t)
delta_f = 1.0/(rand_inarr.delta_t * len(rand_inarr))
rand_outarr = fs(zeros(self.rand_len_c,dtype=_other_kind[fwd_dtype]),epoch=self.epoch,
delta_f=delta_f)
_test_random(self,rand_inarr,rand_outarr,self.tdict[fwd_dtype])
def ourcopy(other):
"""
A convenience function to return an exact copy of a pycbc.type instance.
"""
if not isinstance(other,pycbc.types.Array):
raise TypeError("ourcopy() can only be used to duplicate PyCBC types")
if isinstance(other,pycbc.types.TimeSeries):
return pycbc.types.TimeSeries(initial_array=other._data,dtype=other.dtype,
delta_t=other._delta_t,epoch=other._epoch,copy=True)
if isinstance(other,pycbc.types.FrequencySeries):
return pycbc.types.FrequencySeries(initial_array=other._data,dtype=other.dtype,
delta_f=other._delta_f,epoch=other._epoch,copy=True)
if isinstance(other,pycbc.types.Array):
return pycbc.types.Array(initial_array=other._data,dtype=other.dtype,copy=True)
def test_fwd_real_fs(self):
for fwd_dtype in [float32,float64]:
delta_f = self.delta
# Even input
inarr = fs(self.in_r2c_e,dtype=fwd_dtype,delta_f=delta_f,epoch=self.epoch)
delta_t = 1.0/(inarr.delta_f * len(inarr))
outexp = ts(self.out_r2c_e,dtype=_other_kind[fwd_dtype],delta_t=delta_t,epoch=self.epoch)
outexp *= delta_f
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Odd input
inarr = fs(self.in_r2c_o,dtype=fwd_dtype,delta_f=delta_f,epoch=self.epoch)
delta_t = 1.0/(inarr.delta_f * len(inarr))
outexp = ts(self.out_r2c_o,dtype=_other_kind[fwd_dtype],delta_t=delta_t,epoch=self.epoch)
outexp *= delta_f
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Random
rand_inarr = fs(zeros(self.rand_len_r,dtype=fwd_dtype),epoch=self.epoch,delta_f=delta_f)
delta_t = 1.0/(rand_inarr.delta_f * len(rand_inarr))
rand_outarr = ts(zeros(self.rand_len_c,dtype=_other_kind[fwd_dtype]),epoch=self.epoch,
delta_t=delta_t)
_test_random(self,rand_inarr,rand_outarr,self.tdict[fwd_dtype])
# LAL doesn't have forward FFT funcs starting from a FS, so skip _test_lal
# Clean these up since they could be big:
def test_rev_complex_ts(self):
for rev_dtype in [complex64,complex128]:
delta_t = self.delta
# Don't do separate even/odd tests for complex
inarr = ts(self.in_c2c_rev,dtype=rev_dtype,delta_t=delta_t,epoch=self.epoch)
delta_f = 1.0/(delta_t*len(self.out_c2c_rev))
outexp = fs(self.out_c2c_rev,dtype=rev_dtype,delta_f=delta_f,epoch=self.epoch)
outexp *= delta_t
_test_ifft(self,inarr,outexp,self.tdict[rev_dtype])
# Random---we don't do that in 'reverse' tests, since both
# directions are already tested in forward, and if we just passed
# in arrays in the other order we'd only get exceptions
#
# LAL doesn't have reverse FFT funcs starting from a TimeSeries, so
# we skip those tests as well.
#
# Check that exceptions are raised. Need input and
# output arrays; just reuse inarr and outexp (values won't
# matter, we're just checking exceptions).
output_args = {"delta_f": self.delta, "epoch": self.epoch}
_test_raise_excep_ifft(self,inarr,outexp,output_args)
def test_fwd_complex_ts(self):
for fwd_dtype in [complex64,complex128]:
delta_t = self.delta
# Don't do separate even/odd tests for complex
inarr = ts(self.in_c2c_fwd,dtype=fwd_dtype,delta_t=delta_t,epoch=self.epoch)
delta_f = 1.0/(delta_t * len(inarr))
outexp = fs(self.out_c2c_fwd,dtype=fwd_dtype,delta_f=delta_f,epoch=self.epoch)
outexp *= delta_t
_test_fft(self,inarr,outexp,self.tdict[fwd_dtype])
# Random
rand_inarr = ts(zeros(self.rand_len_c,dtype=fwd_dtype),delta_t=delta_t,epoch=self.epoch)
delta_f = 1.0/(delta_t*len(rand_inarr))
rand_outarr = fs(zeros(self.rand_len_c,dtype=fwd_dtype),delta_f=delta_f,epoch=self.epoch)
_test_random(self,rand_inarr,rand_outarr,self.tdict[fwd_dtype])
# Reuse random arrays for the LAL tests:
# COMMENTED OUT: The LAL Complex TimeFreqFFT and FreqTimeFFT functions perform
# a repacking of data because they seem to assume that the array represents both
# positive and negative frequencies. We don't do this, so we don't compare.
#_test_lal_tf_fft(self,rand_inarr,rand_outarr,self.tdict[fwd_dtype])
# Clean these up since they could be big:
del rand_inarr
delta_t : float
The time step of the data series if it is not a TimeSeries instance.
unbiased : bool
If True the normalization of the autocovariance function is n-k
instead of n. This is called the unbiased estimation of the
autocovariance. Note that this does not mean the ACF is unbiased.
Returns
-------
acf : numpy.array
If data is a TimeSeries then acf will be a TimeSeries of the
one-sided ACF. Else acf is a numpy.array.
"""
# if given a TimeSeries instance then get numpy.array
if isinstance(data, TimeSeries):
y = data.numpy()
delta_t = data.delta_t
else:
y = data
# Zero mean
y = y - y.mean()
ny_orig = len(y)
npad = 1
while npad < 2*ny_orig:
npad = npad << 1
ypad = numpy.zeros(npad)
ypad[:ny_orig] = y
# FFT data minus the mean
"""
if not delta_t:
delta_t = lm_deltat(freqs, damping_times)
if not t_final:
t_final = lm_tfinal(damping_times)
kmax = int(t_final / delta_t) + 1
# Different modes will have different tapering window-size
# Find maximum window size to create long enough output vector
if taper:
max_tau = max(damping_times.values()) if \
isinstance(damping_times, dict) else damping_times
kmax += int(max_tau/delta_t)
outplus = TimeSeries(zeros(kmax, dtype=float64), delta_t=delta_t)
outcross = TimeSeries(zeros(kmax, dtype=float64), delta_t=delta_t)
if taper:
# Change epoch of output vector if tapering will be applied
start = - max_tau
# To ensure that t=0 is still in output vector
start -= start % delta_t
outplus._epoch, outcross._epoch = start, start
return outplus, outcross
# taper strain
corrupt_length = int(corrupt_time * strain.sample_rate)
w = numpy.arange(corrupt_length) / float(corrupt_length)
strain[0:corrupt_length] *= pycbc.types.Array(w, dtype=strain.dtype)
strain[(len(strain) - corrupt_length):] *= \
pycbc.types.Array(w[::-1], dtype=strain.dtype)
if output_intermediates:
strain.save_to_wav('strain_conditioned.wav')
# zero-pad strain to a power-of-2 length
strain_pad_length = next_power_of_2(len(strain))
pad_start = int(strain_pad_length / 2 - len(strain) / 2)
pad_end = pad_start + len(strain)
pad_epoch = strain.start_time - pad_start / float(strain.sample_rate)
strain_pad = pycbc.types.TimeSeries(
pycbc.types.zeros(strain_pad_length, dtype=strain.dtype),
delta_t=strain.delta_t, copy=False, epoch=pad_epoch)
strain_pad[pad_start:pad_end] = strain[:]
# estimate the PSD
psd = pycbc.psd.welch(strain[corrupt_length:(len(strain)-corrupt_length)],
seg_len=int(psd_duration * strain.sample_rate),
seg_stride=int(psd_stride * strain.sample_rate),
avg_method=psd_avg_method,
require_exact_data_fit=False)
psd = pycbc.psd.interpolate(psd, 1. / strain_pad.duration)
psd = pycbc.psd.inverse_spectrum_truncation(
psd, int(psd_duration * strain.sample_rate),
low_frequency_cutoff=low_freq_cutoff,
trunc_method='hann')
kmin = int(low_freq_cutoff / psd.delta_f)
import pylab
from pycbc import types, fft, waveform
# Get a time domain waveform
hp, hc = waveform.get_td_waveform(approximant="EOBNRv2",
mass1=6, mass2=6, delta_t=1.0/4096, f_lower=40)
# Get a frequency domain waveform
sptilde, sctilde = waveform. get_fd_waveform(approximant="TaylorF2",
mass1=6, mass2=6, delta_f=1.0/4, f_lower=40)
# FFT it to the time-domain
tlen = int(1.0 / hp.delta_t / sptilde.delta_f)
sptilde.resize(tlen/2 + 1)
sp = types.TimeSeries(types.zeros(tlen), delta_t=hp.delta_t)
fft.ifft(sptilde, sp)
pylab.plot(sp.sample_times, sp, label="TaylorF2 (IFFT)")
pylab.plot(hp.sample_times, hp, label="EOBNRv2")
pylab.ylabel('Strain')
pylab.xlabel('Time (s)')
pylab.legend()
pylab.show()