# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_model(n, fixed_noise=False, **tkwargs):
    """Build a two-output ``ModelListGP`` from freshly drawn random data.

    Args:
        n: Number of training points for each sub-model.
        fixed_noise: If True, build ``FixedNoiseGP`` sub-models with a
            randomly drawn per-point observation noise; otherwise build
            ``SingleTaskGP`` sub-models with inferred noise.
        **tkwargs: Tensor keyword args (``device`` / ``dtype``) forwarded to
            data generation and the final ``.to`` call.

    Returns:
        A ``ModelListGP`` containing the two sub-models, moved per ``tkwargs``.
    """
    x1, x2, y1, y2 = _get_random_data(n=n, **tkwargs)
    if fixed_noise:
        # Observation noise drawn uniformly from [0.1, 0.2), one per target.
        yvar1 = 0.1 + 0.1 * torch.rand_like(y1, **tkwargs)
        yvar2 = 0.1 + 0.1 * torch.rand_like(y2, **tkwargs)
        sub1 = FixedNoiseGP(train_X=x1, train_Y=y1, train_Yvar=yvar1)
        sub2 = FixedNoiseGP(train_X=x2, train_Y=y2, train_Yvar=yvar2)
    else:
        sub1 = SingleTaskGP(train_X=x1, train_Y=y1)
        sub2 = SingleTaskGP(train_X=x2, train_Y=y2)
    return ModelListGP(sub1, sub2).to(**tkwargs)
def test_get_extra_mll_args(self):
    """``_get_extra_mll_args`` yields train inputs for known MLLs, [] otherwise."""
    train_X = torch.rand(3, 5)
    train_Y = torch.rand(3, 1)
    model = SingleTaskGP(train_X=train_X, train_Y=train_Y)
    # ExactMarginalLogLikelihood: a flat list holding the train inputs.
    exact_mll = ExactMarginalLogLikelihood(model.likelihood, model)
    flat_args = _get_extra_mll_args(mll=exact_mll)
    self.assertEqual(len(flat_args), 1)
    self.assertTrue(torch.equal(flat_args[0], train_X))
    # SumMarginalLogLikelihood: one nested list per sub-model.
    wrapped = ModelListGP(model)
    sum_mll = SumMarginalLogLikelihood(wrapped.likelihood, wrapped)
    nested_args = _get_extra_mll_args(mll=sum_mll)
    self.assertEqual(len(nested_args), 1)
    self.assertEqual(len(nested_args[0]), 1)
    self.assertTrue(torch.equal(nested_args[0][0], train_X))
    # Any other MarginalLogLikelihood type falls back to an empty list.
    base_mll = MarginalLogLikelihood(model.likelihood, model)
    self.assertEqual(_get_extra_mll_args(mll=base_mll), [])
def test_ModelListGP_single(self):
    """A single-model ``ModelListGP`` still produces a GPyTorch posterior."""
    tkwargs = {"device": self.device, "dtype": torch.float}
    # Only the first (x, y) pair is needed for the single-model case.
    x1, _, y1, _ = _get_random_data(n=10, **tkwargs)
    single = SingleTaskGP(train_X=x1, train_Y=y1)
    model = ModelListGP(single)
    model.to(**tkwargs)
    test_x = torch.tensor([[0.25], [0.75]], **tkwargs)
    posterior = model.posterior(test_x)
    self.assertIsInstance(posterior, GPyTorchPosterior)
    # One output -> a plain (non-multitask) MultivariateNormal.
    self.assertIsInstance(posterior.mvn, MultivariateNormal)
