Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from . import base
from . import callbacks
class CounterFunction:
    """Callable objective that also counts how many times it has been evaluated.

    Calling the instance with a one-element sequence ``value`` returns
    ``(value[0] - 1) ** 2`` as a float and increments ``count``.
    """

    def __init__(self) -> None:
        # Number of successful evaluations so far.
        self.count = 0

    def __call__(self, value: base.ArrayLike) -> float:
        """Evaluate the objective on a length-1 input.

        The length check runs before the counter is incremented, so a rejected
        input does not count as an evaluation.
        """
        assert len(value) == 1
        self.count += 1
        shifted = float(value[0] - 1)
        return shifted ** 2
class LoggingOptimizer(base.Optimizer):
    """Optimizer that records each internal ask/tell call in ``self.logs``.

    Asks are logged as ``"s<k>"`` where ``k`` is ``self._num_ask`` at the time
    of the call; tells are logged as ``"u<k>"`` where ``k`` is ``int(x[0])``.
    """

    def __init__(self, num_workers: int = 1) -> None:
        super().__init__(instrumentation=1, budget=5, num_workers=num_workers)
        # Chronological trace of the ask/tell calls received.
        self.logs: List[str] = []

    def _internal_ask(self) -> base.ArrayLike:
        """Log the ask and return a one-element array holding the ask counter."""
        entry = f"s{self._num_ask}"  # s for suggest
        self.logs.append(entry)
        return np.array([float(self._num_ask)])

    def _internal_tell(self, x: base.ArrayLike, value: float) -> None:
        """Log the tell; ``value`` itself is not used."""
        entry = f"u{int(x[0])}"  # u for update
        self.logs.append(entry)
