def setUp(self):
    logging.basicConfig(level=logging.DEBUG)
    self.cs = ConfigurationSpace()
    self.cs.add_hyperparameter(CategoricalHyperparameter(
        name="cat_a_b", choices=["a", "b"], default_value="a"))
    self.cs.add_hyperparameter(UniformFloatHyperparameter(
        name="float_0_1", lower=0, upper=1, default_value=0.5))
    self.cs.add_hyperparameter(UniformIntegerHyperparameter(
        name='integer_0_100', lower=-10, upper=10, default_value=0))

    self.rh = runhistory.RunHistory(aggregate_func=average_cost)
    rs = numpy.random.RandomState(1)
    to_count = 0
    cn_count = 0
    for i in range(500):
        config, seed, runtime, status, instance_id = \
            generate_config(cs=self.cs, rs=rs)
        if runtime == 40:
            to_count += 1
        if runtime < 40 and status == StatusType.TIMEOUT:
            cn_count += 1
        self.rh.add(config=config, cost=runtime, time=runtime,
                    status=status, instance_id=instance_id,
                    seed=seed, additional_info=None)
    print("%d TIMEOUTs, %d censored" % (to_count, cn_count))
intensifier = Intensifier(tae_runner=None, stats=self.stats,
                          traj_logger=None,
                          rng=np.random.RandomState(12345),
                          instances=list(range(10)),
                          deterministic=False)
for i in range(10):
    self.rh.add(config=self.config1, cost=i + 1, time=1,
                status=StatusType.SUCCESS, instance_id=i,
                seed=i,
                additional_info=None)

# Tie on the first instance; after that the challenger should always win
# and be returned as the new incumbent.
inc = intensifier._race_challenger(challenger=self.config2,
                                   incumbent=self.config1,
                                   run_history=self.rh,
                                   aggregate_func=average_cost)
self.assertEqual(inc, self.config2)
self.assertEqual(
    self.rh.get_cost(self.config2), 1, self.rh.get_cost(self.config2))

# Get the data for config2 to check that the correct runs were performed
runs = self.rh.get_runs_for_config(self.config2)
self.assertEqual(len(runs), 10)
seeds = sorted([r.seed for r in runs])
self.assertEqual(seeds, list(range(10)), seeds)
other_runhistory_filename = os.path.join(self.tmp_dir,
                                         'runhistory.json')
with open(other_runhistory_filename, 'w') as fh:
    fh.write(other_runhistory)

# Load the file into an empty runhistory
runhistory = RunHistory(aggregate_func=average_cost)
runhistory.load_json(other_runhistory_filename, configuration_space)
self.assertEqual(sorted(list(runhistory.ids_config.keys())),
                 [1, 2, 3, 4])
self.assertEqual(len(runhistory.data), 6)

# Update a non-empty runhistory; in case of a duplicate the existing
# result is kept and the new one silently discarded
runhistory = RunHistory(aggregate_func=average_cost)
configuration_space.seed(1)
config = configuration_space.sample_configuration()
runhistory.add(config, 1, 1, StatusType.SUCCESS, seed=1,
               instance_id='branin')
id_before = id(runhistory.data[RunKey(1, 'branin', 1)])
runhistory.update_from_json(other_runhistory_filename,
                            configuration_space)
id_after = id(runhistory.data[RunKey(1, 'branin', 1)])
self.assertEqual(len(runhistory.data), 6)
self.assertEqual(id_before, id_after)
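A runhistory can also be written back to disk. A minimal round trip, assuming the companion RunHistory.save_json method behaves as in the SMAC versions these snippets target (the file name is a placeholder):

rh = RunHistory(aggregate_func=average_cost)
rh.add(config, 1, 1, StatusType.SUCCESS, seed=1, instance_id='branin')
rh.save_json('roundtrip_runhistory.json')        # serialize to disk
rh_loaded = RunHistory(aggregate_func=average_cost)
rh_loaded.load_json('roundtrip_runhistory.json', configuration_space)
assert len(rh_loaded.data) == len(rh.data)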
def setUp(self):
    unittest.TestCase.setUp(self)
    self.rh = runhistory.RunHistory(aggregate_func=average_cost)
    self.cs = get_config_space()
    self.config1 = Configuration(self.cs,
                                 values={'a': 0, 'b': 100})
    self.config2 = Configuration(self.cs,
                                 values={'a': 100, 'b': 0})
    self.config3 = Configuration(self.cs,
                                 values={'a': 100, 'b': 100})
    self.scen = Scenario({'run_obj': 'runtime', 'cutoff_time': 20,
                          'cs': self.cs})
    self.types, self.bounds = get_types(self.cs, None)
    self.scen = Scenario({'run_obj': 'runtime', 'cutoff_time': 20, 'cs': self.cs,
                          'output_dir': ''})
def test_choose_next(self):
    seed = 42
    smbo = SMAC4AC(self.scenario, rng=seed).solver
    smbo.runhistory = RunHistory(aggregate_func=average_cost)
    X = self.scenario.cs.sample_configuration().get_array()[None, :]
    smbo.incumbent = self.scenario.cs.sample_configuration()
    smbo.runhistory.add(smbo.incumbent, 10, 10, StatusType.SUCCESS)
    Y = self.branin(X)
    x = next(smbo.choose_next(X, Y)).get_array()
    assert x.shape == (2,)
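choose_next hands back an iterator over candidate Configuration objects, which is why next() is used above. Assuming the iterator can be advanced repeatedly, several ranked candidates could be drawn from the same call:

challengers = smbo.choose_next(X, Y)
candidates = [next(challengers) for _ in range(5)]   # first few suggestions
for c in candidates:
    assert c.get_array().shape == (2,)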
intensifier = Intensifier(tae_runner=self.ta, stats=stats,
                          traj_logger=tj, rng=rng,
                          instances=[None],
                          run_obj_time=False,
                          )
configs = [Configuration(configuration_space=self.cs, values={"x1": 4}),
           Configuration(configuration_space=self.cs, values={"x1": 2})]
dc = InitialDesign(
    tae_runner=self.ta,
    scenario=self.scenario,
    stats=stats,
    traj_logger=tj,
    runhistory=rh,
    rng=rng,
    configs=configs,
    intensifier=intensifier,
    aggregate_func=average_cost,
)
inc = dc.run()
self.assertTrue(stats.ta_runs == 4)   # two runs per config
self.assertTrue(len(rh.data) == 4)    # two runs per config
self.assertTrue(rh.get_cost(inc) == 4)
insts = [None]

# If the algorithm is deterministic, fix repetitions to 1
if self.scen.deterministic and repetitions != 1:
    self.logger.warning("Specified %d repetitions, but fixing to 1, "
                        "because algorithm is deterministic.", repetitions)
    repetitions = 1

# Extract relevant information from the given runhistory
inst_seed_config = self._process_runhistory(configs, insts, runhistory)

# Now create the actual run-list
runs = []
# Counter for runs that can be reused without recalculation
runs_from_rh = 0
# If we reuse runs, we want to return them as well
new_rh = RunHistory(average_cost)

for i in sorted(insts):
    for rep in range(repetitions):
        # First, find a seed and add all the data we can take from the
        # given runhistory to "our" validation runhistory.
        configs_evaluated = []
        if runhistory and i in inst_seed_config:
            # Choose seed based on most often evaluated inst-seed-pair
            seed, configs_evaluated = inst_seed_config[i].pop(0)
            # Delete the instance once all of its seeds are used up
            if not inst_seed_config[i]:
                inst_seed_config.pop(i)
            # Add the existing runs to the validation runhistory
            for c in configs_evaluated[:]:
                runkey = RunKey(runhistory.config_ids[c], i, seed)
                cost, time, status, additional_info = runhistory.data[runkey]
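The snippet is cut off right after the stored result is unpacked. In a sketch of the natural continuation (an assumption, since the rest of the method is not shown), the reused result would be copied into the validation runhistory and counted:

                # Re-add the already evaluated result so it is not recomputed
                new_rh.add(config=c, cost=cost, time=time, status=status,
                           instance_id=i, seed=seed,
                           additional_info=additional_info)
                runs_from_rh += 1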
    rh: RunHistory
        reduced runhistory
    """
    configs = rh.get_all_configs()
    if max_configs <= 0 or max_configs > len(configs):  # keep all
        return rh

    runs = [(c, len(rh.get_runs_for_config(c))) for c in configs]
    if not keep:
        keep = []
    runs = sorted(runs, key=lambda x: x[1])[-max_configs:]
    keep = [r[0] for r in runs] + keep
    self.logger.info("Reducing number of configs from %d to %d, dropping the "
                     "ones with the fewest evaluations",
                     len(configs), len(keep))

    new_rh = RunHistory(average_cost)
    for k, v in list(rh.data.items()):
        c = rh.ids_config[k.config_id]
        if c in keep:
            new_rh.add(config=c,
                       cost=v.cost, time=v.time, status=v.status,
                       instance_id=k.instance_id, seed=k.seed)
    return new_rh
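The copy loop relies on the RunHistory internals: rh.data maps a RunKey(config_id, instance_id, seed) to a RunValue(cost, time, status, additional_info), and rh.ids_config resolves the numeric id back to a Configuration. A minimal illustration (the config object is a placeholder sampled from some configuration space):

rh = RunHistory(average_cost)
rh.add(config=config, cost=0.25, time=1.7, status=StatusType.SUCCESS,
       instance_id='inst-1', seed=42)
for key, value in rh.data.items():
    print(rh.ids_config[key.config_id], key.instance_id, key.seed,
          value.cost, value.time, value.status)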
runhistory=smac.runhistory,
incumbent=smac.solver.incumbent,
seed=seed,
parameters_to_evaluate=numParams,
save_folder='PIMP',
impute_censored=impute,
max_sample_size=max_sample_size,
fANOVA_cut_at_default=fanova_cut_at_default,
fANOVA_pairwise=fANOVA_pairwise,
forwardsel_feat_imp=forwardsel_feat_imp,
incn_quant_var=incn_quant_var,
preprocess=marginalize_away_instances)
elif X is not None and y is not None:
    X = np.array(X)
    y = np.array(y)
    runHist = RunHistory(average_cost)
    if X.shape[0] != y.shape[0]:
        raise Exception("Number of samples in X and y don't match!")
    n_params = len(scenario.cs.get_hyperparameters())
    feats = None
    if X.shape[1] > n_params:
        feats = X[:, n_params:]
        assert feats.shape[1] == scenario.feature_array.shape[1]
        X = X[:, :n_params]

    for p in range(X.shape[1]):  # normalize the data to fit into [0, 1]
        _min, _max = np.min(X[:, p]), np.max(X[:, p])
        if _min < 0. or _max > 1.:  # if it is not already normalized
            for idx, v in enumerate(X[:, p]):
                X[idx, p] = (v - _min) / (_max - _min)

    # Add everything to a runhistory such that PIMP can work with it
elif ta_exec_dir is None:
    ta_exec_dir = '.'

self.scen_fn = os.path.join(folder, 'scenario.txt')
self.rh_fn = os.path.join(folder, 'runhistory.json')
self.traj_fn = os.path.join(folder, 'traj_aclib2.json')
self.traj_old_fn = os.path.join(folder, 'traj_old.csv')

# Create Scenario (disable output_dir to avoid cluttering)
scen_dict = in_reader.read_scenario_file(self.scen_fn)
scen_dict['output_dir'] = ""
with changedir(ta_exec_dir):
    self.scen = Scenario(scen_dict)

# Load runhistory and trajectory
self.runhistory = RunHistory(average_cost)
self.runhistory.update_from_json(self.rh_fn, self.scen.cs)
self.traj = TrajLogger.read_traj_aclib_format(fn=self.traj_fn,
                                              cs=self.scen.cs)
incumbent = self.traj[-1]['incumbent']
self.train_inst = self.scen.train_insts
self.test_inst = self.scen.test_insts

# Initialize SMAC-object
super().__init__(scenario=self.scen, runhistory=self.runhistory)
# restore_incumbent=incumbent)
# TODO: use restore, delete next line
self.solver.incumbent = incumbent
if (not run_1_existed) and os.path.exists('run_1'):
    shutil.rmtree('run_1')
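The cleanup above is guarded by a run_1_existed flag recorded earlier in the test; a hedged sketch of that earlier step (its placement is an assumption, the flag name mirrors the fragment):

run_1_existed = os.path.exists('run_1')   # remember pre-existing output dirs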