Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_grid_gp_mean_abs_error(self, num_dim=1, cuda=False):
device = torch.device("cuda") if cuda else torch.device("cpu")
grid_bounds = [(0, 1)] if num_dim == 1 else [(0, 1), (0, 2)]
grid_size = 25
grid = torch.zeros(grid_size, len(grid_bounds), device=device)
for i in range(len(grid_bounds)):
grid_diff = float(grid_bounds[i][1] - grid_bounds[i][0]) / (grid_size - 2)
grid[:, i] = torch.linspace(
grid_bounds[i][0] - grid_diff, grid_bounds[i][1] + grid_diff, grid_size, device=device
)
train_x, train_y, test_x, test_y = make_data(grid, cuda=cuda)
likelihood = gpytorch.likelihoods.GaussianLikelihood()
gp_model = GridGPRegressionModel(grid, train_x, train_y, likelihood)
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, gp_model)
if cuda:
gp_model.cuda()
likelihood.cuda()
# Optimize the model
gp_model.train()
likelihood.train()
optimizer = optim.Adam(list(gp_model.parameters()) + list(likelihood.parameters()), lr=0.1)
optimizer.n_iter = 0
with gpytorch.settings.debug(True):
for _ in range(20):
optimizer.zero_grad()
output = gp_model(train_x)
loss = -mll(output, train_y)
def test_multitask_gp_mean_abs_error(self):
likelihood = MultitaskGaussianLikelihood(num_tasks=2)
model = MultitaskGPModel(train_x, train_y, likelihood)
# Find optimal model hyperparameters
model.train()
likelihood.train()
# Use the adam optimizer
optimizer = torch.optim.Adam([{"params": model.parameters()}], lr=0.1) # Includes GaussianLikelihood parameters
# "Loss" for GPs - the marginal log likelihood
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)
n_iter = 50
for _ in range(n_iter):
# Zero prev backpropped gradients
optimizer.zero_grad()
# Make predictions from training data
# Again, note feeding duplicated x_data and indices indicating which task
output = model(train_x)
# TODO: Fix this view call!!
loss = -mll(output, train_y)
loss.backward()
optimizer.step()
# Test the model
model.eval()
likelihood.eval()
def test_kissgp_gp_mean_abs_error_cuda(self):
if not torch.cuda.is_available():
return
with least_used_cuda_device():
train_x, train_y, test_x, test_y = make_data(cuda=True)
likelihood = GaussianLikelihood().cuda()
gp_model = GPRegressionModel(train_x, train_y, likelihood).cuda()
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, gp_model)
# Optimize the model
gp_model.train()
likelihood.train()
optimizer = optim.Adam(list(gp_model.parameters()) + list(likelihood.parameters()), lr=0.1)
optimizer.n_iter = 0
with gpytorch.settings.debug(False):
for _ in range(25):
optimizer.zero_grad()
output = gp_model(train_x)
loss = -mll(output, train_y)
loss.backward()
optimizer.n_iter += 1
optimizer.step()
def test_multitask_gp_mean_abs_error(self):
likelihood = GaussianLikelihood(log_noise_prior=SmoothedBoxPrior(-6, 6))
gp_model = HadamardMultitaskGPModel(
(torch.cat([train_x, train_x]), torch.cat([y1_inds, y2_inds])), torch.cat([train_y1, train_y2]), likelihood
)
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, gp_model)
# Optimize the model
gp_model.train()
likelihood.eval()
optimizer = optim.Adam(gp_model.parameters(), lr=0.01)
for _ in range(100):
optimizer.zero_grad()
output = gp_model(torch.cat([train_x, train_x]), torch.cat([y1_inds, y2_inds]))
loss = -mll(output, torch.cat([train_y1, train_y2]))
loss.backward()
optimizer.step()
for param in gp_model.parameters():
self.assertTrue(param.grad is not None)
self.assertGreater(param.grad.norm().item(), 0)
for param in likelihood.parameters():
def test_sgpr_mean_abs_error_cuda(self):
if not torch.cuda.is_available():
return
with least_used_cuda_device():
train_x, train_y, test_x, test_y = make_data(cuda=True)
likelihood = GaussianLikelihood().cuda()
gp_model = GPRegressionModel(train_x, train_y, likelihood).cuda()
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, gp_model)
# Optimize the model
gp_model.train()
likelihood.train()
optimizer = optim.Adam(list(gp_model.parameters()) + list(likelihood.parameters()), lr=0.1)
optimizer.n_iter = 0
for _ in range(25):
optimizer.zero_grad()
output = gp_model(train_x)
loss = -mll(output, train_y)
loss.backward()
optimizer.n_iter += 1
optimizer.step()
for param in gp_model.parameters():
def get_fitted_model(train_X: Tensor, train_Y: Tensor) -> SingleTaskGP:
"""
Fit SingleTaskGP with torch.optim.Adam.
"""
model = SingleTaskGP(train_X, train_Y)
mll = ExactMarginalLogLikelihood(model.likelihood, model).to(train_X)
fit_gpytorch_model(mll, optimizer=fit_gpytorch_torch, options={"disp": False})
return model
covar_x = self.covar_module(x)
return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)
# initialize likelihood and model
likelihood = gpytorch.likelihoods.GaussianLikelihood()
model = ExactGPModel(train_x, train_y, likelihood)
# Find optimal model hyperparameters
model.train()
likelihood.train()
# Use full-batch L-BFGS optimizer
optimizer = FullBatchLBFGS(model.parameters())
# "Loss" for GPs - the marginal log likelihood
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)
# define closure
def closure():
optimizer.zero_grad()
output = model(train_x)
loss = -mll(output, train_y)
return loss
loss = closure()
loss.backward()
training_iter = 10
for i in range(training_iter):
# perform step and update curvature
options = {'closure': closure, 'current_loss': loss, 'max_ls': 10}