import torch
from gpytorch.distributions import MultitaskMultivariateNormal
from botorch.posteriors.gpytorch import GPyTorchPosterior


def _get_test_posterior(batch_shape, q=1, m=1, **tkwargs):
    mean = torch.rand(*batch_shape, q, m, **tkwargs)
    a = torch.rand(*batch_shape, q * m, q * m, **tkwargs)
    covar = a @ a.transpose(-1, -2)  # random PSD matrix
    diag = torch.diagonal(covar, dim1=-2, dim2=-1)
    diag += torch.rand(*batch_shape, q * m, **tkwargs)  # in-place diagonal jitter keeps covar PD
    mvn = MultitaskMultivariateNormal(mean, covar)
    return GPyTorchPosterior(mvn)
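# A minimal usage sketch of the helper above (the batch/q/m values are
# illustrative only):
posterior = _get_test_posterior(torch.Size([2]), q=3, m=2, dtype=torch.double)
assert posterior.mean.shape == torch.Size([2, 3, 2])
assert posterior.variance.shape == torch.Size([2, 3, 2])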
# Batched case: `mtmvn` is a MultitaskMultivariateNormal with batch_shape
# torch.Size([2]) and event_shape torch.Size([3, 2]); its setup (`mean`,
# `variance`, `covar`) is omitted from this fragment.
logprob = mtmvn.log_prob(torch.zeros(2, 3, 2, device=device, dtype=dtype))
logprob_expected = -14.52826 * torch.ones(2, device=device, dtype=dtype)
self.assertTrue(torch.allclose(logprob, logprob_expected))
logprob = mtmvn.log_prob(torch.zeros(3, 2, 3, 2, device=device, dtype=dtype))
logprob_expected = -14.52826 * torch.ones(3, 2, device=device, dtype=dtype)
self.assertTrue(torch.allclose(logprob, logprob_expected))
conf_lower, conf_upper = mtmvn.confidence_region()
self.assertTrue(torch.allclose(conf_lower, mtmvn.mean - 2 * mtmvn.stddev))
self.assertTrue(torch.allclose(conf_upper, mtmvn.mean + 2 * mtmvn.stddev))
self.assertTrue(mtmvn.sample().shape == torch.Size([2, 3, 2]))
self.assertTrue(mtmvn.sample(torch.Size([3])).shape == torch.Size([3, 2, 3, 2]))
self.assertTrue(mtmvn.sample(torch.Size([3, 4])).shape == torch.Size([3, 4, 2, 3, 2]))
# non-interleaved
covmat = variance.transpose(-1, -2).reshape(2, 1, -1) * torch.eye(6, device=device, dtype=dtype)
mtmvn = MultitaskMultivariateNormal(mean=mean, covariance_matrix=covmat, interleaved=False)
self.assertTrue(torch.equal(mtmvn.mean, mean))
self.assertTrue(torch.allclose(mtmvn.variance, variance))
self.assertTrue(torch.allclose(mtmvn.scale_tril, covmat.sqrt()))
self.assertTrue(mtmvn.event_shape == torch.Size([3, 2]))
self.assertTrue(mtmvn.batch_shape == torch.Size([2]))
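# Sketch of the two memory layouts (assumes gpytorch only): with n points and
# t tasks, interleaved=True orders the flattened covariance index task-fastest
# (p1t1, p1t2, p2t1, ...), while interleaved=False groups by task
# (p1t1, p2t1, p3t1, p1t2, ...). Both describe the same distribution:
mean0 = torch.zeros(3, 2)
var0 = 1 + torch.rand(3, 2)
a0 = MultitaskMultivariateNormal(mean0, var0.reshape(-1).diag())
b0 = MultitaskMultivariateNormal(
    mean0, var0.transpose(-1, -2).reshape(-1).diag(), interleaved=False
)
assert torch.allclose(a0.variance, b0.variance)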
# Scalarized objective on a single-output model (the `mean`/`covar` setup for
# this case is omitted from the fragment).
mvn = MultivariateNormal(mean, covar)
p = GPyTorchPosterior(mvn)
mm = MockModel(p)
weights = torch.tensor([0.5], device=self.device, dtype=dtype)
obj = ScalarizedObjective(weights)
ei = ExpectedImprovement(model=mm, best_f=0.0, objective=obj)
X = torch.rand(1, 2, device=self.device, dtype=dtype)
ei_expected = torch.tensor(0.2601, device=self.device, dtype=dtype)
self.assertTrue(torch.allclose(ei(X), ei_expected, atol=1e-4))
# test objective (multi-output)
mean = torch.tensor([[-0.25, 0.5]], device=self.device, dtype=dtype)
covar = torch.tensor(
    [[[0.5, 0.125], [0.125, 0.5]]], device=self.device, dtype=dtype
)
mvn = MultitaskMultivariateNormal(mean, covar)
p = GPyTorchPosterior(mvn)
mm = MockModel(p)
weights = torch.tensor([2.0, 1.0], device=self.device, dtype=dtype)
obj = ScalarizedObjective(weights)
ei = ExpectedImprovement(model=mm, best_f=0.0, objective=obj)
X = torch.rand(1, 2, device=self.device, dtype=dtype)
ei_expected = torch.tensor(0.6910, device=self.device, dtype=dtype)
self.assertTrue(torch.allclose(ei(X), ei_expected, atol=1e-4))
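# Sanity check of ei_expected above: ScalarizedObjective reduces the posterior
# to the univariate N(w^T mu, w^T Sigma w) = N(0.0, 3.0) here, and analytic EI
# with best_f = 0 is mu * Phi(z) + sigma * phi(z), where z = mu / sigma:
w = torch.tensor([2.0, 1.0])
mu = torch.tensor([-0.25, 0.5])
Sigma = torch.tensor([[0.5, 0.125], [0.125, 0.5]])
m, s = w @ mu, torch.sqrt(w @ Sigma @ w)
z = m / s
std_normal = torch.distributions.Normal(0.0, 1.0)
ei_by_hand = m * std_normal.cdf(z) + s * torch.exp(std_normal.log_prob(z))
# ei_by_hand ≈ 0.6910, matching ei_expected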
# Non-batch case: `mtmvn` has event_shape torch.Size([3, 2]) and an empty
# batch_shape; again the setup is omitted from the fragment.
self.assertAlmostEqual(
    mtmvn.log_prob(torch.zeros(3, 2, device=device, dtype=dtype)).item(),
    -14.52826,
    places=4,
)
logprob = mtmvn.log_prob(torch.zeros(2, 3, 2, device=device, dtype=dtype))
logprob_expected = -14.52826 * torch.ones(2, device=device, dtype=dtype)
self.assertTrue(torch.allclose(logprob, logprob_expected))
conf_lower, conf_upper = mtmvn.confidence_region()
self.assertTrue(torch.allclose(conf_lower, mtmvn.mean - 2 * mtmvn.stddev))
self.assertTrue(torch.allclose(conf_upper, mtmvn.mean + 2 * mtmvn.stddev))
self.assertTrue(mtmvn.sample().shape == torch.Size([3, 2]))
self.assertTrue(mtmvn.sample(torch.Size([3])).shape == torch.Size([3, 3, 2]))
self.assertTrue(mtmvn.sample(torch.Size([3, 4])).shape == torch.Size([3, 4, 3, 2]))
# non-interleaved
covmat = variance.transpose(-1, -2).reshape(-1).diag()
mtmvn = MultitaskMultivariateNormal(mean=mean, covariance_matrix=covmat, interleaved=False)
self.assertTrue(torch.equal(mtmvn.mean, mean))
self.assertTrue(torch.allclose(mtmvn.variance, variance))
self.assertTrue(torch.allclose(mtmvn.scale_tril, covmat.sqrt()))
self.assertTrue(mtmvn.event_shape == torch.Size([3, 2]))
self.assertTrue(mtmvn.batch_shape == torch.Size())
# multi-output (setup of `mean`/`cov` assumed, mirroring the batch-mode block below)
mean = torch.zeros(2, 2, device=self.device, dtype=dtype)
cov = torch.eye(4, device=self.device, dtype=dtype)
mtmvn = MultitaskMultivariateNormal(mean=mean, covariance_matrix=cov)
posterior = GPyTorchPosterior(mvn=mtmvn)
for sample_shape, qmc, seed in itertools.product(
    (torch.Size([5]), torch.Size([5, 3])), (False, True), (None, 1234)
):
    expected_shape = sample_shape + torch.Size([2, 2])
    samples = construct_base_samples_from_posterior(
        posterior=posterior, sample_shape=sample_shape, qmc=qmc, seed=seed
    )
    self.assertEqual(samples.shape, expected_shape)
    self.assertEqual(samples.device.type, self.device.type)
    self.assertEqual(samples.dtype, dtype)
# multi-output, batch mode
mean = torch.zeros(2, 2, 2, device=self.device, dtype=dtype)
cov = torch.eye(4, device=self.device, dtype=dtype).expand(2, 4, 4)
mtmvn = MultitaskMultivariateNormal(mean=mean, covariance_matrix=cov)
posterior = GPyTorchPosterior(mvn=mtmvn)
for sample_shape, qmc, seed, collapse_batch_dims in itertools.product(
    (torch.Size([5]), torch.Size([5, 3])),
    (False, True),
    (None, 1234),
    (False, True),
):
    if collapse_batch_dims:
        expected_shape = sample_shape + torch.Size([1, 2, 2])
    else:
        expected_shape = sample_shape + torch.Size([2, 2, 2])
    samples = construct_base_samples_from_posterior(
        posterior=posterior,
        sample_shape=sample_shape,
        qmc=qmc,
        seed=seed,
        collapse_batch_dims=collapse_batch_dims,
    )
    self.assertEqual(samples.shape, expected_shape)
    self.assertEqual(samples.device.type, self.device.type)
    self.assertEqual(samples.dtype, dtype)
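# With collapse_batch_dims=True, the t-batch dimension of the base samples is
# collapsed to size 1 so the same base samples broadcast across all batches,
# hence the expected torch.Size([1, 2, 2]) event part above.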
def __call__(self, x, prior=False):
    function_dist = self.base_variational_strategy(x, prior=prior)
    # If task_dim falls outside function_dist's batch dimensions, repeat the
    # same MVN across all tasks; otherwise fold the batch dimension at
    # task_dim into the task dimension.
    if (self.task_dim > 0 and self.task_dim > len(function_dist.batch_shape)) or (
        self.task_dim < 0 and self.task_dim + len(function_dist.batch_shape) < 0
    ):
        return MultitaskMultivariateNormal.from_repeated_mvn(
            function_dist, num_tasks=self.num_tasks
        )
    else:
        function_dist = MultitaskMultivariateNormal.from_batch_mvn(
            function_dist, task_dim=self.task_dim
        )
        assert function_dist.event_shape[-1] == self.num_tasks
        return function_dist
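# Sketch of the two conversion paths above (shapes are illustrative; assumes
# gpytorch's distributions). from_batch_mvn folds a batch dimension into the
# task dimension, while from_repeated_mvn tiles one MVN across tasks:
from gpytorch.distributions import MultivariateNormal, MultitaskMultivariateNormal
base = MultivariateNormal(torch.zeros(4, 5), torch.eye(5).repeat(4, 1, 1))
mt = MultitaskMultivariateNormal.from_batch_mvn(base, task_dim=0)
assert mt.event_shape == torch.Size([5, 4])  # 5 points, 4 tasks
single = MultivariateNormal(torch.zeros(5), torch.eye(5))
rep = MultitaskMultivariateNormal.from_repeated_mvn(single, num_tasks=4)
assert rep.event_shape == torch.Size([5, 4])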
def forward(self, x):
    mean_x = self.mean_module(x)
    covar_x = self.covar_module(x)
    return gpytorch.distributions.MultitaskMultivariateNormal(mean_x, covar_x)
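# One plausible surrounding model for the forward above (a standard gpytorch
# multitask ExactGP; the mean/kernel choices are assumptions, not taken from
# the snippet):
class MultitaskGPModel(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood, num_tasks=2):
        super().__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.MultitaskMean(
            gpytorch.means.ConstantMean(), num_tasks=num_tasks
        )
        self.covar_module = gpytorch.kernels.MultitaskKernel(
            gpytorch.kernels.RBFKernel(), num_tasks=num_tasks, rank=1
        )

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultitaskMultivariateNormal(mean_x, covar_x)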
# Validate the feature dimension before running the GP.
if inputs.size(-1) != self.input_dims:
    raise RuntimeError(
        f"Input shape did not match self.input_dims. Got total feature dims [{inputs.size(-1)}],"
        f" expected [{self.input_dims}]"
    )
# Repeat the input for all possible outputs
if self.output_dims is not None:
    inputs = inputs.unsqueeze(-3)
    inputs = inputs.expand(*inputs.shape[:-3], self.output_dims, *inputs.shape[-2:])
# Now run samples through the GP
output = ApproximateGP.__call__(self, inputs)
if self.output_dims is not None:
    # Treat each output dim as a task: transpose the means to (..., n, t) and
    # stack the per-output covariances block-diagonally (task-grouped layout).
    mean = output.loc.transpose(-1, -2)
    covar = BlockDiagLazyTensor(output.lazy_covariance_matrix, block_dim=-3)
    output = MultitaskMultivariateNormal(mean, covar, interleaved=False)
return output
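# Sketch of what the output_dims branch assembles (assumes gpytorch's lazy
# tensors): covariances stacked along block_dim=-3 become one block-diagonal
# covariance over all outputs, matching the non-interleaved (task-grouped)
# layout used above:
from gpytorch.lazy import BlockDiagLazyTensor, lazify
covs = lazify(torch.eye(5).repeat(3, 1, 1))  # one 5x5 covariance per output
block = BlockDiagLazyTensor(covs, block_dim=-3)
assert block.shape == torch.Size([15, 15])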