Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for dtype in (torch.float, torch.double):
samples = torch.zeros(2, 2, 1, device=self.device, dtype=dtype)
samples[0, 0, 0] = 1.0
mm = MockModel(MockPosterior(samples=samples))
# X is a dummy and unused b/c of mocking
X = torch.zeros(1, 1, 1, device=self.device, dtype=dtype)
# test batch mode
sampler = IIDNormalSampler(num_samples=2)
acqf = qUpperConfidenceBound(model=mm, beta=0.5, sampler=sampler)
res = acqf(X)
self.assertEqual(res[0].item(), 1.0)
self.assertEqual(res[1].item(), 0.0)
# test batch mode, no resample
sampler = IIDNormalSampler(num_samples=2, seed=12345)
acqf = qUpperConfidenceBound(model=mm, beta=0.5, sampler=sampler)
res = acqf(X) # 1-dim batch
self.assertEqual(res[0].item(), 1.0)
self.assertEqual(res[1].item(), 0.0)
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
bs = acqf.sampler.base_samples.clone()
acqf(X)
self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
res = acqf(X.expand(2, 1, 1)) # 2-dim batch
self.assertEqual(res[0].item(), 1.0)
self.assertEqual(res[1].item(), 0.0)
# the base samples should have the batch dim collapsed
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
bs = acqf.sampler.base_samples.clone()
acqf(X.expand(2, 1, 1))
self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
# X_baseline is `q' x d` = 1 x 1
X_baseline = torch.zeros(1, 1, device=self.device, dtype=dtype)
mm_noisy = MockModel(MockPosterior(samples=samples_noisy))
# X is `q x d` = 1 x 1
X = torch.zeros(1, 1, device=self.device, dtype=dtype)
# basic test
sampler = IIDNormalSampler(num_samples=2)
acqf = qNoisyExpectedImprovement(
model=mm_noisy, X_baseline=X_baseline, sampler=sampler
)
res = acqf(X)
self.assertEqual(res.item(), 1.0)
# basic test, no resample
sampler = IIDNormalSampler(num_samples=2, seed=12345)
acqf = qNoisyExpectedImprovement(
model=mm_noisy, X_baseline=X_baseline, sampler=sampler
)
res = acqf(X)
self.assertEqual(res.item(), 1.0)
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
bs = acqf.sampler.base_samples.clone()
acqf(X)
self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
# basic test, qmc, no resample
sampler = SobolQMCNormalSampler(num_samples=2)
acqf = qNoisyExpectedImprovement(
model=mm_noisy, X_baseline=X_baseline, sampler=sampler
)
res = acqf(X)
self.assertEqual(samples.shape, torch.Size([4, 2, 1]))
self.assertEqual(sampler.seed, 1235)
# ensure samples are the same
samples2 = sampler(posterior)
self.assertTrue(torch.allclose(samples, samples2))
self.assertEqual(sampler.seed, 1235)
# ensure this works with a differently shaped posterior
posterior_batched = _get_test_posterior_batched(
device=self.device, dtype=dtype
)
samples_batched = sampler(posterior_batched)
self.assertEqual(samples_batched.shape, torch.Size([4, 3, 2, 1]))
self.assertEqual(sampler.seed, 1236)
# resample
sampler = IIDNormalSampler(
num_samples=4, resample=True, collapse_batch_dims=False
)
self.assertTrue(sampler.resample)
self.assertFalse(sampler.collapse_batch_dims)
initial_seed = sampler.seed
# check samples non-batched
posterior = _get_test_posterior(device=self.device, dtype=dtype)
samples = sampler(posterior=posterior)
self.assertEqual(samples.shape, torch.Size([4, 2, 1]))
self.assertEqual(sampler.seed, initial_seed + 1)
# ensure samples are not the same
samples2 = sampler(posterior)
self.assertFalse(torch.allclose(samples, samples2))
self.assertEqual(sampler.seed, initial_seed + 2)
# ensure this works with a differently shaped posterior
posterior_batched = _get_test_posterior_batched(
def test_q_noisy_expected_improvement(self):
    """Check qNoisyExpectedImprovement against mocked posterior samples.

    For each dtype the mocked posterior holds the samples ``[1.0, 0.0]``
    reshaped to ``1 x 2 x 1``. The acquisition value is expected to be 1.0,
    both with an unseeded and with a seeded IID sampler. For the seeded
    sampler the test also checks the shape of the base samples and that a
    second evaluation leaves them unchanged (the "no resample" case).
    """
    for dtype in (torch.float, torch.double):
        # the event shape is `b x q x t` = 1 x 2 x 1
        samples_noisy = torch.tensor([1.0, 0.0], device=self.device, dtype=dtype)
        samples_noisy = samples_noisy.view(1, 2, 1)
        # X_baseline is `q' x d` = 1 x 1
        X_baseline = torch.zeros(1, 1, device=self.device, dtype=dtype)
        mm_noisy = MockModel(MockPosterior(samples=samples_noisy))
        # X is `q x d` = 1 x 1
        X = torch.zeros(1, 1, device=self.device, dtype=dtype)
        # basic test
        sampler = IIDNormalSampler(num_samples=2)
        acqf = qNoisyExpectedImprovement(
            model=mm_noisy, X_baseline=X_baseline, sampler=sampler
        )
        res = acqf(X)
        self.assertEqual(res.item(), 1.0)
        # basic test, no resample
        sampler = IIDNormalSampler(num_samples=2, seed=12345)
        acqf = qNoisyExpectedImprovement(
            model=mm_noisy, X_baseline=X_baseline, sampler=sampler
        )
        res = acqf(X)
        self.assertEqual(res.item(), 1.0)
        self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
        # snapshot the base samples, evaluate again, and make sure the
        # sampler did not draw new ones
        bs = acqf.sampler.base_samples.clone()
        acqf(X)
        self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
def test_get_value_function(self):
    """Check which acquisition function `_get_value_function` builds.

    Called with only a model, the helper is expected to return a
    `PosteriorMean` whose objective is None. Called with an objective and a
    sampler, it is expected to return a `qSimpleRegret` that carries exactly
    that objective and sampler.
    """
    # `NO` is the patch target defined elsewhere in this file; it is replaced
    # by a PropertyMock that reports a single output.
    with mock.patch(NO, new_callable=mock.PropertyMock) as patched_num_outputs:
        patched_num_outputs.return_value = 1
        model = MockModel(None)

        # model only -> PosteriorMean without an objective
        value_fn = _get_value_function(model)
        self.assertIsInstance(value_fn, PosteriorMean)
        self.assertIsNone(value_fn.objective)

        # objective + sampler -> qSimpleRegret using both
        mc_objective = GenericMCObjective(lambda Y: Y.sum(dim=-1))
        iid_sampler = IIDNormalSampler(num_samples=2)
        value_fn = _get_value_function(
            model=model, objective=mc_objective, sampler=iid_sampler
        )
        self.assertIsInstance(value_fn, qSimpleRegret)
        self.assertEqual(value_fn.objective, mc_objective)
        self.assertEqual(value_fn.sampler, iid_sampler)
def test_q_simple_regret_batch(self):
    """Check qSimpleRegret in batch mode against mocked posterior samples.

    The mocked posterior holds a ``2 x 2 x 1`` tensor of zeros with a single
    1.0 at index ``[0, 0, 0]``, so the expected values are 1.0 for the first
    batch element and 0.0 for the second. With a seeded sampler the test
    also checks that the base samples keep the shape ``2 x 1 x 2 x 1`` and
    are not redrawn on repeated evaluation, for both a 1-dim and a 2-dim
    batch input.
    """
    # the event shape is `b x q x t` = 2 x 2 x 1
    for dtype in (torch.float, torch.double):
        samples = torch.zeros(2, 2, 1, device=self.device, dtype=dtype)
        samples[0, 0, 0] = 1.0
        mm = MockModel(MockPosterior(samples=samples))
        # X is a dummy and unused b/c of mocking
        X = torch.zeros(1, 1, 1, device=self.device, dtype=dtype)
        # test batch mode
        sampler = IIDNormalSampler(num_samples=2)
        acqf = qSimpleRegret(model=mm, sampler=sampler)
        res = acqf(X)
        self.assertEqual(res[0].item(), 1.0)
        self.assertEqual(res[1].item(), 0.0)
        # test batch mode, no resample
        sampler = IIDNormalSampler(num_samples=2, seed=12345)
        acqf = qSimpleRegret(model=mm, sampler=sampler)
        res = acqf(X)  # 1-dim batch
        self.assertEqual(res[0].item(), 1.0)
        self.assertEqual(res[1].item(), 0.0)
        self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
        bs = acqf.sampler.base_samples.clone()
        acqf(X)
        self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
        res = acqf(X.expand(2, 1, 1))  # 2-dim batch
        # the 2-dim batch result was previously computed but never checked
        self.assertEqual(res[0].item(), 1.0)
        self.assertEqual(res[1].item(), 0.0)
        # the base samples should have the batch dim collapsed
        self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
        bs = acqf.sampler.base_samples.clone()
        acqf(X.expand(2, 1, 1))
        self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
# test batch mode
sampler = IIDNormalSampler(num_samples=2)
acqf = qExpectedImprovement(model=mm, best_f=0, sampler=sampler)
res = acqf(X)
self.assertEqual(res[0].item(), 1.0)
self.assertEqual(res[1].item(), 0.0)
# test shifting best_f value
acqf = qExpectedImprovement(model=mm, best_f=-1, sampler=sampler)
res = acqf(X)
self.assertEqual(res[0].item(), 2.0)
self.assertEqual(res[1].item(), 1.0)
# test batch mode, no resample
sampler = IIDNormalSampler(num_samples=2, seed=12345)
acqf = qExpectedImprovement(model=mm, best_f=0, sampler=sampler)
res = acqf(X) # 1-dim batch
self.assertEqual(res[0].item(), 1.0)
self.assertEqual(res[1].item(), 0.0)
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
bs = acqf.sampler.base_samples.clone()
acqf(X)
self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
res = acqf(X.expand(2, 1, 1)) # 2-dim batch
self.assertEqual(res[0].item(), 1.0)
self.assertEqual(res[1].item(), 0.0)
# the base samples should have the batch dim collapsed
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 2, 1]))
bs = acqf.sampler.base_samples.clone()
acqf(X.expand(2, 1, 1))
self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
def test_q_noisy_expected_improvement_batch(self):
    """Exercise qNoisyExpectedImprovement in batch mode on mocked samples.

    The mocked posterior holds a ``2 x 3 x 1`` tensor of zeros with a single
    1.0 at index ``[0, 0, 0]``. The expected values are 1.0 for the first
    batch element and 0.0 for the second, with an unseeded and with a seeded
    IID sampler. For the seeded sampler the base-sample shape is checked too.
    """
    for dtype in (torch.float, torch.double):
        tkwargs = {"device": self.device, "dtype": dtype}
        # the event shape is `b x q x t` = 2 x 3 x 1
        noisy_draws = torch.zeros(2, 3, 1, **tkwargs)
        noisy_draws[0, 0, 0] = 1.0
        noisy_model = MockModel(MockPosterior(samples=noisy_draws))
        # X has shape 1 x 1 x 1 (one leading dim ahead of `q x d` = 1 x 1)
        X = torch.zeros(1, 1, 1, **tkwargs)
        # X_baseline has shape 1 x 1
        X_baseline = torch.zeros(1, 1, **tkwargs)

        # batch mode with an unseeded sampler
        iid_sampler = IIDNormalSampler(num_samples=2)
        acq_fn = qNoisyExpectedImprovement(
            model=noisy_model, X_baseline=X_baseline, sampler=iid_sampler
        )
        values = acq_fn(X)
        self.assertEqual(values[0].item(), 1.0)
        self.assertEqual(values[1].item(), 0.0)

        # batch mode with a seeded sampler (no resample)
        seeded_sampler = IIDNormalSampler(num_samples=2, seed=12345)
        acq_fn = qNoisyExpectedImprovement(
            model=noisy_model, X_baseline=X_baseline, sampler=seeded_sampler
        )
        values = acq_fn(X)  # 1-dim batch
        self.assertEqual(values[0].item(), 1.0)
        self.assertEqual(values[1].item(), 0.0)
        expected_shape = torch.Size([2, 1, 3, 1])
        self.assertEqual(acq_fn.sampler.base_samples.shape, expected_shape)