@testing.parametrized(
w1_batch=(1, True, ['s0', 'u0', 's1', 'u1', 's2', 'u2', 's3', 'u3', 's4', 'u4']),
w1_steady=(1, False, ['s0', 'u0', 's1', 'u1', 's2', 'u2', 's3', 'u3', 's4', 'u4']), # no difference (normal, since worker=1)
x = candidate.data
sigma = np.linalg.norm(x - self.current_center) / np.sqrt(self.dimension) # educated guess
part = base.utils.Individual(x)
part._parameters = np.array([sigma])
self._unevaluated_population[x.tobytes()] = part
self._internal_tell_candidate(candidate, value) # go through standard pipeline
@registry.register
class NaiveTBPSA(TBPSA):
    """TBPSA variant that recommends the point stored under ``current_bests["optimistic"]``."""

    def _internal_provide_recommendation(self) -> ArrayLike:
        """Return the ``x`` attribute of the current "optimistic" best entry."""
        optimistic_best = self.current_bests["optimistic"]
        return optimistic_best.x
@registry.register
class NoisyBandit(base.Optimizer):
    """UCB.

    Upper confidence bound, adapted to minimization, with a very poor
    parametrization: in particular the logarithmic term is set to zero.
    Infinite arms: one arm is added when `20 * #ask >= #arms ** 3`.
    """

    def _internal_ask(self) -> ArrayLike:
        """Pick the next point: a new arm, a random known arm, or the optimistic best."""
        num_arms = len(self.archive)
        if 20 * self._num_ask >= num_arms ** 3:
            # Not enough arms yet for the number of asks: draw a new one.
            return self._rng.normal(0, 1, self.dimension)  # type: ignore
        replay_known_arm = self._rng.choice([True, False])
        if not replay_known_arm:
            return self.current_bests["optimistic"].x
        # numpy does not accept choice on list of tuples, must choose index instead
        arm_index = self._rng.choice(num_arms)
        arm_keys = list(self.archive.bytesdict.keys())
        return np.frombuffer(arm_keys[arm_index])  # type: ignore
self.delta = 2 * self._rng.randint(2, size=self.dimension) - 1
return self.t - self._ck(k) * self.delta # type:ignore
return self.t + self._ck(k) * self.delta # type: ignore
def _internal_tell(self, x: ArrayLike, value: float) -> None:
setattr(self, ("ym" if self.idx % 2 == 0 else "yp"), np.array(value, copy=True))
self.idx += 1
if self.init and self.yp is not None and self.ym is not None:
self.init = False
def _internal_provide_recommendation(self) -> ArrayLike:
return self.avg
@registry.register
class SplitOptimizer(base.Optimizer):
"""Combines optimizers, each of them working on their own variables.
num_optims: number of optimizers
num_vars: number of variables per optimizer.
E.g. for 5 optimizers, each of them working on 2 variables, we can use:
opt = SplitOptimizer(instrumentation=10, num_workers=3, num_optims=5, num_vars=[2, 2, 2, 2, 2])
or equivalently:
opt = SplitOptimizer(instrumentation=10, num_workers=3, num_vars=[2, 2, 2, 2, 2])
Given that all optimizers have the same number of variables, we can also do:
opt = SplitOptimizer(instrumentation=10, num_workers=3, num_optims=5)
This is 5 parallel (by num_workers = 5).
Be careful! The variables refer to the deep representation used by optimizers.
For example, a categorical variable with 5 possible values becomes 5 continuous variables.
def __init__(
    self,
    function: instru.InstrumentedFunction,
    optimizer: Union[str, base.OptimizerFamily],
    budget: int,
    num_workers: int = 1,
    batch_mode: bool = True,
    seed: Optional[int] = None,
    cheap_constraint_checker: Optional[Callable[[Any], Any]] = None,
) -> None:
    """Record the function, the optimizer settings and empty result placeholders.

    ``function`` must be an ``instru.InstrumentedFunction`` (checked with an
    assertion). ``optimizer``, ``num_workers``, ``budget`` and ``batch_mode``
    are bundled into an ``OptimizerSettings`` instance; ``seed`` and
    ``cheap_constraint_checker`` are stored as given.
    """
    assert isinstance(function, instru.InstrumentedFunction), ("All experiment functions should derive from InstrumentedFunction")
    self.function = function
    # depending on the inner workings of the function, the experiment may not be repeatable
    self.seed = seed
    settings = OptimizerSettings(optimizer=optimizer, num_workers=num_workers, budget=budget, batch_mode=batch_mode)
    self.optimsettings = settings
    # Placeholders: numeric fields start as NaN and the error message as empty.
    self.result = {
        "loss": np.nan,
        "elapsed_budget": np.nan,
        "elapsed_time": np.nan,
        "error": "",
    }
    self.recommendation: Optional[base.Candidate] = None
    # to be able to restore stopped/checkpointed optimizer
    self._optimizer: Optional[base.Optimizer] = None
    self._cheap_constraint_checker = cheap_constraint_checker
self.evaluated_population = [p[0] for p in sorted_pop_with_sigma_and_fitness]
self.covariance *= 0.9
self.covariance += 0.1 * np.cov(np.array(self.evaluated_population).T)
self.evaluated_population_sigma = [p[1] for p in sorted_pop_with_sigma_and_fitness]
self.evaluated_population_fitness = [p[2] for p in sorted_pop_with_sigma_and_fitness]
# Computing the new parent.
arrays = [np.asarray(self.evaluated_population[i]) for i in range(self.mu)]
self.current_center = sum(arrays) / self.mu # type: ignore
self.sigma = np.exp(sum([np.log(self.evaluated_population_sigma[i]) for i in range(self.mu)]) / self.mu)
self.evaluated_population = []
self.evaluated_population_sigma = []
self.evaluated_population_fitness = []
@registry.register
class TBPSA(base.Optimizer):
    """Test-based population-size adaptation.

    The population size ``llambda`` is 4 x dimension, raised to ``num_workers``
    when that is larger.
    The test compares the first fifth and the last fifth of the 5 x lambda
    evaluations (that comparison is not performed in the initializer).
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, instrumentation: Union[int, Instrumentation], budget: Optional[int] = None, num_workers: int = 1) -> None:
        """Initialise ``sigma``, ``mu``, ``llambda`` and a zero ``current_center``."""
        super().__init__(instrumentation, budget=budget, num_workers=num_workers)
        self.sigma = 1
        self.mu = self.dimension
        population_size = 4 * self.dimension
        if num_workers is not None:
            # Keep the population at least as large as the number of workers.
            population_size = max(population_size, num_workers)
        self.llambda = population_size
        self.current_center: np.ndarray = np.zeros(self.dimension)
_optimizer_class = _CMA
def __init__(self, *, scale: float = 1.0, diagonal: bool = False) -> None:
    """Store the keyword-only ``scale`` and ``diagonal`` settings, then call the parent initializer."""
    self.scale, self.diagonal = scale, diagonal
    super().__init__()
