Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def forward(self, x):
mean_x = self.mean_module(x)
covar_x = self.covar_module(x)
latent_pred = gpytorch.distributions.MultivariateNormal(mean_x, covar_x)
return latent_pred
def _get_test_posterior_batched(device, dtype=torch.float):
mean = torch.zeros(3, 2, device=device, dtype=dtype)
cov = torch.eye(2, device=device, dtype=dtype).repeat(3, 1, 1)
mvn = MultivariateNormal(mean, cov)
return GPyTorchPosterior(mvn)
def forward(self, x):
mean_x = self.mean_module(x)
covar_x = self.covar_module(x)
res = gpytorch.distributions.MultivariateNormal(mean_x, covar_x)
return res
def forward(self, x):
mean_x = self.mean_module(x)
covar_x = self.covar_module(x)
return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)
def forward(self, x):
mean_x = self.mean_module(x)
covar_x = self.covar_module(x)
latent_pred = MultivariateNormal(mean_x, covar_x)
return latent_pred
def test_multivariate_normal_non_lazy(self, cuda=False):
device = torch.device("cuda") if cuda else torch.device("cpu")
for dtype in (torch.float, torch.double):
mean = torch.tensor([0, 1, 2], device=device, dtype=dtype)
covmat = torch.diag(torch.tensor([1, 0.75, 1.5], device=device, dtype=dtype))
mvn = MultivariateNormal(mean=mean, covariance_matrix=covmat, validate_args=True)
self.assertTrue(torch.is_tensor(mvn.covariance_matrix))
self.assertIsInstance(mvn.lazy_covariance_matrix, LazyTensor)
self.assertAllClose(mvn.variance, torch.diag(covmat))
self.assertAllClose(mvn.scale_tril, covmat.sqrt())
mvn_plus1 = mvn + 1
self.assertAllClose(mvn_plus1.mean, mvn.mean + 1)
self.assertAllClose(mvn_plus1.covariance_matrix, mvn.covariance_matrix)
mvn_times2 = mvn * 2
self.assertAllClose(mvn_times2.mean, mvn.mean * 2)
self.assertAllClose(mvn_times2.covariance_matrix, mvn.covariance_matrix * 4)
mvn_divby2 = mvn / 2
self.assertAllClose(mvn_divby2.mean, mvn.mean / 2)
self.assertAllClose(mvn_divby2.covariance_matrix, mvn.covariance_matrix / 4)
self.assertAlmostEqual(mvn.entropy().item(), 4.3157, places=4)
self.assertAlmostEqual(mvn.log_prob(torch.zeros(3, device=device, dtype=dtype)).item(), -4.8157, places=4)
logprob = mvn.log_prob(torch.zeros(2, 3, device=device, dtype=dtype))
def prior_distribution(self):
"""
If desired, models can compare the input to forward to inducing_points and use a GridKernel for space
efficiency.
However, when using a default VariationalDistribution which has an O(m^2) space complexity anyways, we find that
GridKernel is typically not worth it due to the moderate slow down of using FFTs.
"""
out = super(AdditiveGridInterpolationVariationalStrategy, self).prior_distribution
mean = out.mean.repeat(self.num_dim, 1)
covar = out.lazy_covariance_matrix.repeat(self.num_dim, 1, 1)
return MultivariateNormal(mean, covar)
def forward(self, x):
mean_x = self.mean_module(x)
covar_x = self.covar_module(x)
return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)
raise RuntimeError(
"train_inputs, train_targets cannot be None in training mode. "
"Call .eval() for prior predictions, or call .set_train_data() to add training data."
)
if settings.debug.on():
if not all(torch.equal(train_input, input) for train_input, input in zip(train_inputs, inputs)):
raise RuntimeError("You must train on the training inputs!")
res = super().__call__(*inputs, **kwargs)
return res
# Prior mode
elif settings.prior_mode.on() or self.train_inputs is None or self.train_targets is None:
full_inputs = args
full_output = super(ExactGP, self).__call__(*full_inputs, **kwargs)
if settings.debug().on():
if not isinstance(full_output, MultivariateNormal):
raise RuntimeError("ExactGP.forward must return a MultivariateNormal")
return full_output
# Posterior mode
else:
if settings.debug.on():
if all(torch.equal(train_input, input) for train_input, input in zip(train_inputs, inputs)):
warnings.warn(
"The input matches the stored training data. Did you forget to call model.train()?", UserWarning
)
# Get the terms that only depend on training data
if self.prediction_strategy is None:
train_output = super().__call__(*train_inputs, **kwargs)
# Create the prediction strategy for
*params: Any,
batch_shape: Optional[torch.Size] = None,
shape: Optional[torch.Size] = None,
noise: Optional[Tensor] = None,
) -> DiagLazyTensor:
if noise is not None:
return DiagLazyTensor(noise)
training = self.noise_model.training # keep track of mode
self.noise_model.eval() # we want the posterior prediction of the noise model
with settings.detach_test_caches(False), settings.debug(False):
if len(params) == 1 and not torch.is_tensor(params[0]):
output = self.noise_model(*params[0])
else:
output = self.noise_model(*params)
self.noise_model.train(training)
if not isinstance(output, MultivariateNormal):
raise NotImplementedError("Currently only noise models that return a MultivariateNormal are supported")
# note: this also works with MultitaskMultivariateNormal, where this
# will return a batched DiagLazyTensors of size n x num_tasks x num_tasks
noise_diag = output.mean if self._noise_indices is None else output.mean[..., self._noise_indices]
return DiagLazyTensor(self._noise_constraint.transform(noise_diag))