l3 = m2.compute_log_likelihood()
l4 = m3.compute_log_likelihood()
self.assertTrue(l1 == l2 == l3 == l4)
# make sure predictions still match (this tests AutoFlow)
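# (AutoFlow caches compiled TensorFlow functions on the model; an unpickled model
# must rebuild that cache, so identical predictions confirm it is restored correctly.)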
pX = np.linspace(-3, 3, 10)[:, None]
p1, _ = self.m.predict_y(pX)
p2, _ = m1.predict_y(pX)
p3, _ = m2.predict_y(pX)
p4, _ = m3.predict_y(pX)
self.assertTrue(np.all(p1 == p2))
self.assertTrue(np.all(p1 == p3))
self.assertTrue(np.all(p1 == p4))
class TestPickleFix(GPflowTestCase):
"""
Make sure the kernel matrix of a kernel with a fixed parameter can still be computed after pickling
"""
def test(self):
with self.test_context():
k = gpflow.kernels.PeriodicKernel(1)
k.period.fixed = True
k = pickle.loads(pickle.dumps(k))
x = np.linspace(0, 1, 100).reshape([-1, 1])
k.compute_K(x, x)
class TestPickleSVGP(GPflowTestCase):
"""
Like the TestPickleGPR test, but with SVGP (which has extra TF variables
for minibatching)
"""import numpy as np
import pytest
import tensorflow as tf
import gpflow.kernels as kernels
rng = np.random.RandomState(0)
class Datum:
num_data = 100
D = 100
X = rng.rand(num_data, D) * 100
kernel_list = [kernels.Matern12(), kernels.Matern32(), kernels.Matern52(),
kernels.Exponential(), kernels.Cosine()]
@pytest.mark.parametrize('kernel', kernel_list)
def test_kernel_euclidean_distance(kernel):
'''
Tests the output and gradients of kernels that are a function of the (scaled)
Euclidean distance between points. We test in a high-dimensional space, which can
produce very small distances and lead scaled_square_dist to return slightly
negative values.
'''
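# (Background: the squared distance r2 = |x|^2 + |x'|^2 - 2 x.x' is computed in
# floating point and can come out slightly negative for near-identical points;
# kernels that take sqrt(r2), e.g. the Materns, then yield NaNs unless the
# implementation clamps r2 at zero.)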
K = kernel(Datum.X)
assert not np.isnan(K).any(), 'NaNs in the output of the ' + type(kernel).__name__ + ' kernel.'
assert np.isfinite(K).all(), 'Infs in the output of the ' + type(kernel).__name__ + ' kernel.'
X_as_param = tf.Variable(Datum.X)
with tf.GradientTape() as tape:
    K = kernel(X_as_param)
# Assumed completion of the truncated test: the gradients w.r.t. X should be finite too.
grads = tape.gradient(K, X_as_param)
assert not np.isnan(grads).any(), 'NaNs in the gradients of the ' + type(kernel).__name__ + ' kernel.'

def test_falls_back_to_default(self):
kern = gpflow.kernels.Matern52(10)
deriv_kern = gpflow.derivative_kernel.derivative_kernel_factory(10, 10, kern)
assert type(deriv_kern) == gpflow.derivative_kernel.DifferentialObservationsKernelDynamic

def _init_kernel(self, D, lengthscale, variance, period):
base = gpflow.kernels.Matern32(D, variance=variance, lengthscales=lengthscale)
return gpflow.kernels.Periodic(base=base, period=period)

def test_nongpr_model(self, domain):
design = gpflowopt.design.LatinHyperCube(16, domain)
X = design.generate()
Y = parabola2d(X)
m = gpflow.models.VGP(X, Y, gpflow.kernels.RBF(2, ARD=True), likelihood=gpflow.likelihoods.Gaussian())
acq = gpflowopt.acquisition.ExpectedImprovement(m)
optimizer = gpflowopt.BayesianOptimizer(domain, acq, optimizer=gpflowopt.optim.SciPyOptimizer(domain))
result = optimizer.optimize(parabola2d, n_iter=1)
assert result.success

def test_deriv_rbf_kernel_x1_and_x2_different_lengthscales(self):
# This test mainly checks our RBF derivative kernel implementation when the
# lengthscales differ across input dimensions. We compare the result against the
# generic derivative kernel, whose gradients are computed via tf.gradients.
x_ph = tf.placeholder(tf.float64, [None, 4])
x2_ph = tf.placeholder(tf.float64, [None, 4])
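# (Presumably: four columns = two input dimensions plus two columns of
# derivative-order information consumed by the derivative kernels.)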
lengthscales = np.array([1.8, 0.9])
base_rbf_kern1 = gpflow.kernels.RBF(2, self.variance, lengthscales=lengthscales,
ARD=True)
base_rbf_kern2 = gpflow.kernels.RBF(2, self.variance, lengthscales=lengthscales,
ARD=True)
diff_dynamic_kernel = gpflow.derivative_kernel.DifferentialObservationsKernelDynamic(
2, base_rbf_kern1, 2
)
diff_kernel = gpflow.derivative_kernel.RBFDerivativeKern(2, 2, base_kernel=base_rbf_kern2)
with self.test_session() as sess:
with diff_kernel.tf_mode():
x_free = tf.placeholder('float64')
diff_kernel.make_tf_array(x_free)
k = diff_kernel.K(x_ph, x2_ph)
with diff_dynamic_kernel.tf_mode():
x_free_2 = tf.placeholder('float64')
diff_dynamic_kernel.make_tf_array(x_free_2)
k2 = diff_dynamic_kernel.K(x_ph, x2_ph)  # assumed: mirrors the diff_kernel branch above

def create_parabola_model(domain, design=None):
if design is None:
design = gpflowopt.design.LatinHyperCube(16, domain)
X = design.generate()
Y = parabola2d(X)
m = gpflow.gpr.GPR(X, Y, gpflow.kernels.RBF(2, ARD=True))
return m

def test_mixed_mok_with_Id_vs_independent_mok(session_tf):
data = DataMixedKernelWithEye
# Independent model
k1 = mk.SharedIndependentMok(RBF(data.D, variance=0.5, lengthscales=1.2), data.L)
f1 = InducingPoints(data.X[:data.M, ...].copy())
m1 = SVGP(data.X, data.Y, k1, Gaussian(), f1,
q_mu=data.mu_data_full, q_sqrt=data.sqrt_data_full)
m1.set_trainable(False)
m1.q_sqrt.set_trainable(True)
gpflow.training.ScipyOptimizer().minimize(m1, maxiter=data.MAXITER)
# Mixed Model
kern_list = [RBF(data.D, variance=0.5, lengthscales=1.2) for _ in range(data.L)]
k2 = mk.SeparateMixedMok(kern_list, data.W)
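# (With data.W presumably the identity, as the DataMixedKernelWithEye name suggests,
# the mixed kernel reduces to independent identical latents, so m2 should match m1.)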
f2 = InducingPoints(data.X[:data.M, ...].copy())
m2 = SVGP(data.X, data.Y, k2, Gaussian(), f2,
q_mu=data.mu_data_full, q_sqrt=data.sqrt_data_full)
m2.set_trainable(False)
m2.q_sqrt.set_trainable(True)
gpflow.training.ScipyOptimizer().minimize(m2, maxiter=data.MAXITER)
check_equality_predictions(session_tf, [m1, m2])

perm = list(range(30))
rng.shuffle(perm)
Xtest = rng.rand(10, 2) * 10
X_augmented = np.hstack([np.concatenate(X), np.concatenate(label)])
Y_augmented = np.hstack([np.concatenate(Y), np.concatenate(label)])
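# The appended label column tells Coregion (via active_dims=[2]) and
# SwitchedLikelihood which output each row belongs to.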
# 1. Two independent VGPs for two sets of data
k0 = gpflow.kernels.RBF(2)
k0.lengthscales.trainable = False
vgp0 = gpflow.models.VGP(
X[0], Y[0], kern=k0,
mean_function=gpflow.mean_functions.Constant(),
likelihood=gpflow.likelihoods.Gaussian())
k1 = gpflow.kernels.RBF(2)
k1.lengthscales.trainable = False
vgp1 = gpflow.models.VGP(
X[1], Y[1], kern=k1,
mean_function=gpflow.mean_functions.Constant(),
likelihood=gpflow.likelihoods.Gaussian())
# 2. Coregionalized GPR
lik = gpflow.likelihoods.SwitchedLikelihood(
[gpflow.likelihoods.Gaussian(), gpflow.likelihoods.Gaussian()])
kc = gpflow.kernels.RBF(2)
kc.trainable = False  # lengthscale and variance are fixed.
coreg = gpflow.kernels.Coregion(1, output_dim=2, rank=1, active_dims=[2])
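# With W fixed (presumably at its zero initialisation), B = W W^T + diag(kappa) is
# diagonal, so the coregionalized model decouples into the two independent VGPs above.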
coreg.W.trainable = False

def prepare(self):
rng = np.random.RandomState(0)
X = rng.rand(20, 1) * 10
Y = np.sin(X) + 0.9 * np.cos(X * 1.6) + rng.randn(*X.shape) * 0.8
Y = np.tile(Y, 2) # two identical columns
self.Xtest = rng.rand(10, 1) * 10
m1 = gpflow.models.GPR(
X, Y, kern=gpflow.kernels.RBF(1),
mean_function=gpflow.mean_functions.Constant())
m2 = gpflow.models.VGP(
X, Y, gpflow.kernels.RBF(1), likelihood=gpflow.likelihoods.Gaussian(),
mean_function=gpflow.mean_functions.Constant())
m3 = gpflow.models.SVGP(
X, Y, gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(),
q_diag=False,
mean_function=gpflow.mean_functions.Constant())
m3.feature.trainable = False
m4 = gpflow.models.SVGP(
X, Y, gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(), q_diag=False, whiten=True,
mean_function=gpflow.mean_functions.Constant())
m4.feature.trainable = False
m5 = gpflow.models.SGPR(
X, Y, gpflow.kernels.RBF(1),
Z=X.copy(),
mean_function=gpflow.mean_functions.Constant())
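# (These five models are built to be equivalent: with Z = X there is no sparsity, so
# VGP, SVGP (whitened or not) and SGPR should all recover the exact GPR posterior at
# convergence; the assertions that presumably follow compare their predictions.)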