Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_fit_model_and_get_best_point(
self, mock_get_fitted_model, mock_greedy, cuda=False
):
objective = IdentityMCObjective()
device = torch.device("cuda") if cuda else torch.device("cpu")
for dtype in (torch.float, torch.double):
train_X = torch.rand((5, 2), dtype=dtype, device=device)
train_Y = torch.rand(5, dtype=dtype, device=device)
train_Yvar = (torch.rand(5, dtype=dtype, device=device)) ** 2
model = SingleTaskGP(train_X=train_X, train_Y=train_Y)
mock_get_fitted_model.return_value = model
exp_best_point = train_X[0]
exp_obj = torch.tensor(2.0, dtype=dtype, device=device)
exp_feas = torch.tensor(1.0, dtype=dtype, device=device)
mock_greedy.return_value = (exp_best_point, exp_obj, exp_feas)
model_fit_options = {"maxiter": 1}
model_and_best_point_output = _fit_model_and_get_best_point(
train_X=train_X,
train_Y=train_Y,
train_Yvar=train_Yvar,
model=model,
max_retries=0,
model_fit_options={"maxiter": 1},
warm_start=False,
verbose=False,
objective=objective,
train_X=train_x1,
train_Y=train_y1,
train_Yvar=train_y1_var,
outcome_transform=octfs[0],
)
model2 = FixedNoiseGP(
train_X=train_x2,
train_Y=train_y2,
train_Yvar=train_y2_var,
outcome_transform=octfs[1],
)
else:
model1 = SingleTaskGP(
train_X=train_x1, train_Y=train_y1, outcome_transform=octfs[0]
)
model2 = SingleTaskGP(
train_X=train_x2, train_Y=train_y2, outcome_transform=octfs[1]
)
model = ModelListGP(model1, model2)
return model.to(**tkwargs)
def _get_model(n, fixed_noise=False, **tkwargs):
    """Build a two-member ``ModelListGP`` on randomly generated training data.

    Args:
        n: Forwarded to ``_get_random_data`` as its ``n`` argument.
        fixed_noise: When true, both sub-models are ``FixedNoiseGP`` instances
            given a random observation variance of ``0.1 + 0.1 * U[0, 1)``;
            otherwise both are ``SingleTaskGP`` instances.
        **tkwargs: Tensor options forwarded to the data generator, to
            ``torch.rand_like`` and to the final ``.to`` call.

    Returns:
        The result of calling ``.to(**tkwargs)`` on the assembled model list.
    """
    train_x1, train_x2, train_y1, train_y2 = _get_random_data(n=n, **tkwargs)

    if not fixed_noise:
        first_gp = SingleTaskGP(train_X=train_x1, train_Y=train_y1)
        second_gp = SingleTaskGP(train_X=train_x2, train_Y=train_y2)
    else:
        # Draw both variance tensors before constructing either model so the
        # sequence of random draws stays y1-variance, then y2-variance.
        first_yvar = 0.1 + 0.1 * torch.rand_like(train_y1, **tkwargs)
        second_yvar = 0.1 + 0.1 * torch.rand_like(train_y2, **tkwargs)
        first_gp = FixedNoiseGP(
            train_X=train_x1, train_Y=train_y1, train_Yvar=first_yvar
        )
        second_gp = FixedNoiseGP(
            train_X=train_x2, train_Y=train_y2, train_Yvar=second_yvar
        )

    return ModelListGP(first_gp, second_gp).to(**tkwargs)
def test_ModelListGP_single(self):
    """A ``ModelListGP`` wrapping one model should return a GPyTorch posterior."""
    tensor_opts = {"device": self.device, "dtype": torch.float}
    data_opts = {"batch_shape": torch.Size(), "num_outputs": 1}

    first_X, first_Y = _get_random_data(n=10, **data_opts, **tensor_opts)
    # The second data set is generated but not used by any assertion below;
    # the call is kept so the sequence of random draws is unchanged.
    _second_X, _second_Y = _get_random_data(n=11, **data_opts, **tensor_opts)

    model_list = ModelListGP(SingleTaskGP(train_X=first_X, train_Y=first_Y))
    model_list.to(**tensor_opts)

    query_points = torch.tensor([[0.25], [0.75]], **tensor_opts)
    result = model_list.posterior(query_points)

    self.assertIsInstance(result, GPyTorchPosterior)
    self.assertIsInstance(result.mvn, MultivariateNormal)
def _get_model_and_data(self, batch_shape, m, outcome_transform=None, **tkwargs):
    """Create a ``SingleTaskGP`` on random data and return it with its kwargs.

    Args:
        batch_shape: Forwarded to ``_get_random_data`` as ``batch_shape``.
        m: Forwarded to ``_get_random_data`` as ``num_outputs``.
        outcome_transform: Optional transform; only added to the constructor
            keyword arguments when it is not ``None``.
        **tkwargs: Extra keyword arguments forwarded to ``_get_random_data``.

    Returns:
        A ``(model, model_kwargs)`` tuple, where ``model_kwargs`` is the exact
        dict the model was constructed from.
    """
    inputs, targets = _get_random_data(
        batch_shape=batch_shape, num_outputs=m, **tkwargs
    )
    # Leave the key out entirely when no transform is given so the model's
    # own default handling applies.
    transform_kwargs = (
        {} if outcome_transform is None else {"outcome_transform": outcome_transform}
    )
    model_kwargs = {"train_X": inputs, "train_Y": targets, **transform_kwargs}
    return SingleTaskGP(**model_kwargs), model_kwargs
