Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
l3 = m2.compute_log_likelihood()
l4 = m3.compute_log_likelihood()
self.assertTrue(l1 == l2 == l3 == l4)
# make sure predictions still match (this tests AutoFlow)
pX = np.linspace(-3, 3, 10)[:, None]
p1, _ = self.m.predict_y(pX)
p2, _ = m1.predict_y(pX)
p3, _ = m2.predict_y(pX)
p4, _ = m3.predict_y(pX)
self.assertTrue(np.all(p1 == p2))
self.assertTrue(np.all(p1 == p3))
self.assertTrue(np.all(p1 == p4))
class TestPickleFix(GPflowTestCase):
"""
Make sure a kernel with a fixed parameter can be computed after pickling
"""
def test(self):
with self.test_context():
k = gpflow.kernels.PeriodicKernel(1)
k.period.fixed = True
k = pickle.loads(pickle.dumps(k))
x = np.linspace(0,1,100).reshape([-1,1])
k.compute_K(x, x)
class TestPickleSVGP(GPflowTestCase):
"""
Like the TestPickleGPR test, but with svgp (since it has extra tf variables
for minibatching)
"""
import numpy as np
import pytest
import tensorflow as tf
import gpflow.kernels as kernels
rng = np.random.RandomState(0)
class Datum:
num_data = 100
D = 100
X = rng.rand(num_data, D) * 100
kernel_list = [kernels.Matern12(), kernels.Matern32(), kernels.Matern52(),
kernels.Exponential(), kernels.Cosine()]
@pytest.mark.parametrize('kernel', kernel_list)
def test_kernel_euclidean_distance(kernel):
'''
Tests output & gradients of kernels that are a function of the (scaled) euclidean distance
of the points. We test on a high dimensional space, which can generate very small distances
causing the scaled_square_dist to generate some negative values.
'''
K = kernel(Datum.X)
assert not np.isnan(K).any(), 'NaNs in the output of the ' + kernel.__name__ + 'kernel.'
assert np.isfinite(K).all(), 'Infs in the output of the ' + kernel.__name__ + ' kernel.'
X_as_param = tf.Variable(Datum.X)
with tf.GradientTape() as tape:
def test_falls_back_to_default(self):
kern = gpflow.kernels.Matern52(10)
deriv_kern = gpflow.derivative_kernel.derivative_kernel_factory(10, 10, kern)
assert type(deriv_kern) == gpflow.derivative_kernel.DifferentialObservationsKernelDynamic
def _init_kernel(self, D, lengthscale, variance, period):
base = gpflow.kernels.Matern32(D, variance=variance, lengthscales=lengthscale)
return gpflow.kernels.Periodic(base=base, period=period)
def test_nongpr_model(self, domain):
design = gpflowopt.design.LatinHyperCube(16, domain)
X, Y = design.generate(), parabola2d(design.generate())
m = gpflow.models.VGP(X, Y, gpflow.kernels.RBF(2, ARD=True), likelihood=gpflow.likelihoods.Gaussian())
acq = gpflowopt.acquisition.ExpectedImprovement(m)
optimizer = gpflowopt.BayesianOptimizer(domain, acq, optimizer=gpflowopt.optim.SciPyOptimizer(domain))
result = optimizer.optimize(lambda X: parabola2d(X), n_iter=1)
assert result.success
def test_deriv_rbf_kernel_x1_and_x2_different_lengthscales(self):
# this test is mainly about testing our rbf derivative kernel implementation
# when the lengthscales vary along the different dimensions.
# to do this we test the result against the basic derivative kernel
# where the gradients are calculated via tf.gradients.
x_ph = tf.placeholder(tf.float64, [None, 4])
x2_ph = tf.placeholder(tf.float64, [None, 4])
lengthscales = np.array([1.8, 0.9])
base_rbf_kern1 = gpflow.kernels.RBF(2, self.variance, lengthscales=lengthscales,
ARD=True)
base_rbf_kern2 = gpflow.kernels.RBF(2, self.variance, lengthscales=lengthscales,
ARD=True)
diff_dynamic_kernel = gpflow.derivative_kernel.DifferentialObservationsKernelDynamic(
2, base_rbf_kern1, 2
)
diff_kernel = gpflow.derivative_kernel.RBFDerivativeKern(2, 2, base_kernel=base_rbf_kern2)
with self.test_session() as sess:
with diff_kernel.tf_mode():
x_free = tf.placeholder('float64')
diff_kernel.make_tf_array(x_free)
k = diff_kernel.K(x_ph, x2_ph)
with diff_dynamic_kernel.tf_mode():
x_free_2 = tf.placeholder('float64')
diff_dynamic_kernel.make_tf_array(x_free_2)
def create_parabola_model(domain, design=None):
if design is None:
design = gpflowopt.design.LatinHyperCube(16, domain)
X, Y = design.generate(), parabola2d(design.generate())
m = gpflow.gpr.GPR(X, Y, gpflow.kernels.RBF(2, ARD=True))
return m
def test_mixed_mok_with_Id_vs_independent_mok(session_tf):
data = DataMixedKernelWithEye
# Independent model
k1 = mk.SharedIndependentMok(RBF(data.D, variance=0.5, lengthscales=1.2), data.L)
f1 = InducingPoints(data.X[:data.M, ...].copy())
m1 = SVGP(data.X, data.Y, k1, Gaussian(), f1,
q_mu=data.mu_data_full, q_sqrt=data.sqrt_data_full)
m1.set_trainable(False)
m1.q_sqrt.set_trainable(True)
gpflow.training.ScipyOptimizer().minimize(m1, maxiter=data.MAXITER)
# Mixed Model
kern_list = [RBF(data.D, variance=0.5, lengthscales=1.2) for _ in range(data.L)]
k2 = mk.SeparateMixedMok(kern_list, data.W)
f2 = InducingPoints(data.X[:data.M, ...].copy())
m2 = SVGP(data.X, data.Y, k2, Gaussian(), f2,
q_mu=data.mu_data_full, q_sqrt=data.sqrt_data_full)
m2.set_trainable(False)
m2.q_sqrt.set_trainable(True)
gpflow.training.ScipyOptimizer().minimize(m2, maxiter=data.MAXITER)
check_equality_predictions(session_tf, [m1, m2])
perm = list(range(30))
rng.shuffle(perm)
Xtest = rng.rand(10, 2) * 10
X_augumented = np.hstack([np.concatenate(X), np.concatenate(label)])
Y_augumented = np.hstack([np.concatenate(Y), np.concatenate(label)])
# 1. Two independent VGPs for two sets of data
k0 = gpflow.kernels.RBF(2)
k0.lengthscales.trainable = False
vgp0 = gpflow.models.VGP(
X[0], Y[0], kern=k0,
mean_function=gpflow.mean_functions.Constant(),
likelihood=gpflow.likelihoods.Gaussian())
k1 = gpflow.kernels.RBF(2)
k1.lengthscales.trainable = False
vgp1 = gpflow.models.VGP(
X[1], Y[1], kern=k1,
mean_function=gpflow.mean_functions.Constant(),
likelihood=gpflow.likelihoods.Gaussian())
# 2. Coregionalized GPR
lik = gpflow.likelihoods.SwitchedLikelihood(
[gpflow.likelihoods.Gaussian(), gpflow.likelihoods.Gaussian()])
kc = gpflow.kernels.RBF(2)
kc.trainable = False # lengthscale and variance is fixed.
coreg = gpflow.kernels.Coregion(1, output_dim=2, rank=1, active_dims=[2])
coreg.W.trainable = False
def prepare(self):
rng = np.random.RandomState(0)
X = rng.rand(20, 1) * 10
Y = np.sin(X) + 0.9 * np.cos(X * 1.6) + rng.randn(*X.shape) * 0.8
Y = np.tile(Y, 2) # two identical columns
self.Xtest = rng.rand(10, 1) * 10
m1 = gpflow.models.GPR(
X, Y, kern=gpflow.kernels.RBF(1),
mean_function=gpflow.mean_functions.Constant())
m2 = gpflow.models.VGP(
X, Y, gpflow.kernels.RBF(1), likelihood=gpflow.likelihoods.Gaussian(),
mean_function=gpflow.mean_functions.Constant())
m3 = gpflow.models.SVGP(
X, Y, gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(),
q_diag=False,
mean_function=gpflow.mean_functions.Constant())
m3.feature.trainable = False
m4 = gpflow.models.SVGP(
X, Y, gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(), q_diag=False, whiten=True,
mean_function=gpflow.mean_functions.Constant())
m4.feature.trainable = False
m5 = gpflow.models.SGPR(
X, Y, gpflow.kernels.RBF(1),
Z=X.copy(),
mean_function=gpflow.mean_functions.Constant())