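# Imports assumed for the snippets below (standard BoTorch / GPyTorch module
# paths for the APIs these tests exercise). Each `def test_*(self)` is a
# method excerpted from a BotorchTestCase subclass in its original module.
import warnings

import torch
from botorch import settings
from botorch.acquisition import qNoisyExpectedImprovement
from botorch.exceptions import (
    BadInitialCandidatesWarning,
    BotorchTensorDimensionWarning,
    BotorchWarning,
    CostAwareWarning,
    InputDataError,
    InputDataWarning,
    OptimizationWarning,
    SamplingWarning,
)
from botorch.fit import fit_gpytorch_model
from botorch.models import SingleTaskGP
from botorch.models.utils import check_standardization, validate_input_scaling
from botorch.optim import gen_batch_initial_conditions
from botorch.optim.fit import fit_gpytorch_scipy
from botorch.utils.sampling import construct_base_samples
from botorch.utils.testing import MockModel, MockPosterior
from gpytorch.constraints import GreaterThan
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from torch.quasirandom import SobolEngine

# NOTE: `MAX_RETRY_MSG` and `MockAcquisitionFunction` are helpers defined in
# the originating test modules and are assumed to be in scope here.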

def test_check_standardization(self):
    Y = torch.randn(3, 4, 2)
    # check standardized input
    Yst = (Y - Y.mean(dim=-2, keepdim=True)) / Y.std(dim=-2, keepdim=True)
    with settings.debug(True):
        with warnings.catch_warnings(record=True) as ws:
            check_standardization(Y=Yst)
            self.assertFalse(
                any(issubclass(w.category, InputDataWarning) for w in ws)
            )
        check_standardization(Y=Yst, raise_on_fail=True)
        # check nonzero mean
        with warnings.catch_warnings(record=True) as ws:
            check_standardization(Y=Yst + 1)
            self.assertTrue(
                any(issubclass(w.category, InputDataWarning) for w in ws)
            )
            self.assertTrue(
                any("not standardized" in str(w.message) for w in ws)
            )
        with self.assertRaises(InputDataError):
            check_standardization(Y=Yst + 1, raise_on_fail=True)
        # check non-unit variance
        with warnings.catch_warnings(record=True) as ws:
            check_standardization(Y=Yst * 2)
            self.assertTrue(
                any(issubclass(w.category, InputDataWarning) for w in ws)
            )
            self.assertTrue(
                any("not standardized" in str(w.message) for w in ws)
            )
        with self.assertRaises(InputDataError):
            check_standardization(Y=Yst * 2, raise_on_fail=True)
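
# A minimal usage sketch (not from the original suite): standardizing
# outcomes per output dimension is exactly the property that
# `check_standardization` verifies, so preprocessed data passes silently
# even with `raise_on_fail=True`.
def _standardized_outcomes_sketch():
    Y = torch.randn(10, 2)
    Yst = (Y - Y.mean(dim=-2, keepdim=True)) / Y.std(dim=-2, keepdim=True)
    check_standardization(Y=Yst, raise_on_fail=True)
    return Yst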

def test_validate_input_scaling(self):
    # unnormalized inputs and unstandardized outcomes should trigger warnings
    train_X = 2 + torch.rand(3, 4, 3)
    train_Y = torch.randn(3, 4, 2)
    with settings.debug(True), warnings.catch_warnings(record=True) as ws:
        validate_input_scaling(train_X=train_X, train_Y=train_Y)
    self.assertTrue(any(issubclass(w.category, InputDataWarning) for w in ws))
    # check that errors are raised when requested
    with settings.debug(True):
        with self.assertRaises(InputDataError):
            validate_input_scaling(
                train_X=train_X, train_Y=train_Y, raise_on_fail=True
            )
    # check that no errors are raised if everything is standardized
    train_X_min = train_X.min(dim=-1, keepdim=True)[0]
    train_X_max = train_X.max(dim=-1, keepdim=True)[0]
    train_X_std = (train_X - train_X_min) / (train_X_max - train_X_min)
    train_Y_std = (train_Y - train_Y.mean(dim=-2, keepdim=True)) / train_Y.std(
        dim=-2, keepdim=True
    )
    with settings.debug(True), warnings.catch_warnings(record=True) as ws:
        validate_input_scaling(train_X=train_X_std, train_Y=train_Y_std)
    self.assertFalse(any(issubclass(w.category, InputDataWarning) for w in ws))
    # test that negative variances raise an error
    train_Yvar = torch.rand_like(train_Y_std)
    train_Yvar[0, 0, 1] = -0.5
    with settings.debug(True):
        with self.assertRaises(InputDataError):
            validate_input_scaling(
                train_X=train_X_std, train_Y=train_Y_std, train_Yvar=train_Yvar
            )
    # check that NaNs raise errors
    train_X_std[0, 0, 0] = float("nan")
    with settings.debug(True):
        with self.assertRaises(InputDataError):
            validate_input_scaling(train_X=train_X_std, train_Y=train_Y_std)
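
# A usage sketch (assumed preprocessing, not from the original tests):
# min-max normalize inputs to the unit cube and standardize outcomes so
# that `validate_input_scaling` passes without warnings.
def _scaled_training_data_sketch():
    train_X = torch.rand(20, 3)  # already in the unit cube
    train_Y = torch.randn(20, 1)
    train_Y = (train_Y - train_Y.mean(dim=-2, keepdim=True)) / train_Y.std(
        dim=-2, keepdim=True
    )
    validate_input_scaling(train_X=train_X, train_Y=train_Y, raise_on_fail=True)
    return train_X, train_Y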

def test_debug(self):
    # globally enable debug mode (BotorchWarnings are surfaced)
    settings.debug._set_state(True)
    # check that warnings are suppressed inside a debug(False) context
    with settings.debug(False):
        with warnings.catch_warnings(record=True) as ws:
            warnings.warn("test", BotorchWarning)
        self.assertEqual(len(ws), 0)
    # check that warnings are not suppressed outside of the context manager
    with warnings.catch_warnings(record=True) as ws:
        warnings.warn("test", BotorchWarning)
    self.assertEqual(len(ws), 1)
    # globally disable debug mode
    settings.debug._set_state(False)
    # check that warnings are not suppressed inside a debug(True) context
    with settings.debug(True):
        with warnings.catch_warnings(record=True) as ws:
            warnings.warn("test", BotorchWarning)
        self.assertEqual(len(ws), 1)
    # check that warnings are suppressed outside of the context manager
    with warnings.catch_warnings(record=True) as ws:
        warnings.warn("test", BotorchWarning)
    self.assertEqual(len(ws), 0)
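
# A usage sketch (illustrative only): `settings.debug` is the switch the
# test above exercises; it controls whether BotorchWarnings reach the user.
def _debug_context_sketch():
    with settings.debug(True):
        warnings.warn("visible while debug is on", BotorchWarning)
    with settings.debug(False):
        warnings.warn("suppressed while debug is off", BotorchWarning)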

# excerpt: X_pending handling for qNoisyExpectedImprovement
# (`samples_noisy_pending`, `X_baseline`, `sampler`, `X`, and `dtype` are
# set up earlier in the originating test)
samples_noisy_pending = samples_noisy_pending.view(1, 3, 1)
mm_noisy_pending = MockModel(MockPosterior(samples=samples_noisy_pending))
acqf = qNoisyExpectedImprovement(
    model=mm_noisy_pending, X_baseline=X_baseline, sampler=sampler
)
# X_pending is None by default and can be reset by passing None
acqf.set_X_pending()
self.assertIsNone(acqf.X_pending)
acqf.set_X_pending(None)
self.assertIsNone(acqf.X_pending)
acqf.set_X_pending(X)
self.assertEqual(acqf.X_pending, X)
res = acqf(X)
# pending points that require grad should trigger a BotorchWarning
X2 = torch.zeros(1, 1, 1, device=self.device, dtype=dtype, requires_grad=True)
with warnings.catch_warnings(record=True) as ws, settings.debug(True):
    acqf.set_X_pending(X2)
    self.assertEqual(acqf.X_pending, X2)
    self.assertEqual(len(ws), 1)
    self.assertTrue(issubclass(ws[-1].category, BotorchWarning))
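
# A usage sketch (hypothetical helper): pending points let q-acquisition
# functions account for candidates that are submitted but not yet
# evaluated; detaching avoids the requires-grad warning checked above.
def _evaluate_with_pending_sketch(acqf, X_candidates, X_pending):
    acqf.set_X_pending(X_pending.detach())
    return acqf(X_candidates)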

# excerpt: shape, device, and dtype checks for construct_base_samples
# (`tshape`, `batch_shape`, `output_shape`, `qmc`, `seed`, and `dtype` are
# set up in the originating test's parameter loop)
sample_shape = torch.Size(tshape["sample"])
expected_shape = sample_shape + batch_shape + output_shape
samples = construct_base_samples(
    batch_shape=batch_shape,
    output_shape=output_shape,
    sample_shape=sample_shape,
    qmc=qmc,
    seed=seed,
    device=self.device,
    dtype=dtype,
)
self.assertEqual(samples.shape, expected_shape)
self.assertEqual(samples.device.type, self.device.type)
self.assertEqual(samples.dtype, dtype)
# check that a warning is issued if the dimensionality is too large for qmc
with warnings.catch_warnings(record=True) as w, settings.debug(True):
    construct_base_samples(
        batch_shape=torch.Size(),
        output_shape=torch.Size([200, 6]),
        sample_shape=torch.Size([1]),
        qmc=True,
    )
    self.assertEqual(len(w), 1)
    self.assertTrue(issubclass(w[-1].category, SamplingWarning))
    exp_str = f"maximum supported by qmc ({SobolEngine.MAXDIM})"
    self.assertTrue(exp_str in str(w[-1].message))
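
# A usage sketch (illustrative parameters): when the number of output
# elements exceeds SobolEngine.MAXDIM, `construct_base_samples` falls back
# from qmc to i.i.d. normal sampling and emits the SamplingWarning asserted
# above.
def _qmc_base_samples_sketch():
    return construct_base_samples(
        batch_shape=torch.Size(),
        output_shape=torch.Size([4, 2]),  # q=4 design points, o=2 outputs
        sample_shape=torch.Size([256]),
        qmc=True,
        seed=1234,
    )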
self.assertTrue("wall_time" in info_dict)
# test extra param that does not affect loss
options["disp"] = False
mll = self._getModel(double=double)
mll.register_parameter(
"dummy_param",
torch.nn.Parameter(
torch.tensor(
[5.0],
dtype=torch.double if double else torch.float,
device=self.device,
)
),
)
with warnings.catch_warnings(record=True) as ws, settings.debug(True):
mll = fit_gpytorch_model(
mll, optimizer=optimizer, options=options, max_retries=1
)
if optimizer == fit_gpytorch_scipy:
self.assertEqual(len(ws), 1)
self.assertTrue(MAX_RETRY_MSG in str(ws[0].message))
self.assertTrue(mll.dummy_param.grad is None)
# test excluding a parameter
mll = self._getModel(double=double)
original_raw_noise = mll.model.likelihood.noise_covar.raw_noise.item()
original_mean_module_constant = mll.model.mean_module.constant.item()
options["exclude"] = [
"model.mean_module.constant",
"likelihood.noise_covar.raw_noise",
]
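
# A usage sketch (hypothetical `mll`): the `exclude` option freezes the
# named parameters during fitting; the originating test verifies this by
# comparing their values before and after the fit.
def _fit_with_frozen_params_sketch(mll):
    return fit_gpytorch_model(
        mll,
        options={"disp": False, "exclude": ["model.mean_module.constant"]},
    )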

def test_fit_gpytorch_model_singular(self):
    options = {"disp": False, "maxiter": 5}
    for dtype in (torch.float, torch.double):
        X_train = torch.rand(2, 2, device=self.device, dtype=dtype)
        Y_train = torch.zeros(2, 1, device=self.device, dtype=dtype)
        test_likelihood = GaussianLikelihood(
            noise_constraint=GreaterThan(-1.0, transform=None, initial_value=0.0)
        )
        gp = SingleTaskGP(X_train, Y_train, likelihood=test_likelihood)
        mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
        mll.to(device=self.device, dtype=dtype)
        # this will do multiple retries (and emit warnings, which is desired)
        with warnings.catch_warnings(record=True) as ws, settings.debug(True):
            fit_gpytorch_model(mll, options=options, max_retries=2)
            self.assertTrue(
                any(issubclass(w.category, OptimizationWarning) for w in ws)
            )

def test_gen_batch_initial_conditions_highdim(self):
    d = 120  # with q=10 below, q * d exceeds the dimension supported by qmc
    bounds = torch.stack([torch.zeros(d), torch.ones(d)])
    for dtype in (torch.float, torch.double):
        bounds = bounds.to(device=self.device, dtype=dtype)
        for nonnegative in (True, False):
            for seed in (None, 1234):
                with warnings.catch_warnings(
                    record=True
                ) as ws, settings.debug(True):
                    batch_initial_conditions = gen_batch_initial_conditions(
                        acq_function=MockAcquisitionFunction(),
                        bounds=bounds,
                        q=10,
                        num_restarts=1,
                        raw_samples=2,
                        options={
                            "nonnegative": nonnegative,
                            "eta": 0.01,
                            "alpha": 0.1,
                            "seed": seed,
                        },
                    )
                    self.assertTrue(
                        any(issubclass(w.category, SamplingWarning) for w in ws)
                    )

def test_botorch_warnings(self):
    for WarningClass in (
        BotorchTensorDimensionWarning,
        BotorchWarning,
        BadInitialCandidatesWarning,
        CostAwareWarning,
        InputDataWarning,
        OptimizationWarning,
        SamplingWarning,
    ):
        with warnings.catch_warnings(record=True) as ws, settings.debug(True):
            warnings.warn("message", WarningClass)
            self.assertEqual(len(ws), 1)
            self.assertTrue(issubclass(ws[-1].category, WarningClass))
            self.assertTrue("message" in str(ws[-1].message))