def _create_frozen_trial(params, param_distributions):
    # type: (Dict[str, Any], Dict[str, distributions.BaseDistribution]) -> FrozenTrial

    return FrozenTrial(
        number=0,
        value=1.,
        state=optuna.structs.TrialState.COMPLETE,
        user_attrs={},
        system_attrs={},
        params=params,
        distributions=param_distributions,
        intermediate_values={},
        datetime_start=None,
        datetime_complete=None,
        trial_id=0,
    )
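# Hedged usage sketch (not from the original source): building a FrozenTrial
# for one suggested parameter. UniformDistribution and the 'x' name are
# illustrative assumptions, not part of the snippet above.
import optuna
from optuna import distributions

_trial = _create_frozen_trial(
    params={'x': 0.5},
    param_distributions={'x': distributions.UniformDistribution(low=0., high=1.)})
assert _trial.state == optuna.structs.TrialState.COMPLETE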
for step, values in enumerate(_intermediate_values):
    # The study does not have any complete trials yet, so the percentile
    # lookup raises ValueError.
    with pytest.raises(ValueError):
        _all_trials = _study._storage.get_all_trials(_study._study_id)
        _direction = _study._storage.get_study_direction(_study._study_id)
        percentile._get_percentile_intermediate_result_over_trials(
            _all_trials, _direction, step, 25)

    for i in range(trial_num):
        trial_id = trial_ids[i]
        value = values[i]
        _study._storage.set_trial_intermediate_value(trial_id, step, value)

# Mark the trials COMPLETE because this method ignores incomplete trials.
for trial_id in trial_ids:
    _study._storage.set_trial_state(trial_id, TrialState.COMPLETE)

return _study
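# Hedged follow-up sketch: once every trial has intermediate values and the
# COMPLETE state, the 25th-percentile lookup that raised ValueError above is
# expected to succeed (step 0 is an illustrative choice).
_all_trials = _study._storage.get_all_trials(_study._study_id)
_direction = _study._storage.get_study_direction(_study._study_id)
percentile._get_percentile_intermediate_result_over_trials(
    _all_trials, _direction, 0, 25)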
def objective(trial):
    # type: (optuna.trial.Trial) -> float

    model = L.Classifier(chainer.Sequential(L.Linear(None, 2)))
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(model)

    train_iter = chainer.iterators.SerialIterator(FixedValueDataset(), 16)
    updater = chainer.training.StandardUpdater(train_iter, optimizer)
    trainer = chainer.training.Trainer(updater, (1, 'epoch'))
    trainer.extend(
        optuna.integration.chainer.ChainerPruningExtension(trial, 'main/loss', (1, 'epoch')))
    trainer.run(show_loop_exception_msg=False)
    return 1.0

study = optuna.create_study(pruner=DeterministicPruner(True))
study.optimize(objective, n_trials=1)
assert study.trials[0].state == optuna.structs.TrialState.PRUNED

study = optuna.create_study(pruner=DeterministicPruner(False))
study.optimize(objective, n_trials=1)
assert study.trials[0].state == optuna.structs.TrialState.COMPLETE
assert study.trials[0].value == 1.0
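# A minimal sketch of the DeterministicPruner assumed above: a test pruner
# that always (or never) prunes. The prune signature follows the
# optuna.structs-era API used throughout these snippets; newer Optuna
# versions use prune(self, study, trial) instead.
class DeterministicPruner(optuna.pruners.BasePruner):

    def __init__(self, is_pruning):
        self.is_pruning = is_pruning

    def prune(self, storage, study_id, trial_id, step):
        return self.is_pruning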
# Assert failed trial count.
failed_trials = [t for t in mn_study.trials if t.state == TrialState.FAIL]
assert len(failed_trials) == n_trials

# Synchronize nodes before executing the next optimization.
comm.mpi_comm.barrier()

# Invoke optimize in which no exceptions are accepted.
with pytest.raises(ValueError):
    mn_study.optimize(objective, n_trials=n_trials, catch=())

# Assert trial count.
assert len(mn_study.trials) == n_trials + 1

# Assert failed trial count.
failed_trials = [t for t in mn_study.trials if t.state == TrialState.FAIL]
assert len(failed_trials) == n_trials + 1
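# Hedged sketch of the objective this ChainerMN test assumes: one that always
# raises, so every trial ends as TrialState.FAIL. With ChainerMNStudy the
# objective receives the distributed trial and the communicator.
def objective(trial, comm):
    raise ValueError('deliberate failure for the catch/no-catch assertions')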
try:
    result = float(result)
except (ValueError, TypeError):
    message = 'Setting status of trial#{} as {} because the returned value from the ' \
              'objective function cannot be casted to float. Returned value is: ' \
              '{}'.format(trial_number, structs.TrialState.FAIL, repr(result))
    self.logger.warning(message)
    self._storage.set_trial_system_attr(trial_id, 'fail_reason', message)
    self._storage.set_trial_state(trial_id, structs.TrialState.FAIL)
    return trial

if math.isnan(result):
    message = 'Setting status of trial#{} as {} because the objective function ' \
              'returned {}.'.format(trial_number, structs.TrialState.FAIL, result)
    self.logger.warning(message)
    self._storage.set_trial_system_attr(trial_id, 'fail_reason', message)
    self._storage.set_trial_state(trial_id, structs.TrialState.FAIL)
    return trial

trial.report(result)
self._storage.set_trial_state(trial_id, structs.TrialState.COMPLETE)
self._log_completed_trial(trial_number, result)
return trial
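# Hedged illustration of the failure path above: a return value that cannot
# be cast to float leaves the trial in the FAIL state rather than raising.
import optuna

def _bad_objective(trial):
    return 'not-a-number'  # float('not-a-number') raises ValueError

study = optuna.create_study()
study.optimize(_bad_objective, n_trials=1)
assert study.trials[0].state == optuna.structs.TrialState.FAIL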
def train_optuna(config):
    study = optuna.create_study()
    study.optimize(
        partial(optuna_objective, config=config),
        n_trials=config['optuna_trials'])

    trial = study.best_trial
    pruned_trials = [t for t in study.trials
                     if t.state == optuna.structs.TrialState.PRUNED]
    complete_trials = [t for t in study.trials
                       if t.state == optuna.structs.TrialState.COMPLETE]

    print('Study statistics: ')
    print('  Number of finished trials: ', len(study.trials))
    print('  Number of pruned trials: ', len(pruned_trials))
    print('  Number of complete trials: ', len(complete_trials))

    print('Best trial:')
    print('  Value: ', trial.value)
    print('  Params: ')
    for key, value in trial.params.items():
        print('    {}: {}'.format(key, value))

    outdir = config['outdir']
    study.trials_dataframe().to_csv(f'{outdir}/result.tsv', sep='\t')
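# Hypothetical config illustrating the keys train_optuna reads
# ('optuna_trials', 'outdir'); optuna_objective is assumed to be defined
# elsewhere in the project.
config = {'optuna_trials': 20, 'outdir': './results'}
train_optuna(config)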
def show_result(study):
    pruned_trials = [t for t in study.trials if t.state == optuna.structs.TrialState.PRUNED]
    complete_trials = [t for t in study.trials if t.state == optuna.structs.TrialState.COMPLETE]

    print('Study statistics: ')
    print('  Number of finished trials: ', len(study.trials))
    print('  Number of pruned trials: ', len(pruned_trials))
    print('  Number of complete trials: ', len(complete_trials))

    print('Best trial:')
    trial = study.best_trial
    print('  Value: ', trial.value)
    print('  Params: ')
    for key, value in trial.params.items():
        print('    {}: {}'.format(key, value))
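# Usage sketch with a toy objective (assumed; any finished study works here).
study = optuna.create_study()
study.optimize(lambda trial: trial.suggest_uniform('x', -1., 1.) ** 2, n_trials=5)
show_result(study)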
Returns: None
"""
# Generate the Optuna CSV log file.
if self.enable_optuna_log:
    self.generate_optuna_log_file()

# best_trial: study.best_trial raises ValueError when no trial has
# completed yet, so fall back to a default trial.
try:
    self.best_trial = self.study.best_trial
except ValueError:
    self.best_trial = get_trial_default()

# latest_trial: skip over the trial that is still RUNNING, if any.
self.latest_trial = get_trial_default()
if len(self.study.trials) >= 1:
    if self.study.trials[-1].state == optuna.structs.TrialState.RUNNING:
        if len(self.study.trials) >= 2:
            self.latest_trial = self.study.trials[-2]
    else:
        self.latest_trial = self.study.trials[-1]

if self.verbose >= 1:
    self.print_results()
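# A hedged sketch of the get_trial_default helper this method relies on; the
# real project may define it differently. It returns a placeholder FrozenTrial
# mirroring the constructor used at the top of this page.
def get_trial_default():
    return optuna.structs.FrozenTrial(
        number=-1,
        value=None,
        state=optuna.structs.TrialState.RUNNING,
        user_attrs={},
        system_attrs={},
        params={},
        distributions={},
        intermediate_values={},
        datetime_start=None,
        datetime_complete=None,
        trial_id=-1,
    )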