Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
if not num_optims: # if no num_vars and no num_optims, just assume 2.
num_optims = 2
# num_vars not given: we will distribute variables equally.
if num_optims > self.dimension:
num_optims = self.dimension
self.num_optims = num_optims
self.optims: List[Any] = []
self.num_vars: List[Any] = num_vars if num_vars else []
self.instrumentations: List[Any] = []
for i in range(self.num_optims):
if not self.num_vars or len(self.num_vars) < i+1:
self.num_vars += [(self.dimension // self.num_optims) + (self.dimension % self.num_optims > i)]
assert self.num_vars[i] >= 1, "At least one variable per optimizer."
self.instrumentations += [Instrumentation(inst.variables.Array(self.num_vars[i]).affined(1, 0))]
assert len(self.optims) == i
if self.num_vars[i] > 1:
self.optims += [multivariate_optimizer(self.instrumentations[i], budget, num_workers)] # noqa: F405
else:
self.optims += [monovariate_optimizer(self.instrumentations[i], budget, num_workers)] # noqa: F405
assert sum(self.num_vars) == self.dimension, f"sum(num_vars)={sum(self.num_vars)} should be equal to the dimension {self.dimension}."
def test_instrumentation_optimizer_reproducibility() -> None:
instrumentation = inst.Instrumentation(inst.var.Array(1), y=inst.var.SoftmaxCategorical(list(range(100))))
instrumentation.random_state.seed(12)
optimizer = optlib.RandomSearch(instrumentation, budget=10)
recom = optimizer.minimize(_square)
np.testing.assert_equal(recom.kwargs["y"], 67)
def __init__(self, instrumentation: Union[instru.Instrumentation, int], budget: Optional[int] = None, num_workers: int = 1) -> None:
if self.no_parallelization and num_workers > 1:
raise ValueError(f"{self.__class__.__name__} does not support parallelization")
# "seedable" random state: externally setting the seed will provide deterministic behavior
# you can also replace or reinitialize this random state
self.num_workers = int(num_workers)
self.budget = budget
# How do we deal with cheap constraints i.e. constraints which are fast and use low resources and easy ?
# True ==> we penalize them (infinite values for candidates which violate the constraint).
# False ==> we repeat the ask until we solve the problem.
self._penalize_cheap_violations = False
self.instrumentation = (
instrumentation
if isinstance(instrumentation, instru.Instrumentation)
else instru.Instrumentation(instru.var.Array(instrumentation))
)
if not self.dimension:
raise ValueError("No variable to optimize in this instrumentation.")
self.create_candidate = CandidateMaker(self.instrumentation)
self.name = self.__class__.__name__ # printed name in repr
# keep a record of evaluations, and current bests which are updated at each new evaluation
self.archive: utils.Archive[utils.Value] = utils.Archive() # dict like structure taking np.ndarray as keys and Value as values
self.current_bests = {
x: utils.Point(np.zeros(self.dimension, dtype=np.float), utils.Value(np.inf)) for x in ["optimistic", "pessimistic", "average"]
}
# pruning function, called at each "tell"
# this can be desactivated or modified by each implementation
self.pruning: Optional[Callable[[utils.Archive[utils.Value]], utils.Archive[utils.Value]]] = utils.Pruning.sensible_default(
num_workers=num_workers, dimension=self.instrumentation.dimension
)
arrays.extend([inst.var.Array(n).bounded(2, 3, transform=transform) for _ in range(2)])
arrays.extend([inst.var.Array(n).bounded(0, 300, transform=transform) for _ in range(2)])
elif name == "chirped":
# n multiple of 2, from 10 to 80
# domain (n=60): [0,300]^60
arrays = [inst.var.Array(n).bounded(0, 300, transform=transform) for _ in range(4)]
elif name == "morpho":
# n multiple of 4, from 16 to 60
# domain (n=60): [0,300]^15 x [0,600]^15 x [30,600]^15 x [0,300]^15
arrays.extend([inst.var.Array(n).bounded(0, 300, transform=transform),
inst.var.Array(n).bounded(0, 600, transform=transform),
inst.var.Array(n).bounded(30, 600, transform=transform),
inst.var.Array(n).bounded(0, 300, transform=transform)])
else:
raise NotImplementedError(f"Transform for {name} is not implemented")
instrumentation = inst.Instrumentation(*arrays)
assert instrumentation.dimension == dimension
return instrumentation
def __call__(self, instrumentation: Union[int, Instrumentation], budget: Optional[int] = None, num_workers: int = 1) -> base.Optimizer:
gp_params = {} if self.gp_parameters is None else self.gp_parameters
if isinstance(instrumentation, Instrumentation) and gp_params.get("alpha", 0) == 0:
noisy = instrumentation.noisy
cont = instrumentation.continuous
if noisy or not cont:
warnings.warn(
"Dis-continuous and noisy instrumentation require gp_parameters['alpha'] > 0 "
"(for your instrumentation, continuity={cont} and noisy={noisy}).\n"
"Find more information on BayesianOptimization's github.\n"
"You should then create a new instance of optimizerlib.ParametrizedBO with appropriate parametrization.",
InefficientSettingsWarning,
)
return super().__call__(instrumentation, budget, num_workers)
choices = range(int(param_range[0]), int(param_range[-1]) + 1)
arg = inst.var.OrderedDiscrete(choices)
# We are throwing away information here, but OrderedDiscrete
# appears to be invariant to monotonic transformation anyway.
elif param_type == "real":
assert param_values is None
assert param_range is not None
# Will need to warp to this space sep.
arg = inst.var.Gaussian(mean=0, std=1)
prewarp = Real(warp=param_space, range_=param_range)
else:
assert False, "type %s not handled in API" % param_type
all_args[param_name] = arg
all_prewarp[param_name] = prewarp
instrum = inst.Instrumentation(**all_args)
return instrum, all_prewarp
self.errors = []
for param in params:
if param not in parameter_names:
raise ValueError("Parameter %s must be defined as a parameter "
"in the model" % param)
bounds = calc_bounds(parameter_names, **params)
instruments = []
for i, name in enumerate(parameter_names):
assert len(bounds[i]) == 2
instrumentation = inst.var.Array(1).asscalar().bounded(np.array([bounds[i][0]]),
np.array([bounds[i][1]]))
instruments.append(instrumentation)
instrum = inst.Instrumentation(*instruments)
self.optim = optimizerlib.registry[self.method](instrumentation=instrum,
**self.kwds)
self.optim._llambda = popsize # TODO: more elegant way once possible
choices = range(int(param_range[0]), int(param_range[-1]) + 1)
arg = inst.var.OrderedDiscrete(choices)
# We are throwing away information here, but OrderedDiscrete
# appears to be invariant to monotonic transformation anyway.
elif param_type == "real":
assert param_values is None
assert param_range is not None
# Will need to warp to this space sep.
arg = inst.var.Gaussian(mean=0, std=1)
prewarp = Real(warp=param_space, range_=param_range)
else:
assert False, "type %s not handled in API" % param_type
all_args[param_name] = arg
all_prewarp[param_name] = prewarp
instrum = inst.Instrumentation(**all_args)
return instrum, all_prewarp