Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.assertIsInstance(model.likelihood, GaussianLikelihood)
self.assertIsInstance(model.mean_module, ConstantMean)
self.assertIsInstance(model.covar_module, ScaleKernel)
matern_kernel = model.covar_module.base_kernel
self.assertIsInstance(matern_kernel, MaternKernel)
self.assertIsInstance(matern_kernel.lengthscale_prior, GammaPrior)
self.assertIsInstance(model.task_covar_module, IndexKernel)
self.assertEqual(model._rank, 2)
self.assertEqual(
model.task_covar_module.covar_factor.shape[-1], model._rank
)
# test model fitting
mll = ExactMarginalLogLikelihood(model.likelihood, model)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
mll = fit_gpytorch_model(mll, options={"maxiter": 1}, max_retries=1)
# test posterior
test_x = torch.rand(2, 1, **tkwargs)
posterior_f = model.posterior(test_x)
self.assertIsInstance(posterior_f, GPyTorchPosterior)
self.assertIsInstance(posterior_f.mvn, MultivariateNormal)
# test posterior (batch eval)
test_x = torch.rand(3, 2, 1, **tkwargs)
posterior_f = model.posterior(test_x)
self.assertIsInstance(posterior_f, GPyTorchPosterior)
self.assertIsInstance(posterior_f.mvn, MultivariateNormal)
train_y = torch.sin(train_x * (2 * math.pi))
noise = torch.tensor(NOISE, device=self.device, dtype=dtype)
self.train_x = train_x
self.train_y = train_y + noise
if expand:
self.train_x = self.train_x.expand(-1, 2)
ics = torch.tensor([[0.5, 1.0]], device=self.device, dtype=dtype)
else:
ics = torch.tensor([[0.5]], device=self.device, dtype=dtype)
self.initial_conditions = ics
self.f_best = self.train_y.max().item()
model = SingleTaskGP(self.train_x, self.train_y)
self.model = model.to(device=self.device, dtype=dtype)
self.mll = ExactMarginalLogLikelihood(self.model.likelihood, self.model)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
self.mll = fit_gpytorch_model(
self.mll, options={"maxiter": 1}, max_retries=1
)
[n, n - 1, train_X.shape[-1]]
)
expected_shape_test_X = batch_shape + torch.Size([n, 1, train_X.shape[-1]])
self.assertEqual(noiseless_cv_folds.train_X.shape, expected_shape_train_X)
self.assertEqual(noiseless_cv_folds.test_X.shape, expected_shape_test_X)
expected_shape_train_Y = batch_shape + torch.Size([n, n - 1, num_outputs])
expected_shape_test_Y = batch_shape + torch.Size([n, 1, num_outputs])
self.assertEqual(noiseless_cv_folds.train_Y.shape, expected_shape_train_Y)
self.assertEqual(noiseless_cv_folds.test_Y.shape, expected_shape_test_Y)
self.assertIsNone(noiseless_cv_folds.train_Yvar)
self.assertIsNone(noiseless_cv_folds.test_Yvar)
# Test SingleTaskGP
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
cv_results = batch_cross_validation(
model_cls=SingleTaskGP,
mll_cls=ExactMarginalLogLikelihood,
cv_folds=noiseless_cv_folds,
fit_args={"options": {"maxiter": 1}},
)
expected_shape = batch_shape + torch.Size([n, 1, num_outputs])
self.assertEqual(cv_results.posterior.mean.shape, expected_shape)
self.assertEqual(cv_results.observed_Y.shape, expected_shape)
# Test FixedNoiseGP
noisy_cv_folds = gen_loo_cv_folds(
train_X=train_X, train_Y=train_Y, train_Yvar=train_Yvar
)
# check shapes
self.assertEqual(noisy_cv_folds.train_X.shape, expected_shape_train_X)
self.model_st.likelihood, self.model_st
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
self.mll_st = fit_gpytorch_model(
self.mll_st, options={"maxiter": 5}, max_retries=1
)
model_fn = FixedNoiseGP(
self.train_x, self.train_y, self.train_yvar.expand_as(self.train_y)
)
self.model_fn = model_fn.to(device=self.device, dtype=dtype)
self.mll_fn = ExactMarginalLogLikelihood(
self.model_fn.likelihood, self.model_fn
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
self.mll_fn = fit_gpytorch_model(
self.mll_fn, options={"maxiter": 5}, max_retries=1
)
def test_botorch_warnings(self):
    """Check that each listed warning class can be raised and recorded.

    For every class in the tuple below, a warning with the text
    ``"message"`` is issued inside ``warnings.catch_warnings(record=True)``
    combined with ``settings.debug(True)``. The test then asserts that
    exactly one warning was recorded, that its category is a subclass of
    the class under test, and that its text contains ``"message"``.
    """
    warning_classes = (
        BotorchTensorDimensionWarning,
        BotorchWarning,
        BadInitialCandidatesWarning,
        CostAwareWarning,
        InputDataWarning,
        OptimizationWarning,
        SamplingWarning,
    )
    for WarningClass in warning_classes:
        # subTest tags a failure with the class being checked and lets the
        # remaining classes run instead of aborting on the first failure.
        with self.subTest(warning_class=WarningClass.__name__):
            # NOTE(review): settings.debug(True) presumably keeps these
            # warnings from being filtered out - confirm against settings.
            with warnings.catch_warnings(record=True) as ws, settings.debug(True):
                warnings.warn("message", WarningClass)
                self.assertEqual(len(ws), 1)
                self.assertTrue(issubclass(ws[-1].category, WarningClass))
                # assertIn reports both operands on failure, unlike
                # assertTrue(x in y), which only says "False is not true".
                self.assertIn("message", str(ws[-1].message))
def test_gp(self):
for batch_shape, m, dtype, use_octf in itertools.product(
(torch.Size(), torch.Size([2])),
(1, 2),
(torch.float, torch.double),
(False, True),
):
tkwargs = {"device": self.device, "dtype": dtype}
octf = Standardize(m=m, batch_shape=batch_shape) if use_octf else None
model, _ = self._get_model_and_data(
batch_shape=batch_shape, m=m, outcome_transform=octf, **tkwargs
)
mll = ExactMarginalLogLikelihood(model.likelihood, model).to(**tkwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
fit_gpytorch_model(mll, options={"maxiter": 1}, max_retries=1)
# test init
self.assertIsInstance(model.mean_module, ConstantMean)
self.assertIsInstance(model.covar_module, ScaleKernel)
matern_kernel = model.covar_module.base_kernel
self.assertIsInstance(matern_kernel, MaternKernel)
self.assertIsInstance(matern_kernel.lengthscale_prior, GammaPrior)
if use_octf:
self.assertIsInstance(model.outcome_transform, Standardize)
# test param sizes
params = dict(model.named_parameters())
for p in params:
self.assertEqual(
params[p].numel(), m * torch.tensor(batch_shape).prod().item()
self.assertEqual(cv_results.posterior.mean.shape, expected_shape)
self.assertEqual(cv_results.observed_Y.shape, expected_shape)
# Test FixedNoiseGP
noisy_cv_folds = gen_loo_cv_folds(
train_X=train_X, train_Y=train_Y, train_Yvar=train_Yvar
)
# check shapes
self.assertEqual(noisy_cv_folds.train_X.shape, expected_shape_train_X)
self.assertEqual(noisy_cv_folds.test_X.shape, expected_shape_test_X)
self.assertEqual(noisy_cv_folds.train_Y.shape, expected_shape_train_Y)
self.assertEqual(noisy_cv_folds.test_Y.shape, expected_shape_test_Y)
self.assertEqual(noisy_cv_folds.train_Yvar.shape, expected_shape_train_Y)
self.assertEqual(noisy_cv_folds.test_Yvar.shape, expected_shape_test_Y)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=OptimizationWarning)
cv_results = batch_cross_validation(
model_cls=FixedNoiseGP,
mll_cls=ExactMarginalLogLikelihood,
cv_folds=noisy_cv_folds,
fit_args={"options": {"maxiter": 1}},
)
self.assertEqual(cv_results.posterior.mean.shape, expected_shape)
self.assertEqual(cv_results.observed_Y.shape, expected_shape)
self.assertEqual(cv_results.observed_Y.shape, expected_shape)
iterations.append(OptimizationIteration(i, obj, ts[i]))
# Construct info dict
info_dict = {
"fopt": float(res.fun),
"wall_time": time.time() - t1,
"iterations": iterations,
}
if not res.success:
try:
# Some res.message are bytes
msg = res.message.decode("ascii")
except AttributeError:
# Others are str
msg = res.message
warnings.warn(
f"Fitting failed with the optimizer reporting '{msg}'", OptimizationWarning
)
# Set to optimum
mll = module_from_array_func(mll, res.x, property_dict)
return mll, info_dict
mll.train()
original_state_dict = deepcopy(mll.model.state_dict())
retry = 0
while retry < max_retries:
with warnings.catch_warnings(record=True) as ws:
if retry > 0: # use normal initial conditions on first try
mll.model.load_state_dict(original_state_dict)
sample_all_priors(mll.model)
mll, _ = optimizer(mll, track_iterations=False, **kwargs)
if not any(issubclass(w.category, OptimizationWarning) for w in ws):
mll.eval()
return mll
retry += 1
logging.log(logging.DEBUG, f"Fitting failed on try {retry}.")
warnings.warn("Fitting failed on all retries.", OptimizationWarning)
return mll.eval()
except (NotImplementedError, UnsupportedError, RuntimeError, AttributeError):
warnings.warn(FAILED_CONVERSION_MSG, BotorchWarning)
return fit_gpytorch_model(
mll=mll, optimizer=optimizer, sequential=False, max_retries=max_retries
)
# retry with random samples from the priors upon failure
mll.train()
original_state_dict = deepcopy(mll.model.state_dict())
retry = 0
while retry < max_retries:
with warnings.catch_warnings(record=True) as ws:
if retry > 0: # use normal initial conditions on first try
mll.model.load_state_dict(original_state_dict)
sample_all_priors(mll.model)
mll, _ = optimizer(mll, track_iterations=False, **kwargs)
if not any(issubclass(w.category, OptimizationWarning) for w in ws):
mll.eval()
return mll
retry += 1
logging.log(logging.DEBUG, f"Fitting failed on try {retry}.")
warnings.warn("Fitting failed on all retries.", OptimizationWarning)
return mll.eval()