Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_model():
kernel = create_kernel()
model = gpflow.models.SVGP(kernel=kernel,
likelihood=gpflow.likelihoods.Gaussian(variance_lower_bound=None),
inducing_variable=Data.Z,
q_diag=True)
model.q_mu.trainable = False
return model
def test_multioutput_with_diag_q_sqrt():
data = DataMixedKernel
q_sqrt_diag = np.ones((data.M, data.L)) * 2
q_sqrt = np.repeat(np.eye(data.M)[None, ...], data.L, axis=0) * 2 # L x M x M
kern_list = [SquaredExponential() for _ in range(data.L)]
k1 = mk.LinearCoregionalization(kern_list, W=data.W)
f1 = mf.SharedIndependentInducingVariables(InducingPoints(data.X[:data.M, ...]))
model_1 = SVGP(k1, Gaussian(), inducing_variable=f1, q_mu=data.mu_data, q_sqrt=q_sqrt_diag, q_diag=True)
kern_list = [SquaredExponential() for _ in range(data.L)]
k2 = mk.LinearCoregionalization(kern_list, W=data.W)
f2 = mf.SharedIndependentInducingVariables(InducingPoints(data.X[:data.M, ...]))
model_2 = SVGP(k2, Gaussian(), inducing_variable=f2, q_mu=data.mu_data, q_sqrt=q_sqrt, q_diag=False)
check_equality_predictions(Data.data, [model_1, model_2])
def prepare(self):
return gpflow.models.SVGP(
self.X, self.Y, Z=self.Z, kern=self.kernel(),
likelihood=gpflow.likelihoods.Gaussian(),
whiten=True, q_diag=True)
def model():
return gpflow.models.SVGP(
kernel=gpflow.kernels.SquaredExponential(lengthscale=Data.ls, variance=Data.var),
likelihood=gpflow.likelihoods.Gaussian(),
inducing_variable=Data.Z,
q_diag=True
)
self.assertEqual(p0, 0.0)
self.assertNotEqual(p0, p1)
self.assertEqual(l0, l1)
class TestName(GPflowTestCase):
def test_name(self):
with self.test_context():
m1 = Empty()
self.assertEqual(m1.name, 'Empty')
m2 = Empty(name='foo')
self.assertEqual(m2.name, 'foo')
class EvalDataSVGP(gpflow.models.SVGP):
@gpflow.decors.autoflow()
@gpflow.decors.params_as_tensors
def XY(self):
return self.X, self.Y
class TestMinibatchSVGP(GPflowTestCase):
def test_minibatch_sync(self):
with self.test_context():
X = np.random.randn(1000, 1)
Y = X.copy()
Z = X[:100, :].copy()
size = 10
m = EvalDataSVGP(X, Y, gpflow.kernels.RBF(1),
gpflow.likelihoods.Gaussian(),
minibatch_size=size, Z=Z)
def test_svgp(whiten, q_diag):
model = gpflow.models.SVGP(
gpflow.kernels.SquaredExponential(),
gpflow.likelihoods.Gaussian(),
inducing_variable=Datum.X.copy(),
q_diag=q_diag,
whiten=whiten,
mean_function=gpflow.mean_functions.Constant(),
num_latent=Datum.Y.shape[1],
)
gpflow.utilities.set_trainable(model.inducing_variable, False)
# test with explicitly unknown shapes:
tensor_spec = tf.TensorSpec(shape=None, dtype=default_float())
elbo = tf.function(
model.elbo,
autograph=False,
input_signature=[(tensor_spec, tensor_spec)],
def test_notwhite(self):
with self.test_context() as session:
m1 = gpflow.models.SVGP(
self.X,
self.Y,
kern=gpflow.kernels.RBF(1) + gpflow.kernels.White(1),
likelihood=gpflow.likelihoods.Exponential(),
Z=self.Z,
q_diag=True,
whiten=False)
m2 = gpflow.models.SVGP(
self.X,
self.Y,
kern=gpflow.kernels.RBF(1) + gpflow.kernels.White(1),
likelihood=gpflow.likelihoods.Exponential(),
Z=self.Z,
q_diag=False,
whiten=False)
qsqrt, qmean = self.rng.randn(2, 3, 2)
qsqrt = (qsqrt**2)*0.01
m1.q_sqrt = qsqrt
m1.q_mu = qmean
m2.q_sqrt = np.array([np.diag(qsqrt[:, 0]), np.diag(qsqrt[:, 1])])
m2.q_mu = qmean
obj1 = session.run(m1.objective, feed_dict=m1.feeds)
obj2 = session.run(m2.objective, feed_dict=m2.feeds)
assert_allclose(obj1, obj2)
def get_model(self, X, Y, Z, minibatch_size):
model = gpflow.models.SVGP(
X, Y, kern=gpflow.kernels.RBF(1),
likelihood=gpflow.likelihoods.Gaussian(),
Z=Z, minibatch_size=minibatch_size)
return model
feature = Xtrain[idx, ...]
# 1. `input_dim` is not required anymore.
kernel = gpflow.kernels.RBF()
# 2. Assigned value (10.0) here is constrained.
kernel.lengthscale <<= 10.0
kernel.variance.trainable = False
likelihood = gpflow.likelihoods.Bernoulli()
# 3. Constrained vs unconstrained values.
print(f"Unconstrained parameter value of `kernel.lengthscale` = {kernel.lengthscale}")
print(f"Constrained parameter value of `kernel.lengthscale` = {kernel.lengthscale}")
# 4. X's and Y's are no longer part of the model.
m = gpflow.models.SVGP(kernel=kernel, feature=feature, likelihood=likelihood)
X, Y = tf.convert_to_tensor(Xtrain), tf.convert_to_tensor(Ytrain)
def loss_cb():
return m.neg_log_marginal_likelihood(X, Y)
# 5. There is no more gpflow optimizers.
adam = tf.train.AdamOptimizer(0.0001)
# 6. Keras-like model fitting
gpflow.optimize(loss_cb, adam, m.trainable_variables, 10)