train_y1_var = 0.1 + 0.1 * torch.rand_like(train_y1, **tkwargs)
train_y2_var = 0.1 + 0.1 * torch.rand_like(train_y2, **tkwargs)
model1 = FixedNoiseGP(
train_X=train_x1,
train_Y=train_y1,
train_Yvar=train_y1_var,
outcome_transform=octfs[0],
)
model2 = FixedNoiseGP(
train_X=train_x2,
train_Y=train_y2,
train_Yvar=train_y2_var,
outcome_transform=octfs[1],
)
else:
model1 = SingleTaskGP(
train_X=train_x1, train_Y=train_y1, outcome_transform=octfs[0]
)
model2 = SingleTaskGP(
train_X=train_x2, train_Y=train_y2, outcome_transform=octfs[1]
)
model = ModelListGP(model1, model2)
return model.to(**tkwargs)
def test_ModelListGP_single(self):
    """A ``ModelListGP`` wrapping one model should return a GPyTorch posterior."""
    tensor_opts = {"device": self.device, "dtype": torch.float}

    # The generator returns four tensors; only the first input/target pair
    # is used to build the single sub-model.
    first_X, _second_X, first_Y, _second_Y = _get_random_data(n=10, **tensor_opts)

    model_list = ModelListGP(SingleTaskGP(train_X=first_X, train_Y=first_Y))
    model_list.to(**tensor_opts)

    query_points = torch.tensor([[0.25], [0.75]], **tensor_opts)
    result = model_list.posterior(query_points)

    self.assertIsInstance(result, GPyTorchPosterior)
    self.assertIsInstance(result.mvn, MultivariateNormal)
def test_get_fitted_model(self, mock_fit_model, cuda=False):
    """Check that ``_get_fitted_model`` returns a model of the expected type.

    For each of ``torch.float`` and ``torch.double`` the test refits two
    initial models on fresh random data:

    * a ``SingleTaskGP`` (no observation variance passed), which must come
      back as a ``SingleTaskGP`` with a ``GaussianLikelihood`` and with the
      new training inputs/targets attached;
    * a ``HeteroskedasticSingleTaskGP`` (observation variance passed), which
      must come back as a ``HeteroskedasticSingleTaskGP`` whose likelihood is
      a ``_GaussianLikelihoodBase``.

    Args:
        mock_fit_model: Not used in the body; presumably injected by a patch
            decorator that is outside this view -- TODO confirm.
        cuda: Run on the CUDA device when true, otherwise on the CPU.
    """
    device = torch.device("cuda") if cuda else torch.device("cpu")
    for dtype in (torch.float, torch.double):
        # Data the initial models are constructed with.
        init_X = torch.rand((5, 2), dtype=dtype, device=device)
        init_Y = torch.rand(5, dtype=dtype, device=device)
        init_Yvar = (torch.rand(5, dtype=dtype, device=device)) ** 2
        initial_model = SingleTaskGP(train_X=init_X, train_Y=init_Y)

        # Fresh data handed to _get_fitted_model for both cases below.
        train_X = torch.rand((5, 2), dtype=dtype, device=device)
        train_Y = torch.rand(5, dtype=dtype, device=device)
        train_Yvar = (torch.rand(5, dtype=dtype, device=device)) ** 2

        # Case 1: no observation variance supplied.
        model = _get_fitted_model(
            train_X=train_X,
            train_Y=train_Y,
            train_Yvar=None,
            model=initial_model,
            options={"maxiter": 1},
            warm_start=False,
        )
        self.assertIsInstance(model, SingleTaskGP)
        self.assertIsInstance(model.likelihood, GaussianLikelihood)
        self.assertTrue(torch.equal(model.train_inputs[0], train_X))
        self.assertTrue(torch.equal(model.train_targets, train_Y))

        # Case 2: observation variance supplied.
        initial_model2 = HeteroskedasticSingleTaskGP(
            train_X=init_X, train_Y=init_Y, train_Yvar=init_Yvar
        )
        model2 = _get_fitted_model(
            train_X=train_X,
            train_Y=train_Y,
            train_Yvar=train_Yvar,
            model=initial_model2,
            options={"maxiter": 1},
            warm_start=False,
        )
        self.assertIsInstance(model2, HeteroskedasticSingleTaskGP)
        self.assertIsInstance(model2.likelihood, _GaussianLikelihoodBase)
>>> train_Yvar = 0.1 + se * torch.rand_like(train_Y)
>>> model = HeteroskedasticSingleTaskGP(train_X, train_Y, train_Yvar)
"""
if outcome_transform is not None:
train_Y, train_Yvar = outcome_transform(train_Y, train_Yvar)
validate_input_scaling(train_X=train_X, train_Y=train_Y, train_Yvar=train_Yvar)
self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)
self._set_dimensions(train_X=train_X, train_Y=train_Y)
noise_likelihood = GaussianLikelihood(
noise_prior=SmoothedBoxPrior(-3, 5, 0.5, transform=torch.log),
batch_shape=self._aug_batch_shape,
noise_constraint=GreaterThan(
MIN_INFERRED_NOISE_LEVEL, transform=None, initial_value=1.0
),
)
noise_model = SingleTaskGP(
train_X=train_X,
train_Y=train_Yvar,
likelihood=noise_likelihood,
outcome_transform=Log(),
)
likelihood = _GaussianLikelihoodBase(HeteroskedasticNoise(noise_model))
super().__init__(train_X=train_X, train_Y=train_Y, likelihood=likelihood)
self.register_added_loss_term("noise_added_loss")
self.update_added_loss_term(
"noise_added_loss", NoiseModelAddedLossTerm(noise_model)
)
if outcome_transform is not None:
self.outcome_transform = outcome_transform
self.to(train_X)