Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def kernel():
    """Return a new ``SquaredExponential`` kernel constructed with no arguments."""
    squared_exponential = gpflow.kernels.SquaredExponential()
    return squared_exponential
def prepare(self):
rng = np.random.RandomState(0)
X = rng.rand(20, 1) * 10
Y = np.sin(X) + 0.9 * np.cos(X * 1.6) + rng.randn(*X.shape) * 0.8
Y = np.tile(Y, 2) # two identical columns
self.Xtest = rng.rand(10, 1) * 10
m1 = gpflow.models.GPR(
X, Y, kernel=gpflow.kernels.SquaredExponential(1),
mean_function=gpflow.mean_functions.Constant())
m2 = gpflow.models.VGP(
X, Y, gpflow.kernels.SquaredExponential(1), likelihood=gpflow.likelihoods.Gaussian(),
mean_function=gpflow.mean_functions.Constant())
m3 = gpflow.models.SVGP(
X, Y, gpflow.kernels.SquaredExponential(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(),
q_diag=False,
mean_function=gpflow.mean_functions.Constant())
m3.inducing_variables.trainable = False
m4 = gpflow.models.SVGP(
X, Y, gpflow.kernels.SquaredExponential(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=X.copy(), q_diag=False, whiten=True,
mean_function=gpflow.mean_functions.Constant())
m4.inducing_variables.trainable = False
m5 = gpflow.models.SGPR(
X, Y, gpflow.kernels.SquaredExponential(1),
def test_sgpr_qu():
    """Check that SGPR's ``compute_qu`` matches ``predict_f`` at the inducing inputs.

    An SGPR model with a squared-exponential kernel is fitted to noisy
    two-dimensional data by minimising the negative log marginal likelihood.
    The mean and covariance returned by ``compute_qu`` must then agree with
    ``predict_f(Z, full_cov=True)`` to within ``rtol = atol = 1e-5``.
    """
    num_data, num_inducing = 100, 20

    rng = Datum().rng
    X = tf.cast(rng.randn(num_data, 2), default_float())
    Z = tf.cast(rng.randn(num_inducing, 2), default_float())
    # Draw the noise from the same generator as the inputs, so the data do not
    # depend on whatever state the global ``np.random`` generator is in.
    noise = 0.5 * rng.randn(num_data, 1)
    Y = tf.cast(np.sin(X @ np.array([[-1.4], [0.5]])) + noise, default_float())

    model = gpflow.models.SGPR((X, Y), kernel=gpflow.kernels.SquaredExponential(), inducing_variable=Z)

    @tf.function
    def closure():
        # The optimiser minimises, so hand it the negative log marginal likelihood.
        return - model.log_marginal_likelihood()

    gpflow.optimizers.Scipy().minimize(closure, variables=model.trainable_variables)

    qu_mean, qu_cov = model.compute_qu()
    f_at_Z_mean, f_at_Z_cov = model.predict_f(model.inducing_variable.Z, full_cov=True)

    np.testing.assert_allclose(qu_mean, f_at_Z_mean, rtol=1e-5, atol=1e-5)
    # Reshape q(u)'s covariance to (1, M, M) before comparing it with the
    # full-covariance prediction.
    np.testing.assert_allclose(
        tf.reshape(qu_cov, (1, num_inducing, num_inducing)), f_at_Z_cov, rtol=1e-5, atol=1e-5)
2,
InducingPoints(np.random.randn(71, 2)),
gpflow.kernels.Matern12(variance=1.84,
lengthscale=np.random.uniform(0.5, 3., 2))
],
[
2,
Multiscale(np.random.randn(71, 2),
np.random.uniform(0.5, 3, size=(71, 2))),
gpflow.kernels.SquaredExponential(variance=1.84,
lengthscale=np.random.uniform(0.5, 3., 2))
],
[
9,
InducingPatches(np.random.randn(71, 4)),
gpflow.kernels.Convolutional(gpflow.kernels.SquaredExponential(), [3, 3], [2, 2])
]
]
@pytest.mark.parametrize('input_dim, inducing_variable, kernel', _inducing_variables_and_kernels)
def test_inducing_variables_psd_schur(input_dim, inducing_variable, kernel):
    """Check that ``Kff - Kuf^T Kuu^{-1} Kuf`` has strictly positive eigenvalues.

    The matrices are evaluated at five random inputs of dimension
    ``input_dim`` for the given inducing variable / kernel pair; ``Kuu`` is
    computed with the default jitter.
    """
    # Conditional variance must be PSD.
    X = np.random.randn(5, input_dim)
    Kuf_values = Kuf(inducing_variable, kernel, X)
    Kuu_values = Kuu(inducing_variable, kernel, jitter=default_jitter())
    Kff_values = kernel(X)
    Qff_values = Kuf_values.numpy().T @ np.linalg.solve(Kuu_values, Kuf_values)
    # The Schur complement is symmetric, so use the symmetric eigenvalue
    # routine: it returns real eigenvalues, whereas the general solver may
    # return complex values with spurious imaginary parts.
    eigenvalues = np.linalg.eigvalsh(Kff_values - Qff_values)
    assert np.all(eigenvalues > 0.0)
def test_multi_scale_inducing_equivalence_inducing_points(N, M, D):
# Multiscale must be equivalent to inducing points when variance is zero
Xnew, Z = np.random.randn(N, D), np.random.randn(M, D)
rbf = gpflow.kernels.SquaredExponential(1.3441, lengthscale=np.random.uniform(0.5, 3., D))
inducing_variable_zero_lengthscale = Multiscale(Z, scales=np.zeros(Z.shape))
inducing_variable_inducing_point = InducingPoints(Z)
multi_scale_Kuf = Kuf(inducing_variable_zero_lengthscale, rbf, Xnew)
inducing_point_Kuf = Kuf(inducing_variable_inducing_point, rbf, Xnew)
deviation_percent_Kuf = np.max(
np.abs(multi_scale_Kuf - inducing_point_Kuf) / inducing_point_Kuf *
100)
assert deviation_percent_Kuf < 0.1
multi_scale_Kuu = Kuu(inducing_variable_zero_lengthscale, rbf)
inducing_point_Kuu = Kuu(inducing_variable_inducing_point, rbf)
deviation_percent_Kuu = np.max(
np.abs(multi_scale_Kuu - inducing_point_Kuu) / inducing_point_Kuu *
def _prepare_models():
"""
Prepare models to check that a coregionalized model with a diagonal coregion kernel and
a fixed lengthscale is equivalent to ordinary GP regression.
"""
# 1. Two independent VGPs for two sets of data
# Each VGP gets its own squared-exponential kernel whose lengthscale is frozen.
k0 = gpflow.kernels.SquaredExponential()
k0.lengthscale.trainable = False
k1 = gpflow.kernels.SquaredExponential()
k1.lengthscale.trainable = False
vgp0 = gpflow.models.VGP((Datum.X[0], Datum.Y[0]),
kernel=k0,
mean_function=Constant(),
likelihood=gpflow.likelihoods.Gaussian(), num_latent=1)
vgp1 = gpflow.models.VGP((Datum.X[1], Datum.Y[1]),
kernel=k1,
mean_function=Constant(),
likelihood=gpflow.likelihoods.Gaussian(), num_latent=1)
# 2. Coregionalized GPR
# kc uses active_dims [0, 1] and coreg uses active_dims [2]; presumably
# dimension 2 carries the output index -- confirm against Datum.
kc = gpflow.kernels.SquaredExponential(active_dims=[0, 1])
kc.lengthscale.trainable = False
kc.variance.trainable = False # variance is handled by the coregion kernel
coreg = gpflow.kernels.Coregion(output_dim=2, rank=1, active_dims=[2])
# NOTE(review): the visible lines end here -- no model is built from kc/coreg
# and nothing is returned in this excerpt.
def test_gpr_objective_equivalence():
    """
    Check that the GPR objective does not depend on the parameter transform.

    Under Maximum Likelihood Estimation (no priors on the parameters) the
    objective must be unaffected by how a parameter is transformed. Two GPR
    models are built on the same data with the same lengthscale value: one
    passes the value to the kernel constructor, the other replaces the
    kernel's lengthscale with a ``Parameter`` created with ``transform=None``.
    """
    lengthscale_value = Datum.lengthscale
    dataset = (Datum.X, Datum.Y)
    raw_lengthscale = tf.Variable(lengthscale_value, dtype=gpflow.default_float(), trainable=True)

    reference = gpflow.models.GPR(
        dataset,
        kernel=gpflow.kernels.SquaredExponential(lengthscale=lengthscale_value),
    )
    candidate = gpflow.models.GPR(dataset, kernel=gpflow.kernels.SquaredExponential())
    candidate.kernel.lengthscale = gpflow.Parameter(raw_lengthscale, transform=None)

    # Consistency check: both kernels must report the same lengthscale.
    reference_lengthscale = reference.kernel.lengthscale.numpy()
    candidate_lengthscale = candidate.kernel.lengthscale.numpy()
    assert np.allclose(reference_lengthscale, candidate_lengthscale)

    reference_objective = reference.log_marginal_likelihood().numpy()
    candidate_objective = candidate.log_marginal_likelihood().numpy()
    assert np.allclose(reference_objective, candidate_objective), \
        "MLE objective should not depend on Parameter transform"
def setUp(self):
# Build the fixtures for this test case: a fresh tf.Graph, a RandomState
# seeded with 1, the input dimension, and a list holding one Convolutional
# and one WeightedConvolutional kernel.
# NOTE(review): indentation is not preserved in this view, so which of the
# statements below sit inside the `with` block cannot be determined here.
self.test_graph = tf.Graph()
with self.test_context():
inputdim = 9
rng = np.random.RandomState(1)
self.rng = rng
self.dim = inputdim
self.kernels = []
self.kernels.append(gpflow.kernels.Convolutional(gpflow.kernels.SquaredExponential(4), [3, 3], [2, 2]))
wconvk = gpflow.kernels.WeightedConvolutional(gpflow.kernels.SquaredExponential(4), [3, 3], [2, 2])
# NOTE(review): the weights are drawn from the global np.random state, not
# from the seeded `rng` created above -- confirm whether that is intended.
wconvk.weights.assign(np.random.randn(wconvk.num_patches))
self.kernels.append(wconvk)
def test_sample_conditional_mixedkernel():
    """
    Compare two ways of sampling with a linearly mixed multi-output kernel.

    Samples drawn directly with a ``LinearCoregionalization`` kernel must have
    roughly the same mean and covariance (to one decimal place) as samples
    drawn with ``SeparateIndependent`` kernels and mixed afterwards with the
    same matrix ``W``.
    """
    q_mu = tf.random.uniform((Data.M, Data.L), dtype=tf.float64)  # M x L
    lower_triangular_factors = [
        np.tril(tf.random.uniform((Data.M, Data.M), dtype=tf.float64))
        for _ in range(Data.L)
    ]
    q_sqrt = tf.convert_to_tensor(lower_triangular_factors)  # L x M x M
    Z = Data.X[:Data.M, ...]  # M x D

    num_points = int(10e5)
    Xs = np.ones((num_points, Data.D), dtype=float_type)

    # Path 1: mixed kernel: most efficient route
    W = np.random.randn(Data.P, Data.L)
    latent_kernels = [SquaredExponential() for _ in range(Data.L)]
    mixed_kernel = mk.LinearCoregionalization(latent_kernels, W)
    optimal_inducing_variable = mf.SharedIndependentInducingVariables(InducingPoints(Z))
    value, mean, var = sample_conditional(
        Xs, optimal_inducing_variable, mixed_kernel, q_mu, q_sqrt=q_sqrt, white=True)

    # Path 2: independent kernels, mixed later
    independent_kernels = [SquaredExponential() for _ in range(Data.L)]
    separate_kernel = mk.SeparateIndependent(independent_kernels)
    fallback_inducing_variable = mf.SharedIndependentInducingVariables(InducingPoints(Z))
    value2, mean2, var2 = sample_conditional(
        Xs, fallback_inducing_variable, separate_kernel, q_mu, q_sqrt=q_sqrt, white=True)
    value2 = np.matmul(value2, W.T)

    # check if mean and covariance of samples are similar
    sample_mean_mixed = np.mean(value, axis=0)
    sample_mean_separate = np.mean(value2, axis=0)
    np.testing.assert_array_almost_equal(sample_mean_mixed, sample_mean_separate, decimal=1)

    sample_cov_mixed = np.cov(value, rowvar=False)
    sample_cov_separate = np.cov(value2, rowvar=False)
    np.testing.assert_array_almost_equal(sample_cov_mixed, sample_cov_separate, decimal=1)
self.ytrain = (y.reshape(y.size, 1) - self.ymean)/self.yscale
print(self.xtrain.shape)
self.xtrain = self.xtrain.astype(np.float64)
self.ytrain = self.ytrain.astype(np.float64)
self.ndim = self.xtrain.shape[-1]
l = np.empty(self.ndim)
kern = list()
for k in range(self.ndim):
# TODO: guess this in a finer fashion via FFT in all directions
l[k] = 0.3*(np.max(self.xtrain[:, k]) - np.min(self.xtrain[:, k]))
kern.append(gpflow.kernels.SquaredExponential(
lengthscales=l[k], variance=1.0, active_dims=[k]))
if k == 0:
# Guess a bit more broadly
kern[k].variance.assign(3.0)
else:
# Need only one y scale
set_trainable(kern[k].variance, False)
kerns = gpflow.kernels.Product(kern)
self.m = gpflow.models.GPR((self.xtrain, self.ytrain), kernel=kerns)
self.m.likelihood.variance.assign(1e-2) # Guess little noise
# Optimize
def objective_closure():