ls = model.covar_module.base_kernel.raw_lengthscale.view(-1).tolist()
self.assertTrue(all(ls[0] != ls[i] for i in range(1, len(ls))))
# change one of the priors to SmoothedBoxPrior
model.covar_module = ScaleKernel(
    MaternKernel(
        nu=2.5,
        ard_num_dims=model.train_inputs[0].shape[-1],
        batch_shape=model._aug_batch_shape,
        lengthscale_prior=SmoothedBoxPrior(3.0, 6.0),
    ),
    batch_shape=model._aug_batch_shape,
    outputscale_prior=GammaPrior(2.0, 0.15),
)
original_state_dict = dict(deepcopy(mll.model.state_dict()))
with warnings.catch_warnings(record=True) as ws, settings.debug(True):
    sample_all_priors(model)
    self.assertEqual(len(ws), 1)
    self.assertTrue("rsample" in str(ws[0].message))
# the lengthscale should not have changed because sampling is
# not implemented for SmoothedBoxPrior
self.assertTrue(
    torch.equal(
        dict(model.state_dict())["covar_module.base_kernel.raw_lengthscale"],
        original_state_dict["covar_module.base_kernel.raw_lengthscale"],
    )
)
# (truncated: the original test goes on to set setting_closure to None
# and assert that a RuntimeError is raised)
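
# A minimal sketch of the pattern exercised above, assuming a stock
# SingleTaskGP (its default GammaPrior priors all support rsample, so
# sample_all_priors succeeds and the hyperparameters move):
import torch
from botorch.models import SingleTaskGP
from botorch.optim.utils import sample_all_priors

train_X = torch.rand(10, 3, dtype=torch.double)
train_Y = train_X.sum(dim=-1, keepdim=True).sin()
gp = SingleTaskGP(train_X, train_Y)
before = gp.covar_module.base_kernel.raw_lengthscale.clone()
sample_all_priors(gp)  # redraws hyperparameters from their priors in place
assert not torch.equal(gp.covar_module.base_kernel.raw_lengthscale, before)
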
def test_gpt_posterior_settings(self):
    for propagate_grads in (False, True):
        with settings.propagate_grads(propagate_grads):
            with gpt_posterior_settings():
                self.assertTrue(gpt_settings.debug.off())
                self.assertTrue(gpt_settings.fast_pred_var.on())
                if settings.propagate_grads.off():
                    self.assertTrue(gpt_settings.detach_test_caches.on())
                else:
                    self.assertTrue(gpt_settings.detach_test_caches.off())

def test_fit_gpytorch_model(self, optimizer=fit_gpytorch_scipy):
    options = {"disp": False, "maxiter": 5}
    for double in (False, True):
        mll = self._getModel(double=double)
        with warnings.catch_warnings(record=True) as ws, settings.debug(True):
            mll = fit_gpytorch_model(
                mll, optimizer=optimizer, options=options, max_retries=1
            )
            if optimizer == fit_gpytorch_scipy:
                self.assertEqual(len(ws), 1)
                self.assertTrue(MAX_RETRY_MSG in str(ws[0].message))
        model = mll.model
        # make sure all of the parameters changed
        self.assertGreater(model.likelihood.raw_noise.abs().item(), 1e-3)
        self.assertLess(model.mean_module.constant.abs().item(), 0.1)
        self.assertGreater(
            model.covar_module.base_kernel.raw_lengthscale.abs().item(), 0.1
        )
        self.assertGreater(model.covar_module.raw_outputscale.abs().item(), 1e-3)
# (truncated: the original test goes on to check overriding the default
# bounds with user-supplied bounds)
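
# For context, a hedged sketch of the (legacy) fitting entry point being
# tested, driven on a toy SingleTaskGP:
import torch
from botorch.fit import fit_gpytorch_model
from botorch.models import SingleTaskGP
from gpytorch.mlls import ExactMarginalLogLikelihood

train_X = torch.rand(20, 2, dtype=torch.double)
train_Y = train_X.sum(dim=-1, keepdim=True).sin()
gp = SingleTaskGP(train_X, train_Y)
mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
fit_gpytorch_model(mll)  # optimizes hyperparameters (scipy optimizer by default)
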
def test_validate_input_scaling(self):
    train_X = 2 + torch.rand(3, 4, 3)
    train_Y = torch.randn(3, 4, 2)
    # check that nothing is being checked
    with settings.validate_input_scaling(False), settings.debug(True):
        with warnings.catch_warnings(record=True) as ws:
            validate_input_scaling(train_X=train_X, train_Y=train_Y)
            self.assertFalse(
                any(issubclass(w.category, InputDataWarning) for w in ws)
            )
    # check that warnings are being issued
    with settings.debug(True), warnings.catch_warnings(record=True) as ws:
        validate_input_scaling(train_X=train_X, train_Y=train_Y)
        self.assertTrue(any(issubclass(w.category, InputDataWarning) for w in ws))
    # check that errors are raised when requested
    with settings.debug(True):
        with self.assertRaises(InputDataError):
            validate_input_scaling(
                train_X=train_X, train_Y=train_Y, raise_on_fail=True
            )
    # check that no errors are being raised if everything is standardized
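
# The test above is truncated at the standardized-data check; a hedged
# sketch of that case: inputs already in the unit cube and outcomes
# standardized to zero mean and unit variance pass even with raise_on_fail:
import torch
from botorch.models.utils import validate_input_scaling

train_X = torch.rand(4, 3)  # already inside the unit cube
train_Y = torch.randn(4, 2)
train_Y_std = (train_Y - train_Y.mean(dim=0)) / train_Y.std(dim=0)
validate_input_scaling(train_X=train_X, train_Y=train_Y_std, raise_on_fail=True)
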
def test_flags(self):
    for flag in (settings.debug, settings.propagate_grads):
        self.assertFalse(flag.on())
        self.assertTrue(flag.off())
        with flag(True):
            self.assertTrue(flag.on())
            self.assertFalse(flag.off())
        self.assertFalse(flag.on())
        self.assertTrue(flag.off())
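
# A tiny illustration of the flag semantics asserted above, using the
# public botorch.settings module:
from botorch import settings

print(settings.debug.on())  # False by default
with settings.debug(True):
    print(settings.debug.on())  # True inside the context
print(settings.debug.on())  # restored to False on exit
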
bs = acqf.sampler.base_samples.clone()
acqf(X)
self.assertFalse(torch.equal(acqf.sampler.base_samples, bs))
# basic test for X_pending and warning
acqf.set_X_pending()
self.assertIsNone(acqf.X_pending)
acqf.set_X_pending(None)
self.assertIsNone(acqf.X_pending)
acqf.set_X_pending(X)
self.assertEqual(acqf.X_pending, X)
res = acqf(X)
X2 = torch.zeros(
    1, 1, 1, device=self.device, dtype=dtype, requires_grad=True
)
# setting pending points that require grad should trigger a warning
with warnings.catch_warnings(record=True) as ws, settings.debug(True):
    acqf.set_X_pending(X2)
    self.assertEqual(acqf.X_pending, X2)
    self.assertEqual(len(ws), 1)
    self.assertTrue(issubclass(ws[-1].category, BotorchWarning))
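
# A hedged sketch of the pending-point bookkeeping on an MC acquisition
# function (toy model; qExpectedImprovement stands in for the acqf under
# test):
import torch
from botorch.acquisition import qExpectedImprovement
from botorch.models import SingleTaskGP

train_X = torch.rand(8, 2, dtype=torch.double)
train_Y = train_X.sum(dim=-1, keepdim=True)
gp = SingleTaskGP(train_X, train_Y)
acqf = qExpectedImprovement(model=gp, best_f=train_Y.max())
acqf.set_X_pending(torch.rand(2, 2, dtype=torch.double))
# pending points are concatenated onto candidate sets in future evaluations
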
def test_validate_tensor_args(self):
    n, d = 3, 2
    for batch_shape, output_dim_shape, dtype in itertools.product(
        (torch.Size(), torch.Size([2])),
        (torch.Size(), torch.Size([1]), torch.Size([2])),
        (torch.float, torch.double),
    ):
        tkwargs = {"device": self.device, "dtype": dtype}
        X = torch.empty(batch_shape + torch.Size([n, d]), **tkwargs)
        # test using the same batch_shape as X
        Y = torch.empty(
            batch_shape + torch.Size([n]) + output_dim_shape, **tkwargs
        )
        if len(output_dim_shape) > 0:
            # check that no exception is raised
            GPyTorchModel._validate_tensor_args(X, Y)
            with settings.debug(True), self.assertWarns(
                BotorchTensorDimensionWarning
            ):
                GPyTorchModel._validate_tensor_args(X, Y, strict=False)
        else:
            with self.assertRaises(BotorchTensorDimensionError):
                GPyTorchModel._validate_tensor_args(X, Y)
            with settings.debug(True), self.assertWarns(
                BotorchTensorDimensionWarning
            ):
                GPyTorchModel._validate_tensor_args(X, Y, strict=False)
        # test using a different batch_shape
        if len(batch_shape) > 0:
            with self.assertRaises(BotorchTensorDimensionError):
                GPyTorchModel._validate_tensor_args(X, Y[0])
            with settings.debug(True), self.assertWarns(
                BotorchTensorDimensionWarning
            ):
                GPyTorchModel._validate_tensor_args(X, Y[0], strict=False)
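
# The shape contract checked above, in brief: Y needs an explicit output
# dimension and a batch shape matching X. A hedged illustration mirroring
# the test's own call pattern:
import torch
from botorch.models.gpytorch import GPyTorchModel

X = torch.rand(5, 2)
GPyTorchModel._validate_tensor_args(X, torch.rand(5, 1))  # ok: explicit output dim
try:
    GPyTorchModel._validate_tensor_args(X, torch.rand(5))  # missing output dim
except Exception as err:
    print(type(err).__name__)  # BotorchTensorDimensionError
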
init_kwargs = {}
device = bounds.device
bounds = bounds.cpu()
if "eta" in options:
    init_kwargs["eta"] = options.get("eta")
if options.get("nonnegative") or is_nonnegative(acq_function):
    init_func = initialize_q_batch_nonneg
    if "alpha" in options:
        init_kwargs["alpha"] = options.get("alpha")
else:
    init_func = initialize_q_batch
q = 1 if q is None else q
# the dimension the samples are drawn from
dim = bounds.shape[-1] * q
if dim > SobolEngine.MAXDIM and settings.debug.on():
    warnings.warn(
        f"Sample dimension q*d={dim} exceeding Sobol max dimension "
        f"({SobolEngine.MAXDIM}). Using iid samples instead.",
        SamplingWarning,
    )
while factor < max_factor:
    with warnings.catch_warnings(record=True) as ws:
        n = raw_samples * factor
        if dim <= SobolEngine.MAXDIM:
            X_rnd = draw_sobol_samples(bounds=bounds, n=n, q=q, seed=seed)
        else:
            with manual_seed(seed):
                # draw the raw iid samples on the CPU
                X_rnd_nlzd = torch.rand(n * dim, dtype=bounds.dtype).view(
                    n, q, bounds.shape[-1]
                )
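
# For reference, a minimal sketch of the quasi-random branch in the
# (truncated) fragment above, assuming simple unit-cube bounds (toy sizes):
import torch
from botorch.utils.sampling import draw_sobol_samples

bounds = torch.stack([torch.zeros(2), torch.ones(2)])  # 2 x d bounds tensor
X_rnd = draw_sobol_samples(bounds=bounds, n=8, q=3, seed=1234)
print(X_rnd.shape)  # torch.Size([8, 3, 2]), i.e. n x q x d
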
    observations.
(3) condition the model on the new fake observations.

Args:
    X: A `batch_shape x n' x d`-dim Tensor, where `d` is the dimension of
        the feature space, `n'` is the number of points per batch, and
        `batch_shape` is the batch shape (must be compatible with the
        batch shape of the model).
    sampler: The sampler used for sampling from the posterior at `X`.
    observation_noise: If True, include observation noise.

Returns:
    The constructed fantasy model.
"""
propagate_grads = kwargs.pop("propagate_grads", False)
with settings.propagate_grads(propagate_grads):
    post_X = self.posterior(X, observation_noise=observation_noise)
    Y_fantasized = sampler(post_X)  # num_fantasies x batch_shape x n' x m
return self.condition_on_observations(X=X, Y=Y_fantasized, **kwargs)
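
# A minimal usage sketch for fantasize, assuming a toy SingleTaskGP and the
# legacy SobolQMCNormalSampler(num_samples=...) signature:
import torch
from botorch.models import SingleTaskGP
from botorch.sampling import SobolQMCNormalSampler

train_X = torch.rand(10, 2, dtype=torch.double)
train_Y = train_X.sum(dim=-1, keepdim=True)
gp = SingleTaskGP(train_X, train_Y)
sampler = SobolQMCNormalSampler(num_samples=4)
X_new = torch.rand(3, 2, dtype=torch.double)
fm = gp.fantasize(X=X_new, sampler=sampler, observation_noise=True)
# fm is a batched model conditioned on 4 fantasized datasets at X_new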