# [extraction artifact — snippet-site banner, not part of the original source] Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# --- Fragment: initialization phase of total-variability (TV) matrix training ---
# NOTE(review): this is a snippet paste. The enclosing `def` line is not visible,
# the original indentation has been stripped (the lines after `with h5py.File(...)`
# and after each `if` should be indented), and the `for` loop at the end has no
# body in this view. Restore indentation against the original file before use.
assert (isinstance(ubm, Mixture) and ubm.validate()), "Second argument must be a proper Mixture"
assert (isinstance(nb_iter, int) and (0 < nb_iter)), "nb_iter must be a positive integer"
# "diag" when the UBM stores inverse covariances as a 2-D array, otherwise "full".
gmm_covariance = "diag" if ubm.invcov.ndim == 2 else "full"
# Set useful variables
# Open only the first StatServer file to read matrix dimensions.
with h5py.File(stat_server_filename[0], 'r') as fh: # open the first StatServer to get size
# sv_size: supervector dimension (columns of the first-order stats).
_, sv_size = fh['stat1'].shape
# Feature dimension = supervector size / number of distributions.
feature_size = fh['stat1'].shape[1] // fh['stat0'].shape[1]
distrib_nb = fh['stat0'].shape[1]
# Indices of the upper triangle of a (tv_rank x tv_rank) matrix — presumably
# used later to accumulate symmetric matrices compactly; TODO confirm in the
# (not visible) loop body.
upper_triangle_indices = numpy.triu_indices(tv_rank)
# mean and Sigma are initialized at ZEROS as statistics are centered
self.mean = numpy.zeros(ubm.get_mean_super_vector().shape, dtype=STAT_TYPE)
# NOTE(review): this serialized zero matrix is immediately overwritten by both
# branches of the `if` below — redundant unless serialize() has a needed side
# effect (e.g. shared-memory registration); confirm against serialize()'s code.
self.F = serialize(numpy.zeros((sv_size, tv_rank)).astype(STAT_TYPE))
if tv_init is None:
# Random initialization of the TV matrix when no initial value is supplied.
self.F = numpy.random.randn(sv_size, tv_rank).astype(STAT_TYPE)
else:
self.F = tv_init
self.Sigma = numpy.zeros(ubm.get_mean_super_vector().shape, dtype=STAT_TYPE)
# Save init if required
if output_file_name is None:
output_file_name = "temporary_factor_analyser"
if save_init:
self.write(output_file_name + "_init.h5")
# Estimate TV iteratively
# NOTE(review): the body of this EM loop is missing from the fragment.
for it in range(nb_iter):
# Create serialized accumulators for the list of models to process
# NOTE(review): the following 24 lines are a byte-for-byte DUPLICATE paste of the
# initialization sequence that appears earlier in this file (asserts through the
# `for it in range(nb_iter):` header). This is almost certainly an extraction
# accident, not intentional code — confirm against the upstream source and
# remove one copy. Comments below mirror the first occurrence.
assert (isinstance(ubm, Mixture) and ubm.validate()), "Second argument must be a proper Mixture"
assert (isinstance(nb_iter, int) and (0 < nb_iter)), "nb_iter must be a positive integer"
# "diag" when the UBM stores inverse covariances as a 2-D array, otherwise "full".
gmm_covariance = "diag" if ubm.invcov.ndim == 2 else "full"
# Set useful variables
# Open only the first StatServer file to read matrix dimensions.
with h5py.File(stat_server_filename[0], 'r') as fh: # open the first StatServer to get size
_, sv_size = fh['stat1'].shape
# Feature dimension = supervector size / number of distributions.
feature_size = fh['stat1'].shape[1] // fh['stat0'].shape[1]
distrib_nb = fh['stat0'].shape[1]
upper_triangle_indices = numpy.triu_indices(tv_rank)
# mean and Sigma are initialized at ZEROS as statistics are centered
self.mean = numpy.zeros(ubm.get_mean_super_vector().shape, dtype=STAT_TYPE)
# NOTE(review): overwritten by both branches of the `if` below — see duplicate note.
self.F = serialize(numpy.zeros((sv_size, tv_rank)).astype(STAT_TYPE))
if tv_init is None:
self.F = numpy.random.randn(sv_size, tv_rank).astype(STAT_TYPE)
else:
self.F = tv_init
self.Sigma = numpy.zeros(ubm.get_mean_super_vector().shape, dtype=STAT_TYPE)
# Save init if required
if output_file_name is None:
output_file_name = "temporary_factor_analyser"
if save_init:
self.write(output_file_name + "_init.h5")
# Estimate TV iteratively
# NOTE(review): loop body missing from this fragment as well.
for it in range(nb_iter):
:param mini_batch_indices: indices of the elements in the list (should start at zero)
:param factor_analyser: FactorAnalyser object
:param stat0: matrix of zero order statistics
:param stat1: matrix of first order statistics
:param e_h: accumulator
:param e_hh: accumulator
:param num_thread: number of parallel process to run
"""
# NOTE(review): fragment — the `def` line and the opening of this docstring are
# not visible, indentation has been stripped, and the body may continue past
# the last visible line. It reads as the E-step of a factor-analysis model over
# one mini-batch, filling the e_h / e_hh accumulators in place.
# Number of latent factors = number of columns of the loading matrix F.
rank = factor_analyser.F.shape[1]
# Full-covariance case (Sigma is 2-D): the posterior precision depends on the
# session only through stat0[:, 0] (used as a scalar multiplier below), so the
# inverse can be precomputed once per unique value instead of per sample.
if factor_analyser.Sigma.ndim == 2:
A = factor_analyser.F.T.dot(factor_analyser.F)
inv_lambda_unique = dict()
for sess in numpy.unique(stat0[:, 0]):
inv_lambda_unique[sess] = scipy.linalg.inv(sess * A + numpy.eye(A.shape[0]))
# Scratch buffer reused as the `out` array of numpy.outer in the loop below,
# to avoid reallocating a (rank x rank) matrix per sample.
tmp = numpy.zeros((factor_analyser.F.shape[1], factor_analyser.F.shape[1]), dtype=STAT_TYPE)
for idx in mini_batch_indices:
if factor_analyser.Sigma.ndim == 1:
# Diagonal-covariance case: per-sample posterior precision
# I + F^T diag(stat0_row) F, inverted for each sample.
inv_lambda = scipy.linalg.inv(numpy.eye(rank) +
(factor_analyser.F.T * stat0[idx + batch_start, :]).dot(factor_analyser.F))
else:
# Full-covariance case: look up the precomputed inverse for this session.
inv_lambda = inv_lambda_unique[stat0[idx + batch_start, 0]]
# Project the first-order statistics into the latent space.
aux = factor_analyser.F.T.dot(stat1[idx + batch_start, :])
# Posterior mean of the latent factor, written directly into the accumulator.
numpy.dot(aux, inv_lambda, out=e_h[idx])
# Posterior second moment: covariance + outer product of the mean.
# The third positional argument of numpy.outer is `out` (writes into tmp).
e_hh[idx] = inv_lambda + numpy.outer(e_h[idx], e_h[idx], tmp)