# Named ParametrizedCMA presets; each is created through `with_name(..., register=True)`.
# Apart from the default `CMA`, they differ only in the keyword passed:
# `diagonal=True`, `scale=1e-3` and `scale=1e-6`.
CMA = ParametrizedCMA().with_name("CMA", register=True)
DiagonalCMA = ParametrizedCMA(diagonal=True).with_name("DiagonalCMA", register=True)
MilliCMA = ParametrizedCMA(scale=1e-3).with_name("MilliCMA", register=True)
MicroCMA = ParametrizedCMA(scale=1e-6).with_name("MicroCMA", register=True)
@registry.register
class EDA(base.Optimizer):
    """Optimizer whose initializer sets up ``sigma``, an identity ``covariance``, ``mu`` and ``llambda``.

    ``llambda`` is 4 x dimension, raised to ``num_workers`` when that is larger,
    and ``mu`` equals the dimension.

    NOTE(review): this class has been documented with the "Test-based
    population-size adaptation" wording (first fifth vs. last fifth of the
    5 x lambda evaluations); nothing in the initializer performs such a test,
    so confirm which description is intended.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, instrumentation: Union[int, Instrumentation], budget: Optional[int] = None, num_workers: int = 1) -> None:
        """Initialise ``sigma``, ``covariance``, ``mu`` and ``llambda``."""
        super().__init__(instrumentation, budget=budget, num_workers=num_workers)
        self.sigma = 1
        self.covariance = np.identity(self.dimension)
        self.mu = self.dimension
        population_size = 4 * self.dimension
        if num_workers is not None:
            # Keep the population at least as large as the number of workers.
            population_size = max(population_size, num_workers)
        self.llambda = population_size
"Find more information on BayesianOptimization's github.\n"
"You should then create a new instance of optimizerlib.ParametrizedBO with appropriate parametrization.",
InefficientSettingsWarning,
)
return super().__call__(instrumentation, budget, num_workers)
# Named ParametrizedBO presets; each is created through `with_name(..., register=True)`.
# Apart from the default `BO`, they differ only in the `initialization` argument
# ("random", "Hammersley", "LHS") and, for MidQRBO, `middle_point=True`.
BO = ParametrizedBO().with_name("BO", register=True)
RBO = ParametrizedBO(initialization="random").with_name("RBO", register=True)
QRBO = ParametrizedBO(initialization="Hammersley").with_name("QRBO", register=True)
MidQRBO = ParametrizedBO(initialization="Hammersley", middle_point=True).with_name("MidQRBO", register=True)
LBO = ParametrizedBO(initialization="LHS").with_name("LBO", register=True)
@registry.register
class PBIL(base.Optimizer):
    """
    Implementation of the discrete algorithm PBIL

    https://www.ri.cmu.edu/pub_files/pub1/baluja_shumeet_1994_2/baluja_shumeet_1994_2.pdf
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, instrumentation: Union[int, Instrumentation], budget: Optional[int] = None, num_workers: int = 1) -> None:
        """Initialise the probability row ``p``, the rate ``alpha`` and the population size ``llambda``."""
        super().__init__(instrumentation, budget=budget, num_workers=num_workers)
        self._penalize_cheap_violations = False  # Not sure this is the optimal decision.
        num_categories = 2
        # One row of per-coordinate values, all starting at 1 / num_categories.
        uniform_row = np.ones((1, self.dimension))
        self.p: np.ndarray = uniform_row / num_categories
        self.alpha = 0.3
        # size of the population
        self.llambda = max(100, num_workers)
def load(self, filepath: Union[str, Path]) -> "Optimizer":
    """Load a pickle from ``filepath`` and check that it is an Optimizer.

    The work is delegated to ``load(Optimizer, filepath)``.
    NOTE(review): unpickling can execute arbitrary code; only use this on trusted files.
    """
    loaded = load(Optimizer, filepath)
    return loaded