# NOTE(review): fragment of a model_list_to_batched test — the enclosing
# def and the definitions of gp1/gp2/train_X/train_Y1/train_Y2/dtype lie
# outside this view, and indentation has been flattened; verify against
# the full file before editing.
# A sub-model whose noise prior hyperparameter disagrees with the other's
# cannot be merged into one batched model.
gp2.likelihood.noise_covar.noise_prior.rate.fill_(1.0)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check tensor shape agreement: give one sub-model an outputscale with an
# extra dimension so the parameter shapes no longer match.
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.covar_module.raw_outputscale = torch.nn.Parameter(
torch.tensor([0.0], device=self.device, dtype=dtype)
)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test HeteroskedasticSingleTaskGP: conversion is not implemented.
gp2 = HeteroskedasticSingleTaskGP(
train_X, train_Y1, torch.ones_like(train_Y1)
)
with self.assertRaises(NotImplementedError):
model_list_to_batched(ModelListGP(gp2))
# test custom likelihood: a non-default likelihood is also unsupported.
gp2 = SingleTaskGP(train_X, train_Y2, likelihood=GaussianLikelihood())
with self.assertRaises(NotImplementedError):
model_list_to_batched(ModelListGP(gp2))
# test FixedNoiseGP: two compatible fixed-noise models should convert.
train_X = torch.rand(10, 2, device=self.device, dtype=dtype)
train_Y1 = train_X.sum(dim=-1, keepdim=True)
train_Y2 = (train_X[:, 0] - train_X[:, 1]).unsqueeze(-1)
gp1_ = FixedNoiseGP(train_X, train_Y1, torch.rand_like(train_Y1))
gp2_ = FixedNoiseGP(train_X, train_Y2, torch.rand_like(train_Y2))
list_gp = ModelListGP(gp1_, gp2_)
batch_gp = model_list_to_batched(list_gp)
# NOTE(review): fragment — the opening of the `model1 = FixedNoiseGP(`
# call (and the definitions of train_x1/train_y1/train_y1_var/octfs and
# the surrounding `if fixed_noise:`) lie before this view; indentation
# has been flattened. Each sub-model gets its own outcome transform.
outcome_transform=octfs[0],
)
model2 = FixedNoiseGP(
train_X=train_x2,
train_Y=train_y2,
train_Yvar=train_y2_var,
outcome_transform=octfs[1],
)
else:
# Inferred-noise variant: SingleTaskGP with the same per-model transforms.
model1 = SingleTaskGP(
train_X=train_x1, train_Y=train_y1, outcome_transform=octfs[0]
)
model2 = SingleTaskGP(
train_X=train_x2, train_Y=train_y2, outcome_transform=octfs[1]
)
# Bundle both outputs and move to the requested device/dtype.
model = ModelListGP(model1, model2)
return model.to(**tkwargs)
def initialize_model(train_x, train_obj, train_con, state_dict=None):
    """Create a two-output GP (objective + constraint) and its summed MLL.

    NOTE(review): relies on a module-level ``train_yvar`` tensor for the
    fixed observation noise — confirm it is defined where this is used.

    Args:
        train_x: Training inputs.
        train_obj: Objective observations.
        train_con: Constraint observations.
        state_dict: Optional state dict used to warm-start the model.

    Returns:
        A ``(mll, model)`` tuple: the ``SumMarginalLogLikelihood`` and the
        ``ModelListGP`` over the objective and constraint models.
    """
    # One fixed-noise GP per outcome, moved to train_x's device/dtype.
    obj_gp = FixedNoiseGP(
        train_x, train_obj, train_yvar.expand_as(train_obj)
    ).to(train_x)
    con_gp = FixedNoiseGP(
        train_x, train_con, train_yvar.expand_as(train_con)
    ).to(train_x)
    model = ModelListGP(obj_gp, con_gp)
    mll = SumMarginalLogLikelihood(model.likelihood, model)
    if state_dict is not None:
        # Warm-start from a previously fitted model.
        model.load_state_dict(state_dict)
    return mll, model
def test_batched_to_model_list(self):
    """``batched_to_model_list`` splits supported batched GPs into lists."""
    for dtype in (torch.float, torch.double):
        # Two-output training data over a shared 2-d input set.
        train_X = torch.rand(10, 2, device=self.device, dtype=dtype)
        y_sum = train_X.sum(dim=-1)
        y_diff = train_X[:, 0] - train_X[:, 1]
        train_Y = torch.stack([y_sum, y_diff], dim=-1)
        # test SingleTaskGP: converts cleanly.
        plain = SingleTaskGP(train_X, train_Y)
        self.assertIsInstance(batched_to_model_list(plain), ModelListGP)
        # test FixedNoiseGP: also converts cleanly.
        noisy = FixedNoiseGP(train_X, train_Y, torch.rand_like(train_Y))
        self.assertIsInstance(batched_to_model_list(noisy), ModelListGP)
        # test HeteroskedasticSingleTaskGP: conversion is not implemented.
        hetero = HeteroskedasticSingleTaskGP(
            train_X, train_Y, torch.rand_like(train_Y)
        )
        with self.assertRaises(NotImplementedError):
            batched_to_model_list(hetero)
# NOTE(review): fragment of a model_list_to_batched test — the enclosing
# def and the definitions of gp1/train_X/train_Y1/train_Y2 lie outside
# this view, and indentation has been flattened; verify against the full
# file before editing.
# Two compatible SingleTaskGPs should merge into one batched SingleTaskGP.
gp2 = SingleTaskGP(train_X, train_Y2)
list_gp = ModelListGP(gp1, gp2)
batch_gp = model_list_to_batched(list_gp)
self.assertIsInstance(batch_gp, SingleTaskGP)
# test degenerate (single model): still converts, with one output.
batch_gp = model_list_to_batched(ModelListGP(gp1))
self.assertEqual(batch_gp._num_outputs, 1)
# test different model classes: mixing classes is rejected.
gp2 = FixedNoiseGP(train_X, train_Y1, torch.ones_like(train_Y1))
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test non-batched models: models without batch-shape support are rejected.
gp1_ = SimpleGPyTorchModel(train_X, train_Y1)
gp2_ = SimpleGPyTorchModel(train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1_, gp2_))
# test list of multi-output models: sub-models must be single-output.
train_Y = torch.cat([train_Y1, train_Y2], dim=-1)
gp2 = SingleTaskGP(train_X, train_Y)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test different training inputs: all sub-models must share train_X.
gp2 = SingleTaskGP(2 * train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check scalar agreement: mismatched prior hyperparameters are rejected.
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.likelihood.noise_covar.noise_prior.rate.fill_(1.0)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check tensor shape agreement (continues past this view).
gp2 = SingleTaskGP(train_X, train_Y2)