def prepare():
    """
    Prepare models to check that a coregionalized model with a diagonal coregion
    kernel and fixed lengthscales is equivalent to ordinary GP regression.
    """
    # 1. Two independent VGPs for two sets of data
    k0 = gpflow.kernels.SquaredExponential()
    k0.lengthscale.trainable = False
    k1 = gpflow.kernels.SquaredExponential()
    k1.lengthscale.trainable = False
    vgp0 = gpflow.models.VGP((Datum.X[0], Datum.Y[0]),
                             kernel=k0,
                             mean_function=Constant(),
                             likelihood=gpflow.likelihoods.Gaussian(),
                             num_latent=1)
    vgp1 = gpflow.models.VGP((Datum.X[1], Datum.Y[1]),
                             kernel=k1,
                             mean_function=Constant(),
                             likelihood=gpflow.likelihoods.Gaussian(),
                             num_latent=1)
    # 2. Coregionalized GPR
    kc = gpflow.kernels.SquaredExponential(active_dims=[0, 1])
    kc.lengthscale.trainable = False
    kc.variance.trainable = False  # variance is handled by the coregion kernel
    coreg = gpflow.kernels.Coregion(output_dim=2, rank=1, active_dims=[2])
    coreg.W.trainable = False
    lik = gpflow.likelihoods.SwitchedLikelihood(
        [gpflow.likelihoods.Gaussian(), gpflow.likelihoods.Gaussian()])
    mean_c = gpflow.mean_functions.SwitchedMeanFunction(
        [gpflow.mean_functions.Constant(), gpflow.mean_functions.Constant()])
    cvgp = gpflow.models.VGP((Datum.X_augumented, Datum.Y_augumented),
                             kernel=kc * coreg,
                             mean_function=mean_c,
                             likelihood=lik,
                             num_latent=1)
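# NOTE (illustrative sketch): the equivalence above hinges on the Coregion
# kernel reducing to a diagonal output covariance. Coregion computes
# B = W @ W.T + diag(kappa); if W is held at zero -- the "diagonal coregion
# kernel" case the docstring refers to -- then B = diag(kappa), the two
# outputs are a priori independent, and the stacked model factorises into
# two separate GPs.
import numpy as np

W = np.zeros((2, 1))            # output_dim=2, rank=1, held at zero
kappa = np.ones(2)              # per-output variances
B = W @ W.T + np.diag(kappa)
assert np.allclose(B, np.diag(kappa))  # no cross-output covariance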
def test_vgp(self):
    with self.test_session():
        domain = gpflowopt.domain.UnitCube(2)
        X = gpflowopt.design.RandomDesign(10, domain).generate()
        Y = np.sin(X[:, [0]])
        m = gpflow.vgp.VGP(X, Y, gpflow.kernels.RBF(2), gpflow.likelihoods.Gaussian())
        acq = gpflowopt.acquisition.ExpectedImprovement(m)
        m.compile()
        self.assertFalse(m._needs_recompile)
        acq.evaluate(gpflowopt.design.RandomDesign(10, domain).generate())
        self.assertTrue(hasattr(acq, '_evaluate_AF_storage'))
        Xnew = gpflowopt.design.RandomDesign(5, domain).generate()
        Ynew = np.sin(Xnew[:, [0]])
        acq.set_data(np.vstack((X, Xnew)), np.vstack((Y, Ynew)))
        self.assertFalse(hasattr(acq, '_needs_recompile'))
        self.assertFalse(hasattr(acq, '_evaluate_AF_storage'))
        acq.evaluate(gpflowopt.design.RandomDesign(10, domain).generate())
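# NOTE (illustrative): ExpectedImprovement above scores candidates by the
# standard EI criterion for minimisation. A plain-numpy sketch of that
# quantity (not GPflowOpt's TensorFlow implementation; mu/var are the model's
# predictive mean and variance, fmin the best observed objective value):
import numpy as np
from scipy.stats import norm

def expected_improvement_sketch(mu, var, fmin):
    sigma = np.sqrt(var)
    z = (fmin - mu) / sigma
    return (fmin - mu) * norm.cdf(z) + sigma * norm.pdf(z)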
def test_VGP_vs_GPR(session_tf):
    """
    With a Gaussian likelihood, the variational model (VGP) should be equivalent
    to the exact regression model (GPR) after a single natural-gradient step of
    size 1.
    """
    N, D = 3, 2
    X = np.random.randn(N, D)
    Y = np.random.randn(N, 1)
    kern = gpflow.kernels.RBF(D)
    lik_var = 0.1
    lik = gpflow.likelihoods.Gaussian()
    lik.variance = lik_var
    m_vgp = gpflow.models.VGP(X, Y, kern, lik)
    m_gpr = gpflow.models.GPR(X, Y, kern)
    m_gpr.likelihood.variance = lik_var
    # optimise only the variational parameters of the VGP
    m_vgp.set_trainable(False)
    m_vgp.q_mu.set_trainable(True)
    m_vgp.q_sqrt.set_trainable(True)
    NatGradOptimizer(1.).minimize(m_vgp, [(m_vgp.q_mu, m_vgp.q_sqrt)], maxiter=1)
    assert_allclose(m_gpr.compute_log_likelihood(),
                    m_vgp.compute_log_likelihood(), atol=1e-4)
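# NOTE (illustrative): the reference value in the assert above is GPR's exact
# log marginal likelihood, log N(y | 0, K + sigma^2 I) (zero mean function is
# GPR's default). A numpy sketch of that quantity, where K stands for the
# evaluated RBF Gram matrix:
from scipy.stats import multivariate_normal

def gpr_log_marginal_sketch(K, y, noise_var):
    n = y.shape[0]
    return multivariate_normal.logpdf(
        y.ravel(), mean=np.zeros(n), cov=K + noise_var * np.eye(n))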
label = [np.zeros((10, 1)), np.ones((20, 1))]
perm = list(range(30))
rng.shuffle(perm)
Xtest = rng.rand(10, 2) * 10
X_augumented = np.hstack([np.concatenate(X), np.concatenate(label)])
Y_augumented = np.hstack([np.concatenate(Y), np.concatenate(label)])
# 1. Two independent VGPs for two sets of data
k0 = gpflow.kernels.RBF(2)
k0.lengthscales.trainable = False
vgp0 = gpflow.models.VGP(
    X[0], Y[0], kern=k0,
    mean_function=gpflow.mean_functions.Constant(),
    likelihood=gpflow.likelihoods.Gaussian())
k1 = gpflow.kernels.RBF(2)
k1.lengthscales.trainable = False
vgp1 = gpflow.models.VGP(
    X[1], Y[1], kern=k1,
    mean_function=gpflow.mean_functions.Constant(),
    likelihood=gpflow.likelihoods.Gaussian())
# 2. Coregionalized GPR
lik = gpflow.likelihoods.SwitchedLikelihood(
    [gpflow.likelihoods.Gaussian(), gpflow.likelihoods.Gaussian()])
kc = gpflow.kernels.RBF(2)
kc.trainable = False  # lengthscales and variance are fixed
coreg = gpflow.kernels.Coregion(1, output_dim=2, rank=1, active_dims=[2])
coreg.W.trainable = False
def test_switched_likelihood_predict_density(Y_list, F_list, Fvar_list, Y_label):
    Y_perm = list(range(3 + 4 + 5))
    np.random.shuffle(Y_perm)
    # shuffle the original data
    Y_sw = np.hstack([np.concatenate(Y_list), np.concatenate(Y_label)])[Y_perm, :3]
    F_sw = np.concatenate(F_list)[Y_perm, :]
    Fvar_sw = np.concatenate(Fvar_list)[Y_perm, :]
    likelihoods = [Gaussian() for _ in range(3)]  # distinct objects, one variance each
    for lik in likelihoods:
        lik.variance = np.exp(np.random.randn(1)).squeeze().astype(np.float32)
    switched_likelihood = SwitchedLikelihood(likelihoods)
    switched_results = switched_likelihood.predict_density(F_sw, Fvar_sw, Y_sw)
    # compare against the wrapped likelihoods applied to each group separately
    results = [lik.predict_density(f, fvar, y)
               for lik, y, f, fvar in zip(likelihoods, Y_list, F_list, Fvar_list)]
    assert_allclose(switched_results, np.concatenate(results)[Y_perm, :])
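# NOTE (illustrative): SwitchedLikelihood routes each row to one of its
# wrapped likelihoods via the integer label in the last column of Y. A rough
# numpy sketch of that partition-and-stitch logic (the library itself does
# this with tf.dynamic_partition / tf.dynamic_stitch):
def predict_density_by_label_sketch(F, Fvar, Y, likelihoods):
    labels = Y[:, -1].astype(int)
    out = np.zeros(F.shape)  # one density value per latent entry
    for i, lik in enumerate(likelihoods):
        mask = labels == i
        out[mask] = lik.predict_density(F[mask], Fvar[mask], Y[mask, :-1])
    return out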
# (continued) build the coregionalized VGP from kc, coreg and lik above
mean_c = gpflow.mean_functions.SwitchedMeanFunction(
    [gpflow.mean_functions.Constant(), gpflow.mean_functions.Constant()])
cvgp = gpflow.models.VGP(
    X_augumented, Y_augumented,
    kern=kc * coreg,
    mean_function=mean_c,
    likelihood=lik,
    num_latent=2)
return vgp0, vgp1, cvgp, Xtest
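# NOTE (hypothetical usage of the setup above; assumes the fragment is wrapped
# in a prepare() returning vgp0, vgp1, cvgp, Xtest): once all three models are
# optimised, coregionalized predictions for rows labelled 0 should match the
# first independent VGP, and label-1 rows should match the second.
vgp0, vgp1, cvgp, Xtest = prepare()
Xtest_aug = np.hstack([Xtest, np.zeros((Xtest.shape[0], 1))])  # append label 0
mu_c, var_c = cvgp.predict_f(Xtest_aug)  # coregionalized prediction
mu_0, var_0 = vgp0.predict_f(Xtest)      # independent-GP prediction for label 0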
def test_gaussian_input_prop(self):
    lik = Gaussian()
    lik.variance = 0.01
    N, Ns, D_Y = self.X.shape[0], self.Xs.shape[0], self.D_Y
    Y = np.random.randn(N, D_Y)
    Ys = np.random.randn(Ns, D_Y)
    self.compare_to_single_layer(Y, Ys, lik, init_layers_input_propagation)
def prepare(self):
    rng = np.random.RandomState(0)
    X = rng.randn(100, 2)
    Y = rng.randn(100, 1)
    Z = rng.randn(10, 2)
    lik = gpflow.likelihoods.Gaussian()
    kern = gpflow.kernels.Matern32(2)
    Xs = rng.randn(10, 2)
    # make one of each model
    ms = []
    # for M in (gpflow.models.GPMC, gpflow.models.VGP):
    for M in (gpflow.models.VGP, gpflow.models.GPMC):
        ms.append(M(X, Y, kern, lik))
    for M in (gpflow.models.SGPMC, gpflow.models.SVGP):
        ms.append(M(X, Y, kern, lik, Z))
    ms.append(gpflow.models.GPR(X, Y, kern))
    ms.append(gpflow.models.SGPR(X, Y, kern, Z=Z))
    ms.append(gpflow.models.GPRFITC(X, Y, kern, Z=Z))
    return ms, Xs, rng
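# NOTE (illustrative): a fixture like prepare() is typically consumed by
# running one smoke test across every returned model; all GPflow models share
# the predict_f interface. A hypothetical example in the same unittest style:
def test_all_models_predict(self):
    ms, Xs, _ = self.prepare()
    for m in ms:
        mu, var = m.predict_f(Xs)          # predictive mean and variance
        self.assertEqual(mu.shape, (10, 1))
        self.assertTrue(np.all(var >= 0))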
def test_nongpr_model(self):
    design = gpflowopt.design.LatinHyperCube(16, self.domain)
    X = design.generate()
    Y = parabola2d(X)[0]
    m = gpflow.vgp.VGP(X, Y, gpflow.kernels.RBF(2, ARD=True),
                       likelihood=gpflow.likelihoods.Gaussian())
    acq = gpflowopt.acquisition.ExpectedImprovement(m)
    optimizer = gpflowopt.BayesianOptimizer(self.domain, acq)
    result = optimizer.optimize(lambda X: parabola2d(X)[0], n_iter=1)
    self.assertTrue(result.success)