Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _make_buffers(self):
    """
    Allocate memory for internal buffers according to FFT size, number of
    channels, and number of frames.

    Sets ``n_state``, ``n_state_out`` and ``dft`` unconditionally, the
    time-domain buffers ``fft_in_buffer``, ``x_p`` and ``y_p`` when
    ``self.mono`` is true, and the frequency-domain buffer ``X`` together
    with ``dft_frames`` depending on ``self.fixed_input`` and
    ``self.num_frames``.
    """
    # state variables
    self.n_state = self.num_samples - self.hop
    self.n_state_out = self.nfft - self.hop

    # make DFT object
    self.dft = DFT(nfft=self.nfft,
                   D=self.num_channels,
                   analysis_window=self.analysis_window,
                   synthesis_window=self.synthesis_window,
                   transform=self.transform)

    # 1D array for num_channels=1 as the FFTW package can only take 1D array
    # for 1D DFT.
    if self.mono:
        # input buffer
        self.fft_in_buffer = np.zeros(self.nfft, dtype=self.time_dtype)
        # state buffer
        self.x_p = np.zeros(self.n_state, dtype=self.time_dtype)
        # prev reconstructed samples
        self.y_p = np.zeros(self.n_state_out, dtype=self.time_dtype)
        # output samples
        # NOTE(review): no output-sample buffer is allocated here and no
        # multi-channel (non-mono) branch is present in this block --
        # confirm whether those allocations live elsewhere.

    # if fixed number of frames to process
    if self.fixed_input:
        if self.num_frames == 0:
            # single-frame case: one spectrum (per channel when not mono)
            # NOTE(review): ``dft_frames`` is not assigned on this path --
            # confirm nothing reads it when ``num_frames == 0``.
            if self.mono:
                self.X = np.zeros(self.nbin, dtype=self.freq_dtype)
            else:
                self.X = np.zeros((self.nbin, self.num_channels),
                                  dtype=self.freq_dtype)
        else:
            # squeeze drops every length-1 axis of the
            # (num_frames, nbin, num_channels) allocation
            self.X = np.squeeze(np.zeros((self.num_frames,
                                          self.nbin,
                                          self.num_channels),
                                         dtype=self.freq_dtype))
            # DFT object for multiple frames
            self.dft_frames = DFT(nfft=self.nfft, D=self.num_frames,
                                  analysis_window=self.analysis_window,
                                  synthesis_window=self.synthesis_window,
                                  transform=self.transform)
    else:  # we will allocate these on-the-fly
        self.X = None
        self.dft_frames = None
n = 0
for f in range(self.num_frames):
# apply IDFT to current frame and reconstruct output
x_r[n:n+self.hop, ] = self._overlap_and_add(
self.dft.synthesis(self.X[f, :, :]))
n += self.hop
else:
x_r = np.zeros(self.num_frames*self.hop, dtype=self.time_dtype)
# treat number of frames as the multiple channels for DFT
if not self.fixed_input:
self.dft_frames = DFT(nfft=self.nfft,
D=self.num_frames,
analysis_window=self.analysis_window,
synthesis_window=self.synthesis_window,
transform=self.transform)
# back to time domain
mx = self.dft_frames.synthesis(self.X.T)
# overlap and add
n = 0
for f in range(self.num_frames):
x_r[n:n+self.hop, ] = self._overlap_and_add(mx[:, f])
n += self.hop
return x_r
self.num_samples)-x_shape[0]
if extra_samples:
if self.mono:
x = np.concatenate((x, np.zeros(extra_samples)))
else:
x = np.concatenate(
(x,
np.zeros((extra_samples, self.num_channels))))
# ----allocate memory if necessary
if not self.fixed_input:
self.X = np.squeeze(np.zeros((self.num_frames,
self.nbin,
self.num_channels),
dtype=self.freq_dtype))
self.dft_frames = DFT(nfft=self.nfft,
D=self.num_frames,
analysis_window=self.analysis_window,
synthesis_window=self.synthesis_window,
transform=self.transform)
# ----use appropriate function
if self.streaming:
self._analysis_streaming(x)
else:
self.reset()
self._analysis_non_streaming(x)
return self.X