res = acqf(X)
self.assertEqual(res.item(), 0.0)
# test shifting best_f value
acqf = qExpectedImprovement(model=mm, best_f=-1, sampler=sampler)
res = acqf(X)
self.assertEqual(res.item(), 1.0)
# test size verification of best_f
with self.assertRaises(ValueError):
qExpectedImprovement(
model=mm, best_f=torch.zeros(2, device=self.device, dtype=dtype)
)
# basic test, no resample
sampler = IIDNormalSampler(num_samples=2, seed=12345)
acqf = qExpectedImprovement(model=mm, best_f=0, sampler=sampler)
res = acqf(X)
self.assertEqual(res.item(), 0.0)
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 1, 1]))
bs = acqf.sampler.base_samples.clone()
res = acqf(X)
self.assertTrue(torch.equal(acqf.sampler.base_samples, bs))
# basic test, qmc, no resample
sampler = SobolQMCNormalSampler(num_samples=2)
acqf = qExpectedImprovement(model=mm, best_f=0, sampler=sampler)
res = acqf(X)
self.assertEqual(res.item(), 0.0)
self.assertEqual(acqf.sampler.base_samples.shape, torch.Size([2, 1, 1, 1]))
bs = acqf.sampler.base_samples.clone()
acqf(X)
seed: If provided, perform deterministic optimization (i.e. the
function to optimize is fixed and not stochastic).
Returns:
The requested acquisition function.
Example:
>>> model = SingleTaskGP(train_X, train_Y)
>>> obj = LinearMCObjective(weights=torch.tensor([1.0, 2.0]))
>>> acqf = get_acquisition_function("qEI", model, obj, train_X)
"""
# initialize the sampler
if qmc:
sampler = SobolQMCNormalSampler(num_samples=mc_samples, seed=seed)
else:
sampler = IIDNormalSampler(num_samples=mc_samples, seed=seed)
# instantiate and return the requested acquisition function
if acquisition_function_name == "qEI":
best_f = objective(model.posterior(X_observed).mean).max().item()
return monte_carlo.qExpectedImprovement(
model=model,
best_f=best_f,
sampler=sampler,
objective=objective,
X_pending=X_pending,
)
elif acquisition_function_name == "qPI":
best_f = objective(model.posterior(X_observed).mean).max().item()
return monte_carlo.qProbabilityOfImprovement(
model=model,
best_f=best_f,
sampler=sampler,