def _internal_ask_candidate(self) -> base.Candidate:
unif = self._rng.uniform(size=self.dimension)
data = (unif > 1 - self.p[0]).astype(float)
return self.create_candidate.from_data(data)
def _internal_tell_candidate(self, candidate: base.Candidate, value: float) -> None:
self._population.append((value, candidate.data))
if len(self._population) >= self.llambda:
self._population.sort(key=lambda tup: tup[0])
mean_pop: np.ndarray = np.mean([x[1] for x in self._population[: self.mu]])
self.p[0] = (1 - self.alpha) * self.p[0] + self.alpha * mean_pop
self._population = []
class _Chain(base.Optimizer):
    """Optimizer holding a lazily built list of sub-optimizers described by ``self._parameters``."""

    def __init__(self, instrumentation: Union[int, Instrumentation], budget: Optional[int] = None, num_workers: int = 1) -> None:
        super().__init__(instrumentation, budget=budget, num_workers=num_workers)
        self._parameters = Chaining([LHSSearch, DE], [10])  # needs a default
        # delayed initialization
        self._optimizers_: List[base.Optimizer] = []

    @property
    def _optimizers(self) -> List[base.Optimizer]:
        """Build the sub-optimizers on first access.

        String budgets ("num_workers", "dimension", "sqrt") are resolved to
        numbers; the last optimizer receives what is left of the total budget,
        or ``None`` when no total budget is set.
        """
        # NOTE(review): no return statement is visible in this excerpt; the
        # property presumably ends by returning self._optimizers_ - confirm.
        if not self._optimizers_:
            self._optimizers_ = []
            symbolic_budgets = {"num_workers": self.num_workers, "dimension": self.dimension}
            symbolic_budgets["sqrt"] = int(np.sqrt(self.budget)) if self.budget else self.num_workers
            budgets = [symbolic_budgets[b] if isinstance(b, str) else b for b in self._parameters.budgets]
            remaining_budget = None if self.budget is None else self.budget - sum(budgets)
            all_budgets = budgets + [remaining_budget]
            for optimizer_cls, sub_budget in zip(self._parameters.optimizers, all_budgets):  # type: ignore
                sub_optimizer = optimizer_cls(self.instrumentation, budget=sub_budget, num_workers=self.num_workers)
                self._optimizers_.append(sub_optimizer)
# Named ParametrizedOnePlusOne presets; each is created through `with_name(..., register=True)`.
# They differ only in the `mutation`, `noise_handling` and `crossover` arguments passed.
PortfolioOptimisticNoisyDiscreteOnePlusOne = ParametrizedOnePlusOne(noise_handling="optimistic", mutation="portfolio").with_name(
"PortfolioOptimisticNoisyDiscreteOnePlusOne", register=True
)
PortfolioNoisyDiscreteOnePlusOne = ParametrizedOnePlusOne(noise_handling="random", mutation="portfolio").with_name(
"PortfolioNoisyDiscreteOnePlusOne", register=True
)
CauchyOnePlusOne = ParametrizedOnePlusOne(mutation="cauchy").with_name("CauchyOnePlusOne", register=True)
# The two "Recombining" presets additionally pass crossover=True.
RecombiningOptimisticNoisyDiscreteOnePlusOne = ParametrizedOnePlusOne(
crossover=True, mutation="discrete", noise_handling="optimistic"
).with_name("RecombiningOptimisticNoisyDiscreteOnePlusOne", register=True)
RecombiningPortfolioOptimisticNoisyDiscreteOnePlusOne = ParametrizedOnePlusOne(
crossover=True, mutation="portfolio", noise_handling="optimistic"
).with_name("RecombiningPortfolioOptimisticNoisyDiscreteOnePlusOne", register=True)
class _CMA(base.Optimizer):
    """Optimizer wrapping a lazily created ``cma.CMAEvolutionStrategy``.

    The strategy is configured from ``self._parameters`` (its ``scale`` is used
    as ``sigma0`` and its ``diagonal`` flag as the ``CMA_diagonal`` option).
    """

    def __init__(self, instrumentation: Union[int, Instrumentation], budget: Optional[int] = None, num_workers: int = 1) -> None:
        super().__init__(instrumentation, budget=budget, num_workers=num_workers)
        self._parameters = ParametrizedCMA()
        self._es: Optional[cma.CMAEvolutionStrategy] = None
        # delay initialization to ease implementation of variants
        self.listx: List[ArrayLike] = []
        self.listy: List[float] = []
        self.to_be_asked: Deque[np.ndarray] = deque()

    @property
    def es(self) -> cma.CMAEvolutionStrategy:
        """Create the underlying evolution strategy on first access.

        The population size is the larger of ``num_workers`` and
        ``4 + int(3 * log(dimension))``; the strategy starts from the zero
        vector and draws its random numbers from ``self._rng.randn``.
        """
        # NOTE(review): no return statement is visible in this excerpt; the
        # property presumably ends by returning self._es - confirm.
        if self._es is None:
            popsize = max(self.num_workers, 4 + int(3 * np.log(self.dimension)))
            diag = self._parameters.diagonal
            inopts = {"popsize": popsize, "randn": self._rng.randn, "CMA_diagonal": diag, "verbose": 0}
            # `np.float` was only an alias of the builtin `float` and was removed in
            # NumPy 1.24; the builtin gives the same float64 dtype on every version.
            x0 = np.zeros(self.dimension, dtype=float)
            self._es = cma.CMAEvolutionStrategy(x0=x0, sigma0=self._parameters.scale, inopts=inopts)