Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for dtype in (torch.float, torch.double):
# basic test
train_X = torch.rand(10, 2, device=self.device, dtype=dtype)
train_Y1 = train_X.sum(dim=-1, keepdim=True)
train_Y2 = (train_X[:, 0] - train_X[:, 1]).unsqueeze(-1)
gp1 = SingleTaskGP(train_X, train_Y1)
gp2 = SingleTaskGP(train_X, train_Y2)
list_gp = ModelListGP(gp1, gp2)
batch_gp = model_list_to_batched(list_gp)
self.assertIsInstance(batch_gp, SingleTaskGP)
# test degenerate (single model)
batch_gp = model_list_to_batched(ModelListGP(gp1))
self.assertEqual(batch_gp._num_outputs, 1)
# test different model classes
gp2 = FixedNoiseGP(train_X, train_Y1, torch.ones_like(train_Y1))
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test non-batched models
gp1_ = SimpleGPyTorchModel(train_X, train_Y1)
gp2_ = SimpleGPyTorchModel(train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1_, gp2_))
# test list of multi-output models
train_Y = torch.cat([train_Y1, train_Y2], dim=-1)
gp2 = SingleTaskGP(train_X, train_Y)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test different training inputs
gp2 = SingleTaskGP(2 * train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check scalar agreement
def test_raise_init_errors(self):
with self.assertRaises(UnsupportedError):
LinearTruncatedFidelityKernel(fidelity_dims=[2])
with self.assertRaises(UnsupportedError):
LinearTruncatedFidelityKernel(fidelity_dims=[0, 1, 2], dimension=3)
with self.assertRaises(ValueError):
LinearTruncatedFidelityKernel(fidelity_dims=[2, 2], dimension=3)
with self.assertRaises(ValueError):
LinearTruncatedFidelityKernel(fidelity_dims=[2], dimension=2, nu=1)
gp1 = SingleTaskGP(train_X, train_Y1)
gp2 = SingleTaskGP(train_X, train_Y2)
list_gp = ModelListGP(gp1, gp2)
batch_gp = model_list_to_batched(list_gp)
self.assertIsInstance(batch_gp, SingleTaskGP)
# test degenerate (single model)
batch_gp = model_list_to_batched(ModelListGP(gp1))
self.assertEqual(batch_gp._num_outputs, 1)
# test different model classes
gp2 = FixedNoiseGP(train_X, train_Y1, torch.ones_like(train_Y1))
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test non-batched models
gp1_ = SimpleGPyTorchModel(train_X, train_Y1)
gp2_ = SimpleGPyTorchModel(train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1_, gp2_))
# test list of multi-output models
train_Y = torch.cat([train_Y1, train_Y2], dim=-1)
gp2 = SingleTaskGP(train_X, train_Y)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# test different training inputs
gp2 = SingleTaskGP(2 * train_X, train_Y2)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check scalar agreement
gp2 = SingleTaskGP(train_X, train_Y2)
gp2.likelihood.noise_covar.noise_prior.rate.fill_(1.0)
with self.assertRaises(UnsupportedError):
model_list_to_batched(ModelListGP(gp1, gp2))
# check tensor shape agreement
def test_raise_init_errors(self):
with self.assertRaises(UnsupportedError):
LinearTruncatedFidelityKernel(fidelity_dims=[2])
with self.assertRaises(UnsupportedError):
LinearTruncatedFidelityKernel(fidelity_dims=[0, 1, 2], dimension=3)
with self.assertRaises(ValueError):
LinearTruncatedFidelityKernel(fidelity_dims=[2, 2], dimension=3)
with self.assertRaises(ValueError):
LinearTruncatedFidelityKernel(fidelity_dims=[2], dimension=2, nu=1)
def _check_compatibility(models: ModelListGP) -> None:
"""Check if a ModelListGP can be converted."""
# check that all submodules are of the same type
for modn, mod in models[0].named_modules():
mcls = mod.__class__
if not all(isinstance(_get_module(m, modn), mcls) for m in models[1:]):
raise UnsupportedError(
"Sub-modules must be of the same type across models."
)
# check that each model is a BatchedMultiOutputGPyTorchModel
if not all(isinstance(m, BatchedMultiOutputGPyTorchModel) for m in models):
raise UnsupportedError(
"All models must be of type BatchedMultiOutputGPyTorchModel."
)
# TODO: Add support for HeteroskedasticSingleTaskGP
if any(isinstance(m, HeteroskedasticSingleTaskGP) for m in models):
raise NotImplementedError(
"Conversion of HeteroskedasticSingleTaskGP is currently unsupported."
)
# TODO: Add support for custom likelihoods
if any(getattr(m, "_is_custom_likelihood", False) for m in models):
raise NotImplementedError(
"Conversion of models with custom likelihoods is currently unsupported."
)
# check that each model is single-output
self, model: Model, objective: Optional[ScalarizedObjective] = None
) -> None:
r"""Base constructor for analytic acquisition functions.
Args:
model: A fitted single-outcome model.
objective: A ScalarizedObjective (optional).
"""
super().__init__(model=model)
if objective is None:
if model.num_outputs != 1:
raise UnsupportedError(
"Must specify an objective when using a multi-output model."
)
elif not isinstance(objective, ScalarizedObjective):
raise UnsupportedError(
"Only objectives of type ScalarizedObjective are supported for "
"analytic acquisition functions."
)
self.objective = objective
lengthscale_prior_biased: Optional[Prior] = None,
lengthscale_constraint_unbiased: Optional[Interval] = None,
lengthscale_constraint_biased: Optional[Interval] = None,
covar_module_unbiased: Optional[Kernel] = None,
covar_module_biased: Optional[Kernel] = None,
**kwargs: Any,
) -> None:
if dimension is None and kwargs.get("active_dims") is None:
raise UnsupportedError(
"Must specify dimension when not specifying active_dims."
)
n_fidelity = len(fidelity_dims)
if len(set(fidelity_dims)) != n_fidelity:
raise ValueError("fidelity_dims must not have repeated elements")
if n_fidelity not in {1, 2}:
raise UnsupportedError(
"LinearTruncatedFidelityKernel accepts either one or two"
"fidelity parameters."
)
if nu not in {0.5, 1.5, 2.5}:
raise ValueError("nu must be one of 0.5, 1.5, or 2.5")
super().__init__(**kwargs)
self.fidelity_dims = fidelity_dims
if power_constraint is None:
power_constraint = Positive()
if lengthscale_prior_unbiased is None:
lengthscale_prior_unbiased = GammaPrior(3, 6)
if lengthscale_prior_biased is None:
lengthscale_prior_biased = GammaPrior(6, 2)
def _check_compatibility(models: ModelListGP) -> None:
"""Check if a ModelListGP can be converted."""
# check that all submodules are of the same type
for modn, mod in models[0].named_modules():
mcls = mod.__class__
if not all(isinstance(_get_module(m, modn), mcls) for m in models[1:]):
raise UnsupportedError(
"Sub-modules must be of the same type across models."
)
# check that each model is a BatchedMultiOutputGPyTorchModel
if not all(isinstance(m, BatchedMultiOutputGPyTorchModel) for m in models):
raise UnsupportedError(
"All models must be of type BatchedMultiOutputGPyTorchModel."
)
# TODO: Add support for HeteroskedasticSingleTaskGP
if any(isinstance(m, HeteroskedasticSingleTaskGP) for m in models):
raise NotImplementedError(
"Conversion of HeteroskedasticSingleTaskGP is currently unsupported."
)
# TODO: Add support for custom likelihoods
# TODO: Add support for HeteroskedasticSingleTaskGP
if any(isinstance(m, HeteroskedasticSingleTaskGP) for m in models):
raise NotImplementedError(
"Conversion of HeteroskedasticSingleTaskGP is currently unsupported."
)
# TODO: Add support for custom likelihoods
if any(getattr(m, "_is_custom_likelihood", False) for m in models):
raise NotImplementedError(
"Conversion of models with custom likelihoods is currently unsupported."
)
# check that each model is single-output
if not all(m._num_outputs == 1 for m in models):
raise UnsupportedError("All models must be single-output.")
# check that training inputs are the same
if not all(
torch.equal(ti, tj)
for m in models[1:]
for ti, tj in zip(models[0].train_inputs, m.train_inputs)
):
raise UnsupportedError("training inputs must agree for all sub-models.")
dimension: int = 3,
nu: float = 2.5,
train_iteration_fidelity: bool = True,
train_data_fidelity: bool = True,
lengthscale_prior: Optional[Prior] = None,
power_prior: Optional[Prior] = None,
power_constraint: Optional[Interval] = None,
lengthscale_2_prior: Optional[Prior] = None,
lengthscale_2_constraint: Optional[Interval] = None,
lengthscale_constraint: Optional[Interval] = None,
covar_module_1: Optional[Kernel] = None,
covar_module_2: Optional[Kernel] = None,
**kwargs: Any,
):
if not train_iteration_fidelity and not train_data_fidelity:
raise UnsupportedError("You should have at least one fidelity parameter.")
if nu not in {0.5, 1.5, 2.5}:
raise ValueError("nu expected to be 0.5, 1.5, or 2.5")
super().__init__(**kwargs)
self.train_iteration_fidelity = train_iteration_fidelity
self.train_data_fidelity = train_data_fidelity
if power_constraint is None:
power_constraint = Positive()
if lengthscale_prior is None:
lengthscale_prior = GammaPrior(3, 6)
if lengthscale_2_prior is None:
lengthscale_2_prior = GammaPrior(6, 2)
if lengthscale_constraint is None:
lengthscale_constraint = Positive()