Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _getBatchedModel(self, kind="SingleTaskGP", double=False):
dtype = torch.double if double else torch.float
train_x = torch.linspace(0, 1, 10, device=self.device, dtype=dtype).unsqueeze(
-1
)
noise = torch.tensor(NOISE, device=self.device, dtype=dtype)
train_y1 = torch.sin(train_x * (2 * math.pi)) + noise
train_y2 = torch.sin(train_x * (2 * math.pi)) + noise
train_y = torch.cat([train_y1, train_y2], dim=-1)
if kind == "SingleTaskGP":
model = SingleTaskGP(train_x, train_y)
elif kind == "FixedNoiseGP":
model = FixedNoiseGP(train_x, train_y, 0.1 * torch.ones_like(train_y))
elif kind == "HeteroskedasticSingleTaskGP":
model = HeteroskedasticSingleTaskGP(
train_x, train_y, 0.1 * torch.ones_like(train_y)
)
else:
raise NotImplementedError
mll = ExactMarginalLogLikelihood(model.likelihood, model)
return mll.to(device=self.device, dtype=dtype)
def _setUp(self, double=False):
dtype = torch.double if double else torch.float
train_x = torch.linspace(0, 1, 10, device=self.device, dtype=dtype).unsqueeze(
-1
)
train_y = torch.sin(train_x * (2 * math.pi))
train_yvar = torch.tensor(0.1 ** 2, device=self.device)
noise = torch.tensor(NOISE, device=self.device, dtype=dtype)
self.train_x = train_x
self.train_y = train_y + noise
self.train_yvar = train_yvar
self.bounds = torch.tensor([[0.0], [1.0]], device=self.device, dtype=dtype)
model_st = SingleTaskGP(self.train_x, self.train_y)
self.model_st = model_st.to(device=self.device, dtype=dtype)
self.mll_st = ExactMarginalLogLikelihood(
self.model_st.likelihood, self.model_st
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
self.mll_st = fit_gpytorch_model(
self.mll_st, options={"maxiter": 5}, max_retries=1
)
model_fn = FixedNoiseGP(
self.train_x, self.train_y, self.train_yvar.expand_as(self.train_y)
)
self.model_fn = model_fn.to(device=self.device, dtype=dtype)
self.mll_fn = ExactMarginalLogLikelihood(
self.model_fn.likelihood, self.model_fn
)
def get_fitted_model(train_X: Tensor, train_Y: Tensor) -> SingleTaskGP:
"""
Fit SingleTaskGP with torch.optim.Adam.
"""
model = SingleTaskGP(train_X, train_Y)
mll = ExactMarginalLogLikelihood(model.likelihood, model).to(train_X)
fit_gpytorch_model(mll, optimizer=fit_gpytorch_torch, options={"disp": False})
return model
def test_model_list_to_batched(self):
for dtype in (torch.float, torch.double):
# basic test
train_X = torch.rand(10, 2, device=self.device, dtype=dtype)
train_Y1 = train_X.sum(dim=-1, keepdim=True)
train_Y2 = (train_X[:, 0] - train_X[:, 1]).unsqueeze(-1)
gp1 = SingleTaskGP(train_X, train_Y1)
gp2 = SingleTaskGP(train_X, train_Y2)
list_gp = ModelListGP(gp1, gp2)
batch_gp = model_list_to_batched(list_gp)
self.assertIsInstance(batch_gp, SingleTaskGP)
# test degenerate (single model)
batch_gp = model_list_to_batched(ModelListGP(gp1))
self.assertEqual(batch_gp._num_outputs, 1)
# test different model classes
gp2 = FixedNoiseGP(train_X, train_Y1, torch.ones_like(train_Y1))
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test non-batched models
gp1_ = SimpleGPyTorchModel(train_X, train_Y1)
gp2_ = SimpleGPyTorchModel(train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1_, gp2_))
# test different model classes
gp2 = FixedNoiseGP(train_X, train_Y1, torch.ones_like(train_Y1))
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test non-batched models
gp1_ = SimpleGPyTorchModel(train_X, train_Y1)
gp2_ = SimpleGPyTorchModel(train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1_, gp2_))
# test list of multi-output models
train_Y = torch.cat([train_Y1, train_Y2], dim=-1)
gp2 = SingleTaskGP(train_X, train_Y)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test different training inputs
gp2 = SingleTaskGP(2 * train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check scalar agreement
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.likelihood.noise_covar.noise_prior.rate.fill_(1.0)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check tensor shape agreement
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.covar_module.raw_outputscale = torch.nn.Parameter(
torch.tensor([0.0], device=self.device, dtype=dtype)
)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test HeteroskedasticSingleTaskGP
gp2 = HeteroskedasticSingleTaskGP(
def test_get_extra_mll_args(self):
train_X = torch.rand(3, 5)
train_Y = torch.rand(3, 1)
model = SingleTaskGP(train_X=train_X, train_Y=train_Y)
# test ExactMarginalLogLikelihood
exact_mll = ExactMarginalLogLikelihood(model.likelihood, model)
exact_extra_args = _get_extra_mll_args(mll=exact_mll)
self.assertEqual(len(exact_extra_args), 1)
self.assertTrue(torch.equal(exact_extra_args[0], train_X))
# test SumMarginalLogLikelihood
model2 = ModelListGP(model)
sum_mll = SumMarginalLogLikelihood(model2.likelihood, model2)
sum_mll_extra_args = _get_extra_mll_args(mll=sum_mll)
self.assertEqual(len(sum_mll_extra_args), 1)
self.assertEqual(len(sum_mll_extra_args[0]), 1)
self.assertTrue(torch.equal(sum_mll_extra_args[0][0], train_X))
# test unsupported MarginalLogLikelihood type
# test non-batched models
gp1_ = SimpleGPyTorchModel(train_X, train_Y1)
gp2_ = SimpleGPyTorchModel(train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1_, gp2_))
# test list of multi-output models
train_Y = torch.cat([train_Y1, train_Y2], dim=-1)
gp2 = SingleTaskGP(train_X, train_Y)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test different training inputs
gp2 = SingleTaskGP(2 * train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check scalar agreement
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.likelihood.noise_covar.noise_prior.rate.fill_(1.0)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check tensor shape agreement
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.covar_module.raw_outputscale = torch.nn.Parameter(
torch.tensor([0.0], device=self.device, dtype=dtype)
)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test HeteroskedasticSingleTaskGP
gp2 = HeteroskedasticSingleTaskGP(
train_X, train_Y1, torch.ones_like(train_Y1)
)
with self.assertRaises(NotImplementedError):
model_list_to_batched(ModelListGP(gp2))
from botorch.test_functions import Branin
from botorch.fit import fit_gpytorch_model
from botorch.models import SingleTaskGP
from botorch.utils.transforms import standardize, normalize
from gpytorch.mlls import ExactMarginalLogLikelihood
torch.manual_seed(7)
bounds = torch.tensor(Branin._bounds).T
train_X = bounds[0] + (bounds[1] - bounds[0]) * torch.rand(10, 2)
train_Y = Branin(negate=True)(train_X).unsqueeze(-1)
train_X = normalize(train_X, bounds=bounds)
train_Y = standardize(train_Y + 0.05 * torch.randn_like(train_Y))
model = SingleTaskGP(train_X, train_Y)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_model(mll);
# ### 3. Defining the MES acquisition function
#
# The `qMaxValueEntropy` acquisition function is a subclass of `MCAcquisitionFunction` and supports pending points `X_pending`. Required arguments for the constructor are `model` and `candidate_set` (the discretized candidate points in the design space that will be used to draw max value samples). There are also other optional parameters, such as number of max value samples $\mathcal{F^*}$, number of $\mathcal{Y}$ samples and number of fantasies (in case of $q>1$). Two different sampling algorithms are supported for the max value samples: the discretized Thompson sampling and the Gumbel sampling introduced in [2]. Gumbel sampling is the default choice in the acquisition function.
# In[2]:
from botorch.acquisition.max_value_entropy_search import qMaxValueEntropy
candidate_set = torch.rand(1000, bounds.size(1), device=bounds.device, dtype=bounds.dtype)
candidate_set = bounds[0] + (bounds[1] - bounds[0]) * candidate_set
qMES = qMaxValueEntropy(model, candidate_set)
def initialize_model(n=5):
# generate training data
train_x = (bounds[1] - bounds[0]) * torch.rand(n, 20, device=device, dtype=dtype) + bounds[0]
train_obj = score_image_recognition(decode(train_x))
best_observed_value = train_obj.max().item()
# define models for objective and constraint
model = SingleTaskGP(train_X=train_x, train_Y=train_obj)
model = model.to(train_x)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
mll = mll.to(train_x)
return train_x, train_obj, mll, model, best_observed_value