"""
Prepare models to make sure the coregionalized model with a diagonal coregion kernel and
a fixed lengthscale is equivalent to normal GP regression.
"""
# 1. Two independent VGPs for two sets of data
k0 = gpflow.kernels.SquaredExponential()
k0.lengthscale.trainable = False
k1 = gpflow.kernels.SquaredExponential()
k1.lengthscale.trainable = False
vgp0 = gpflow.models.VGP((Datum.X[0], Datum.Y[0]),
kernel=k0,
mean_function=Constant(),
likelihood=gpflow.likelihoods.Gaussian(), num_latent=1)
vgp1 = gpflow.models.VGP((Datum.X[1], Datum.Y[1]),
kernel=k1,
mean_function=Constant(),
likelihood=gpflow.likelihoods.Gaussian(), num_latent=1)
# 2. Coregionalized GPR
kc = gpflow.kernels.SquaredExponential(active_dims=[0, 1])
kc.lengthscale.trainable = False
kc.variance.trainable = False  # variance is handled by the coregion kernel
coreg = gpflow.kernels.Coregion(output_dim=2, rank=1, active_dims=[2])
coreg.W.trainable = False
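# The coregion kernel's output covariance is B = W @ W.T + diag(kappa). W is held
# fixed so that B stays diagonal (per the docstring's "diagonal coregion kernel",
# W should be zero here), leaving kappa as independent per-output variances.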
lik = gpflow.likelihoods.SwitchedLikelihood([gpflow.likelihoods.Gaussian(),
gpflow.likelihoods.Gaussian()]
)
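# SwitchedLikelihood reads the index in the last column of the augmented outputs
# and applies the matching Gaussian likelihood to each row.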
mean_c = gpflow.mean_functions.SwitchedMeanFunction(
[gpflow.mean_functions.Constant(), gpflow.mean_functions.Constant()])
cvgp = gpflow.models.VGP((Datum.X_augumented, Datum.Y_augumented),
kernel=kc * coreg,
mean_function=mean_c,
likelihood=lik,
num_latent=1)
# Train them for a small number of iterations
opt = gpflow.optimizers.Scipy()
@tf.function(autograph=False)
def vgp0_closure():
return - vgp0.log_marginal_likelihood()
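# The Scipy optimizer minimises, so each closure returns the negative log marginal likelihood.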
@tf.function(autograph=False)
def vgp1_closure():
return - vgp1.log_marginal_likelihood()
@tf.function(autograph=False)
def cvgp_closure():
return - cvgp.log_marginal_likelihood()
# maxiter is illustrative; any small number of iterations will do here
opt.minimize(vgp0_closure, variables=vgp0.trainable_variables, options=dict(maxiter=3))
opt.minimize(vgp1_closure, variables=vgp1.trainable_variables, options=dict(maxiter=3))
opt.minimize(cvgp_closure, variables=cvgp.trainable_variables, options=dict(maxiter=3))
return vgp0, vgp1, cvgp
"""
Test the upper bound for the regression marginal likelihood.
"""
model_vfe = gpflow.models.SGPR((DatumUpper.X, DatumUpper.Y), gpflow.kernels.SquaredExponential(),
inducing_variable=DatumUpper.X[:10, :].copy(),
mean_function=Constant())
opt = gpflow.optimizers.Scipy()
@tf.function(autograph=False)
def model_vfe_closure():
return - model_vfe.log_marginal_likelihood()
opt.minimize(model_vfe_closure, variables=model_vfe.trainable_variables, options=dict(maxiter=500))
full_gp = gpflow.models.GPR((DatumUpper.X, DatumUpper.Y),
kernel=gpflow.kernels.SquaredExponential(),
mean_function=Constant())
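# Copy the optimised hyperparameters into the exact GP so that all three
# log-marginal-likelihood values refer to the same model.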
full_gp.kernel.lengthscale.assign(model_vfe.kernel.lengthscale)
full_gp.kernel.variance.assign(model_vfe.kernel.variance)
full_gp.likelihood.variance.assign(model_vfe.likelihood.variance)
full_gp.mean_function.c.assign(model_vfe.mean_function.c)
lml_upper = model_vfe.upper_bound()
lml_vfe = model_vfe.log_marginal_likelihood()
lml_full_gp = full_gp.log_marginal_likelihood()
assert lml_vfe < lml_full_gp
assert lml_full_gp < lml_upper
def test_switched_mean_function(N, D):
"""
Test for the SwitchedMeanFunction.
"""
X = np.hstack([rng.randn(N, D), 1.0 * rng.randint(0, 2, N).reshape(-1, 1)])
zeros, ones = Constant(np.zeros(1)), Constant(np.ones(1))
switched_mean = SwitchedMeanFunction([zeros, ones])
np_list = np.array([0., 1.])
result_ref = (np_list[X[:, D].astype(default_int())]).reshape(-1, 1)
result = switched_mean(X)
assert_allclose(result, result_ref)
def markov_gauss():
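# Build a MarkovGaussian input distribution: Xcov[0] holds the (N+1) marginal
# covariances, Xcov[1] the cross-covariances between consecutive states (the
# final cross-covariance is zero-padded).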
cov_params = rng.randn(num_data + 1, D_in, 2 * D_in) / 2. # (N+1)xDx2D
Xcov = cov_params @ np.transpose(cov_params, (0, 2, 1)) # (N+1)xDxD
Xcross = cov_params[:-1] @ np.transpose(cov_params[1:], (0, 2, 1)) # NxDxD
Xcross = np.concatenate((Xcross, np.zeros((1, D_in, D_in))), 0) # (N+1)xDxD
Xcov = np.stack([Xcov, Xcross]) # 2x(N+1)xDxD
return MarkovGaussian(Xmu_markov, ctt(Xcov))
_means = {
'lin': mf.Linear(A=rng.randn(D_in, D_out), b=rng.randn(D_out)),
'identity': mf.Identity(input_dim=D_in),
'const': mf.Constant(c=rng.randn(D_out)),
'zero': mf.Zero(output_dim=D_out)
}
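# Input distributions for the expectation tests; the 'dirac_*' variants have zero
# covariance, so expectations should collapse to evaluation at the mean.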
_distrs = {
'gauss':
Gaussian(Xmu, Xcov),
'dirac_gauss':
Gaussian(Xmu, np.zeros((num_data, D_in, D_in))),
'gauss_diag':
DiagonalGaussian(Xmu, rng.rand(num_data, D_in)),
'dirac_diag':
DiagonalGaussian(Xmu, np.zeros((num_data, D_in))),
'dirac_markov_gauss':
MarkovGaussian(Xmu_markov, np.zeros((2, num_data + 1, D_in, D_in))),
'markov_gauss':
markov_gauss()
}

# An exact GPR baseline; the models below are approximations that should match it
# when using a Gaussian likelihood with inducing points at the data.
m1 = gpflow.models.GPR(
X, Y, gpflow.kernels.SquaredExponential(1),
mean_function=gpflow.mean_functions.Constant())
m2 = gpflow.models.VGP(
X, Y, gpflow.kernels.SquaredExponential(1), likelihood=gpflow.likelihoods.Gaussian(),
mean_function=gpflow.mean_functions.Constant())
m3 = gpflow.models.SVGP(
X, Y, gpflow.kernels.SquaredExponential(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(),
q_diag=False,
mean_function=gpflow.mean_functions.Constant())
m3.inducing_variable.trainable = False
m4 = gpflow.models.SVGP(
X, Y, gpflow.kernels.SquaredExponential(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(), q_diag=False, whiten=True,
mean_function=gpflow.mean_functions.Constant())
m4.inducing_variable.trainable = False
m5 = gpflow.models.SGPR(
X, Y, gpflow.kernels.SquaredExponential(1),
Z=X.copy(),
mean_function=gpflow.mean_functions.Constant())
m5.inducing_variable.trainable = False
m6 = gpflow.models.GPRFITC(
X, Y, gpflow.kernels.SquaredExponential(1), Z=X.copy(),
mean_function=gpflow.mean_functions.Constant())
m6.inducing_variable.trainable = False
return [m1, m2, m3, m4, m5, m6]
def _create_approximate_models():
"""
1) Variational GP (with the likelihood set to Gaussian)
2) Sparse variational GP (likelihood is Gaussian, inducing points
at the data)
3) Sparse variational GP (as above, but with the whitening rotation
of the inducing variables)
4) Sparse variational GP Regression (as above, but there the inducing
variables are 'collapsed' out, as in Titsias 2009)
5) FITC Sparse GP Regression
"""
model_1 = gpflow.models.VGP((Datum.X, Datum.Y),
gpflow.kernels.SquaredExponential(),
likelihood=gpflow.likelihoods.Gaussian(),
mean_function=gpflow.mean_functions.Constant())
model_2 = gpflow.models.SVGP(gpflow.kernels.SquaredExponential(),
gpflow.likelihoods.Gaussian(),
inducing_variable=Datum.X.copy(),
q_diag=False,
mean_function=gpflow.mean_functions.Constant(),
num_latent=Datum.Y.shape[1])
gpflow.utilities.set_trainable(model_2.inducing_variable, False)
model_3 = gpflow.models.SVGP(kernel=gpflow.kernels.SquaredExponential(),
likelihood=gpflow.likelihoods.Gaussian(),
inducing_variable=Datum.X.copy(), q_diag=False, whiten=True,
mean_function=gpflow.mean_functions.Constant(),
num_latent=Datum.Y.shape[1])
gpflow.utilities.set_trainable(model_3.inducing_variable, False)
model_4 = gpflow.models.GPRFITC((Datum.X, Datum.Y),
kernel=gpflow.kernels.SquaredExponential(),
inducing_variable=Datum.X.copy(),
mean_function=gpflow.mean_functions.Constant())
return [model_1, model_2, model_3, model_4]

def mean_function_factory(rng, mean_function_name, D_in, D_out):
if mean_function_name == "Zero":
return gpflow.mean_functions.Zero(output_dim=D_out)
elif mean_function_name == "Constant":
return gpflow.mean_functions.Constant(c=rng.rand(D_out))
elif mean_function_name == "Linear":
return gpflow.mean_functions.Linear(
A=rng.rand(D_in, D_out), b=rng.rand(D_out))
else:
return None
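# Usage sketch (illustrative arguments): mean_function_factory(rng, "Linear", 3, 2)
# returns a Linear mean with random A (3x2) and b (2,); unrecognised names yield None.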
Y = np.sin(X) + 0.9 * np.cos(X * 1.6) + rng.randn(*X.shape) * 0.8
Y = np.tile(Y, 2) # two identical columns
self.Xtest = rng.rand(10, 1) * 10
m1 = gpflow.models.GPR(
X, Y, kern=gpflow.kernels.RBF(1),
mean_function=gpflow.mean_functions.Constant())
m2 = gpflow.models.VGP(
X, Y, gpflow.kernels.RBF(1), likelihood=gpflow.likelihoods.Gaussian(),
mean_function=gpflow.mean_functions.Constant())
m3 = gpflow.models.SVGP(
X, Y, gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(),
q_diag=False,
mean_function=gpflow.mean_functions.Constant())
m3.feature.trainable = False
m4 = gpflow.models.SVGP(
X, Y, gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(), q_diag=False, whiten=True,
mean_function=gpflow.mean_functions.Constant())
m4.feature.trainable = False
m5 = gpflow.models.SGPR(
X, Y, gpflow.kernels.RBF(1),
Z=X.copy(),
mean_function=gpflow.mean_functions.Constant())
m5.feature.trainable = False
m6 = gpflow.models.GPRFITC(
X, Y, gpflow.kernels.RBF(1), Z=X.copy(),
mean_function=gpflow.mean_functions.Constant())