def create_lazy_tensor(self):
    # Three batched positive-definite factors (each repeated over a batch of 3), combined as a lazy Kronecker product
    a = torch.tensor([[4, 0, 2], [0, 3, -1], [2, -1, 3]], dtype=torch.float).repeat(3, 1, 1)
    b = torch.tensor([[2, 1], [1, 2]], dtype=torch.float).repeat(3, 1, 1)
    c = torch.tensor([[4, 0, 1, 0], [0, 4, -1, 0], [1, -1, 3, 0], [0, 0, 0, 4]], dtype=torch.float).repeat(3, 1, 1)
    a.requires_grad_(True)
    b.requires_grad_(True)
    c.requires_grad_(True)
    kp_lazy_tensor = KroneckerProductLazyTensor(NonLazyTensor(a), NonLazyTensor(b), NonLazyTensor(c))
    return kp_lazy_tensor
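
# Usage sketch (not part of the original snippet): the factory above returns a lazy Kronecker
# product instead of materializing the full batched matrix. Minimal standalone example, assuming
# the gpytorch.lazy API used throughout these snippets (GPyTorch versions that still ship NonLazyTensor).
import torch
from gpytorch.lazy import KroneckerProductLazyTensor, NonLazyTensor

a = torch.tensor([[4.0, 0.0], [0.0, 3.0]])            # small PSD factors
b = torch.tensor([[2.0, 1.0], [1.0, 2.0]])
kp = KroneckerProductLazyTensor(NonLazyTensor(a), NonLazyTensor(b))

rhs = torch.randn(4, 2)
print(kp.matmul(rhs).shape)   # torch.Size([4, 2]) -- matmul without forming the 4x4 product
print(kp.evaluate().shape)    # torch.Size([4, 4]) -- dense form only when explicitly requested
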
def test_inv_quad_many_vectors(self):
    # Forward pass
    flattened_mats = self.mats_clone.view(-1, *self.mats_clone.shape[-2:])
    actual_inv_quad = (
        torch.cat([mat.inverse().unsqueeze(0) for mat in flattened_mats])
        .view(self.mats_clone.shape)
        .matmul(self.vecs_clone)
        .mul(self.vecs_clone)
        .sum(-2)
        .sum(-1)
    )
    with gpytorch.settings.num_trace_samples(2000):
        non_lazy_tsr = NonLazyTensor(self.mats)
        res_inv_quad = non_lazy_tsr.inv_quad(self.vecs)
    self.assertEqual(res_inv_quad.shape, actual_inv_quad.shape)
    self.assertLess(torch.max((res_inv_quad - actual_inv_quad).abs()).item(), 1e-1)

    # Backward
    inv_quad_grad_output = torch.randn(2, 3, dtype=torch.float)
    actual_inv_quad.backward(gradient=inv_quad_grad_output)
    res_inv_quad.backward(gradient=inv_quad_grad_output, retain_graph=True)
    self.assertLess(torch.max((self.mats_clone.grad - self.mats.grad).abs()).item(), 1e-1)
    self.assertLess(torch.max((self.vecs_clone.grad - self.vecs.grad).abs()).item(), 1e-1)
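
# Sketch (not from the original tests): inv_quad computes sum_i v_i^T K^-1 v_i per batch matrix,
# which is exactly what the explicit-inverse reference above builds by hand. Shapes here are
# illustrative, not the test fixture's; assumes the same gpytorch.lazy API.
import torch
from gpytorch.lazy import NonLazyTensor

mats = torch.randn(2, 3, 5, 5)
mats = mats @ mats.transpose(-1, -2) + 5 * torch.eye(5)   # batch of well-conditioned PSD matrices
vecs = torch.randn(2, 3, 5, 4)

res = NonLazyTensor(mats).inv_quad(vecs)                          # shape (2, 3), summed over the 4 vectors
actual = (torch.inverse(mats) @ vecs).mul(vecs).sum(-2).sum(-1)
print((res - actual).abs().max().item())                          # small; the solve may be iterative, so expect ~1e-3, not 0
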
def create_lazy_tensor(self):
    root = torch.randn(3, 6, 7)
    self.psd_mat = root.matmul(root.transpose(-2, -1))
    # Split each 6x6 PSD matrix in the batch into three row blocks and concatenate them back lazily along dim=-2
    slice1_mat = self.psd_mat[..., :2, :].requires_grad_()
    slice2_mat = self.psd_mat[..., 2:4, :].requires_grad_()
    slice3_mat = self.psd_mat[..., 4:6, :].requires_grad_()
    slice1 = NonLazyTensor(slice1_mat)
    slice2 = NonLazyTensor(slice2_mat)
    slice3 = NonLazyTensor(slice3_mat)
    return CatLazyTensor(slice1, slice2, slice3, dim=-2)
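
# Sketch (not from the original tests): CatLazyTensor stitches lazy blocks back together along a
# dimension without copying them into one dense tensor until evaluated. Assumes the same gpytorch.lazy API.
import torch
from gpytorch.lazy import CatLazyTensor, NonLazyTensor

root = torch.randn(6, 7)
psd = root @ root.t()                       # 6x6 PSD matrix

top = NonLazyTensor(psd[:3, :])             # first three rows
bottom = NonLazyTensor(psd[3:, :])          # last three rows
cat = CatLazyTensor(top, bottom, dim=-2)

print(torch.allclose(cat.evaluate(), psd))  # True -- evaluation reproduces the original matrix
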
def test_multivariate_normal_correlated_samples(self, cuda=False):
    device = torch.device("cuda") if cuda else torch.device("cpu")
    for dtype in (torch.float, torch.double):
        mean = torch.tensor([0, 1, 2], device=device, dtype=dtype)
        covmat = torch.diag(torch.tensor([1, 0.75, 1.5], device=device, dtype=dtype))
        mvn = MultivariateNormal(mean=mean, covariance_matrix=NonLazyTensor(covmat))
        base_samples = mvn.get_base_samples(torch.Size([3, 4]))
        self.assertTrue(mvn.sample(base_samples=base_samples).shape == torch.Size([3, 4, 3]))
        base_samples = mvn.get_base_samples()
        self.assertTrue(mvn.sample(base_samples=base_samples).shape == torch.Size([3]))
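
# Sketch (not from the original tests): reusing the same base_samples makes the correlated samples
# reproducible, which is the property the shape assertions above rely on. Assumes
# gpytorch.distributions.MultivariateNormal and gpytorch.lazy.NonLazyTensor as used in these snippets.
import torch
from gpytorch.distributions import MultivariateNormal
from gpytorch.lazy import NonLazyTensor

mean = torch.zeros(3)
covar = NonLazyTensor(torch.diag(torch.tensor([1.0, 0.75, 1.5])))
mvn = MultivariateNormal(mean, covar)

base = mvn.get_base_samples(torch.Size([4]))   # reusable N(0, I) draws
s1 = mvn.sample(base_samples=base)
s2 = mvn.sample(base_samples=base)
print(s1.shape)                                # torch.Size([4, 3])
print(torch.allclose(s1, s2))                  # True -- same base draws, same correlated samples
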
def create_lazy_tensor(self):
    tensor = torch.randn(3, 5, 5)
    tensor = tensor.transpose(-1, -2).matmul(tensor).detach()
    diag = torch.tensor(
        [[1.0, 2.0, 4.0, 2.0, 3.0], [2.0, 1.0, 2.0, 1.0, 4.0], [1.0, 2.0, 2.0, 3.0, 4.0]], requires_grad=True
    )
    # Dense PSD batch plus a per-batch-element diagonal, combined lazily
    return AddedDiagLazyTensor(NonLazyTensor(tensor), DiagLazyTensor(diag))
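
# Sketch (not from the original tests): AddedDiagLazyTensor represents K + D without forming the sum,
# the usual "kernel plus noise" covariance in GP models. Assumes the same gpytorch.lazy API.
import torch
from gpytorch.lazy import AddedDiagLazyTensor, DiagLazyTensor, NonLazyTensor

k = torch.randn(5, 5)
k = k.t() @ k                                   # PSD kernel-like matrix
noise = DiagLazyTensor(0.1 * torch.ones(5))     # homoskedastic diagonal term

covar = AddedDiagLazyTensor(NonLazyTensor(k), noise)
print(torch.allclose(covar.evaluate(), k + 0.1 * torch.eye(5)))  # True
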
def create_lazy_tensor(self):
    blocks = torch.randn(12, 4, 4)
    blocks = blocks.transpose(-1, -2).matmul(blocks)
    blocks.requires_grad_(True)
    # Represents the sum of the 12 PSD blocks along the batch dimension
    return SumBatchLazyTensor(NonLazyTensor(blocks))
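
# Sketch (not from the original tests): SumBatchLazyTensor collapses the batch dimension by summing,
# so the lazy tensor above behaves like a single 4x4 matrix. Assumes the same gpytorch.lazy API.
import torch
from gpytorch.lazy import NonLazyTensor, SumBatchLazyTensor

blocks = torch.randn(12, 4, 4)
blocks = blocks.transpose(-1, -2) @ blocks                    # 12 PSD blocks

summed = SumBatchLazyTensor(NonLazyTensor(blocks))
print(summed.shape)                                           # torch.Size([4, 4])
print(torch.allclose(summed.evaluate(), blocks.sum(dim=0)))   # True
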
def test_root_decomposition(self):
    mat = self._create_mat().detach().requires_grad_(True)
    mat_clone = mat.detach().clone().requires_grad_(True)

    # Forward
    root = NonLazyTensor(mat).root_decomposition().root.evaluate()
    res = root.matmul(root.transpose(-1, -2))
    self.assertAllClose(res, mat)

    # Backward
    sum([sub_mat.trace() for sub_mat in res.view(-1, mat.size(-2), mat.size(-1))]).backward()
    sum([sub_mat.trace() for sub_mat in mat_clone.view(-1, mat.size(-2), mat.size(-1))]).backward()
    self.assertAllClose(mat.grad, mat_clone.grad)
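
# Sketch (not from the original tests): root_decomposition returns a factor R with R @ R.T
# approximately equal to the original matrix, and the factorization is differentiable.
# Assumes the same gpytorch.lazy API; the reconstruction error depends on the decomposition used.
import torch
from gpytorch.lazy import NonLazyTensor

mat = torch.randn(6, 6)
mat = mat @ mat.t() + torch.eye(6)                  # well-conditioned PSD matrix

root = NonLazyTensor(mat).root_decomposition().root.evaluate()
recon = root @ root.t()
print((recon - mat).abs().max().item())             # small -- R @ R.T reconstructs the matrix
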
def test_matmul_multiple_vecs(self):
    # Forward
    res = NonLazyTensor(self.mat).matmul(self.vecs)
    actual = self.mat_copy.matmul(self.vecs_copy)
    self.assertTrue(approx_equal(res, actual))

    # Backward
    grad_output = torch.randn(3, 4)
    res.backward(gradient=grad_output)
    actual.backward(gradient=grad_output)
    self.assertTrue(approx_equal(self.mat_copy.grad, self.mat.grad))
    self.assertTrue(approx_equal(self.vecs_copy.grad, self.vecs.grad))
def create_lazy_tensor(self):
    root = torch.randn(5, 3, 6, 7)
    self.psd_mat = root.matmul(root.transpose(-2, -1))
    # Split along the outermost batch dimension (chunks of 2, 1, and 2) and concatenate them back lazily
    slice1_mat = self.psd_mat[:2, ...].requires_grad_()
    slice2_mat = self.psd_mat[2:3, ...].requires_grad_()
    slice3_mat = self.psd_mat[3:, ...].requires_grad_()
    slice1 = NonLazyTensor(slice1_mat)
    slice2 = NonLazyTensor(slice2_mat)
    slice3 = NonLazyTensor(slice3_mat)
    return CatLazyTensor(slice1, slice2, slice3, dim=0)
if torch.any(zeroish):
    # Can't use an in-place operation here because it would break the backward pass.
    # Instead, add a small jitter to the (near-)zero diagonal entries of R so the triangular solve stays stable.
    jitter_diag = 1e-6 * torch.sign(Rdiag) * zeroish.to(Rdiag)
    R = R + jitter_diag.unsqueeze(-1) * torch.eye(R.size(-1), device=R.device, dtype=R.dtype)
new_covar_cache = torch.triangular_solve(Q.transpose(-2, -1), R)[0].transpose(-2, -1)
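
# Sketch (not from the library): the jitter is added out of place (a new R, not R.diagonal().add_())
# so autograd can still backpropagate through the original tensor. All names below are illustrative.
import torch

R = (torch.randn(4, 4).triu() + 4 * torch.eye(4)).requires_grad_()
Rdiag = torch.diagonal(R, dim1=-2, dim2=-1)
zeroish = Rdiag.abs() < 1e-6                       # flag (near-)zero pivots

# Out-of-place update: builds a new tensor instead of mutating R, keeping the autograd graph intact
jitter_diag = 1e-6 * torch.sign(Rdiag) * zeroish.to(Rdiag)
R_jittered = R + jitter_diag.unsqueeze(-1) * torch.eye(4)

R_jittered.sum().backward()                        # gradients still flow back to the original R
print(R.grad.shape)                                # torch.Size([4, 4])
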
# Expand inputs accordingly if necessary (for fantasies at the same points)
if full_inputs[0].dim() <= full_targets.dim():
    fant_batch_shape = full_targets.shape[:1]
    n_batch = len(full_mean.shape[:-1])
    repeat_shape = fant_batch_shape + torch.Size([1] * n_batch)
    full_inputs = [fi.expand(fant_batch_shape + fi.shape) for fi in full_inputs]
    full_mean = full_mean.expand(fant_batch_shape + full_mean.shape)
    full_covar = BatchRepeatLazyTensor(full_covar, repeat_shape)
    new_root = BatchRepeatLazyTensor(NonLazyTensor(new_root), repeat_shape)
    # no need to repeat the covar cache, broadcasting will do the right thing

# Create new DefaultPredictionStrategy object
fant_strat = self.__class__(
    train_inputs=full_inputs,
    train_prior_dist=self.train_prior_dist.__class__(full_mean, full_covar),
    train_labels=full_targets,
    likelihood=fant_likelihood,
    root=new_root,
    inv_root=new_covar_cache,
)
fant_strat._memoize_cache = {"mean_cache": fant_mean_cache, "covar_cache": new_covar_cache}

return fant_strat