# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Utilities for optuna."""
from __future__ import annotations
import logging
import math
import time
from collections import defaultdict
from typing import Any, Optional
import numpy as np
from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.callbacks.loggers import ObjectiveLogger
from discrete_optimization.generic_tools.callbacks.optuna import OptunaCallback
from discrete_optimization.generic_tools.do_problem import Problem
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
Hyperparameter,
TrialDropped,
)
from discrete_optimization.generic_tools.optuna.timed_percentile_pruner import (
TimedPercentilePruner,
)
logger = logging.getLogger(__name__)
try:
import optuna
except ImportError:
logger.warning("You should install optuna to use this module.")
else:
from optuna.pruners import BasePruner, MedianPruner
from optuna.samplers import BaseSampler
from optuna.storages import JournalFileStorage, JournalStorage
from optuna.trial import Trial, TrialState
def drop_already_tried_hyperparameters(trial: Trial) -> None:
"""Fail the trial if using same hyperparameters as a previous one."""
states_to_consider = (
TrialState.FAIL,
TrialState.COMPLETE,
TrialState.WAITING,
TrialState.PRUNED,
)
trials_to_consider = trial.study.get_trials(
deepcopy=False,
states=states_to_consider,
)
for t in reversed(trials_to_consider):
if trial.params == t.params:
msg = "Trial with same hyperparameters as a previous trial: dropping it."
trial.set_user_attr("Error", msg)
raise TrialDropped(msg)
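# Illustrative sketch (not part of the library API): the helper above can also be
# used inside a hand-written optuna objective so that duplicated hyperparameter
# combinations are dropped instead of being re-evaluated. `_example_objective` is a
# hypothetical name used only for this sketch.
def _example_objective(trial: Trial) -> float:
    x = trial.suggest_int("x", 0, 10)
    # raises TrialDropped if a previous trial already used the same params;
    # pass catch=TrialDropped to study.optimize() so the study keeps running
    drop_already_tried_hyperparameters(trial)
    return float(x**2)
# e.g. optuna.create_study().optimize(_example_objective, n_trials=20, catch=TrialDropped)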
def generic_optuna_experiment_monoproblem(
problem: Problem,
solvers_to_test: list[type[SolverDO]],
kwargs_fixed_by_solver: Optional[dict[type[SolverDO], dict[str, Any]]] = None,
suggest_optuna_kwargs_by_name_by_solver: Optional[
dict[type[SolverDO], dict[str, dict[str, Any]]]
] = None,
additional_hyperparameters_by_solver: Optional[
dict[type[SolverDO], list[Hyperparameter]]
] = None,
n_trials: int = 150,
check_satisfy: bool = True,
computation_time_in_study: bool = True,
study_basename: str = "study",
create_another_study: bool = True,
overwrite_study=False,
storage_path: str = "./optuna-journal.log",
sampler: Optional[BaseSampler] = None,
pruner: Optional[BasePruner] = None,
seed: Optional[int] = None,
min_time_per_solver: int = 5,
callbacks: Optional[list[Callback]] = None,
) -> optuna.Study:
"""Create and run an optuna study to tune solvers hyperparameters on a given problem.
The optuna study will choose a solver and its hyperparameters in order to optimize the fitness
on the given problem.
Pruning is potentially done at each optimization step thanks to a dedicated callback.
This can be done
- either according to the optimization step number (but this is meaningful only when considering
a single solver or at least solvers of a same family so that comparing step number can be done),
- or according to the elapsed time (which should be more meaningful when comparing several types of solvers).
The optuna study can be monitored with optuna-dashboard:
optuna-dashboard optuna-journal.log
(or the relevant path set by `storage_path`)
Args:
problem: problem to consider
solvers_to_test: list of solvers to consider
kwargs_fixed_by_solver: fixed hyperparameters by solver.
Can also be other parameters needed by solvers' __init__(), init_model(), and solve() methods
suggest_optuna_kwargs_by_name_by_solver: kwargs_by_name passed to solvers' suggest_with_optuna().
Useful to restrict or specify choices, step, high, ...
additional_hyperparameters_by_solver: additional user-defined hyperparameters by solver, to be suggested by optuna
n_trials: Number of trials to be run in the optuna study
check_satisfy: Decide whether to check that the solution found satisfies the problem. If it does not,
we consider the trial as failed and prune it without reporting the value.
computation_time_in_study: if `True` the intermediate reporting and pruning will be labelled according to elapsed time
instead of solver internal iteration number.
study_basename: Base name of the study generated.
If `create_another_study` is True, a timestamp will be added to this base name.
create_another_study: if `True` a timestamp suffix will be added to the study base name in order to avoid
overwriting or continuing a previously created study.
Should be False, if one wants to add trials to an existing study.
overwrite_study: if True, any study with the same name as the one generated here will be deleted before starting the optuna study.
Should be False, if one wants to add trials to an existing study.
storage_path: path to the journal used by optuna to log the study. Can be an NFS path to allow parallelized optuna studies.
sampler: sampler used by the optuna study. If None, a TPESampler is used with the provided `seed`.
pruner: pruner used by the optuna study.
If None,
- if computation_time_in_study is True: TimedPercentilePruner(percentile=50, n_warmup_steps=min_time_per_solver)
- else: MedianPruner()
is used
seed: used to create the sampler if `sampler` is None. Should be set to an integer if one wants to ensure
reproducible results.
min_time_per_solver: if no pruner is defined, and computation_time_in_study is True,
we wait this many seconds before allowing pruning.
callbacks: list of callbacks to plug into solvers' solve(). By default, use
`ObjectiveLogger(step_verbosity_level=logging.INFO, end_verbosity_level=logging.INFO)`.
Moreover, an `OptunaCallback` will be added to report intermediate values and prune accordingly.
Returns:
The created optuna study, after all trials have run.
"""
# default parameters
if kwargs_fixed_by_solver is None:
kwargs_fixed_by_solver = defaultdict(dict)
if suggest_optuna_kwargs_by_name_by_solver is None:
suggest_optuna_kwargs_by_name_by_solver = defaultdict(dict)
if additional_hyperparameters_by_solver is None:
additional_hyperparameters_by_solver = defaultdict(list)
if sampler is None:
sampler = optuna.samplers.TPESampler(seed=seed)
if pruner is None:
if computation_time_in_study:
pruner = TimedPercentilePruner( # intermediate values interpolated at same "step"
percentile=50, # median pruner
n_warmup_steps=min_time_per_solver, # no pruning during first seconds
)
else:
pruner = MedianPruner()
if callbacks is None:
callbacks = [
ObjectiveLogger(
step_verbosity_level=logging.INFO,
end_verbosity_level=logging.INFO,
)
]
elapsed_time_attr = "elapsed_time"
# study name
suffix = f"-{time.time()}" if create_another_study else ""
study_name = f"{study_basename}{suffix}"
# we need to map the classes to a unique string, to be seen as a categorical hyperparameter by optuna
# by default, we use the class name, but if there are identical names, f"{cls.__module__}.{cls.__name__}" could be used.
solvers_by_name: dict[str, type[SolverDO]] = {
cls.__name__: cls for cls in solvers_to_test
}
# sense of optimization
direction = problem.get_optuna_study_direction()
# add new user-defined hyperparameters to the solvers
for (
solver_cls,
additional_hyperparameters,
) in additional_hyperparameters_by_solver.items():
solver_cls.hyperparameters = (
list(solver_cls.hyperparameters) + additional_hyperparameters
)
# objective definition
def objective(trial: Trial):
# hyperparameters to test
# first parameter: solver choice
solver_name: str = trial.suggest_categorical("solver", choices=solvers_by_name)
solver_class = solvers_by_name[solver_name]
# hyperparameters for the chosen solver
suggested_hyperparameters_kwargs = (
solver_class.suggest_hyperparameters_with_optuna(
trial=trial,
prefix=solver_name + ".",
kwargs_by_name=suggest_optuna_kwargs_by_name_by_solver.get(
solver_class, None
),
fixed_hyperparameters=kwargs_fixed_by_solver.get(solver_class, None),
)
)
# drop trial if corresponding to a previous trial (it may happen that the sampler reproposes the same params)
drop_already_tried_hyperparameters(trial)
logger.info(f"Launching trial {trial.number} with parameters: {trial.params}")
# construct kwargs for __init__, init_model, and solve
kwargs = dict(suggested_hyperparameters_kwargs) # copy
if solver_class in kwargs_fixed_by_solver:
kwargs.update(kwargs_fixed_by_solver[solver_class])
try:
# solver init
solver = solver_class(problem=problem, **kwargs)
solver.init_model(**kwargs)
# init timer
starting_time = time.perf_counter()
# add optuna callbacks
optuna_callback = OptunaCallback(
trial=trial,
starting_time=starting_time,
elapsed_time_attr=elapsed_time_attr,
report_time=computation_time_in_study,
# report intermediate values according to elapsed time instead of iteration number?
)
callbacks_for_optuna = (
callbacks + [optuna_callback] + kwargs.pop("callbacks", [])
)
# solve
res = solver.solve(
callbacks=callbacks_for_optuna,
**kwargs,
)
except Exception as e:
if isinstance(e, optuna.TrialPruned):
raise e # pruning error managed directly by optuna
else:
# Store exception message as trial user attribute
msg = f"{e.__class__}: {e}"
trial.set_user_attr("Error", msg)
raise optuna.TrialPruned(msg) # show failed
# store elapsed time
elapsed_time = time.perf_counter() - starting_time
trial.set_user_attr(elapsed_time_attr, elapsed_time)
# store result for this instance and report it as an intermediate value (=> dashboard + pruning)
if check_satisfy:
# accept only satisfying solutions
try:
sol, fit = res.get_best_solution_fit(satisfying=problem)
except Exception as e:
# Store exception message as trial user attribute
msg = f"{e.__class__}: {e}"
trial.set_user_attr("Error", msg)
raise optuna.TrialPruned(msg) # show failed
else:
# take the solution with best fit regardless of satisfaction
sol, fit = res.get_best_solution_fit()
if fit is None:
# no solution found
if check_satisfy:
msg = f"No solution found satisfying the problem."
else:
msg = f"No solution found."
trial.set_user_attr("Error", msg)
raise optuna.TrialPruned(msg) # show failed
return fit
# create study + database to store it
storage = JournalStorage(JournalFileStorage(storage_path))
if overwrite_study:
try:
optuna.delete_study(study_name=study_name, storage=storage)
except Exception:  # the study does not exist yet: nothing to delete
pass
study = optuna.create_study(
study_name=study_name,
direction=direction,
sampler=sampler,
pruner=pruner,
storage=storage,
load_if_exists=not overwrite_study,
)
study.optimize(objective, n_trials=n_trials, catch=TrialDropped)
return study
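# Illustrative sketch (not part of the library API): a typical way to launch a tuning
# study with the helper above. `load_my_problem`, `MySolverA` and `MySolverB` are
# hypothetical placeholders for a concrete Problem factory and SolverDO subclasses.
def _example_monoproblem_study() -> optuna.Study:
    problem: Problem = load_my_problem()  # noqa: F821  # hypothetical factory
    return generic_optuna_experiment_monoproblem(
        problem=problem,
        solvers_to_test=[MySolverA, MySolverB],  # noqa: F821  # hypothetical solvers
        n_trials=50,
        seed=0,  # fix the sampler seed for reproducibility
        study_basename="example-study",
        # monitor with: optuna-dashboard ./optuna-journal.log
        storage_path="./optuna-journal.log",
    )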
def generic_optuna_experiment_multiproblem(
problems: list[Problem],
solvers_to_test: list[type[SolverDO]],
kwargs_fixed_by_solver: Optional[dict[type[SolverDO], dict[str, Any]]] = None,
suggest_optuna_kwargs_by_name_by_solver: Optional[
dict[type[SolverDO], dict[str, dict[str, Any]]]
] = None,
additional_hyperparameters_by_solver: Optional[
dict[type[SolverDO], list[Hyperparameter]]
] = None,
n_trials: int = 150,
check_satisfy: bool = True,
study_basename: str = "study",
create_another_study: bool = True,
overwrite_study=False,
storage_path: str = "./optuna-journal.log",
sampler: Optional[BaseSampler] = None,
pruner: Optional[BasePruner] = None,
seed: Optional[int] = None,
prop_startup_instances: float = 0.2,
randomize_instances: bool = True,
report_cumulated_fitness: bool = False,
callbacks: Optional[list[Callback]] = None,
) -> optuna.Study:
"""Create and run an optuna study to tune solvers hyperparameters on several instances of a problem.
The optuna study will choose a solver and its hyperparameters in order to optimize the average fitness
on given problem instances.
Pruning is potentially done after each instance is solved, based on how previous trials performed
on this same instance.
The optuna study can be monitored with optuna-dashboard:
optuna-dashboard optuna-journal.log
(or the relevant path set by `storage_path`)
Args:
problems: list of problem instances to consider
solvers_to_test: list of solvers to consider
kwargs_fixed_by_solver: fixed hyperparameters by solver.
Can also be other parameters needed by solvers' __init__(), init_model(), and solve() methods
suggest_optuna_kwargs_by_name_by_solver: kwargs_by_name passed to solvers' suggest_with_optuna().
Useful to restrict or specify choices, step, high, ...
additional_hyperparameters_by_solver: additional user-defined hyperparameters by solver, to be suggested by optuna
n_trials: Number of trials to be run in the optuna study
check_satisfy: Decide whether to check that the solution found satisfies the problem. If it does not,
we consider the trial as failed and prune it without reporting the value.
study_basename: Base name of the study generated.
If `create_another_study` is True, a timestamp will be added to this base name.
create_another_study: if `True` a timestamp suffix will be added to the study base name in order to avoid
overwriting or continuing a previously created study.
Should be False, if one wants to add trials to an existing study.
overwrite_study: if True, any study with the same name as the one generated here will be deleted before starting the optuna study.
Should be False, if one wants to add trials to an existing study.
storage_path: path to the journal used by optuna to log the study. Can be an NFS path to allow parallelized optuna studies.
sampler: sampler used by the optuna study. If None, a TPESampler is used with the provided `seed`.
pruner: pruner used by the optuna study. If None, a WilcoxonPruner is used.
seed: used to create the sampler if `sampler` is None. Should be set to an integer if one wants to ensure
reproducible results.
prop_startup_instances: used if `pruner` is None. Proportion of instances to solve at the start of each trial before allowing pruning.
randomize_instances: whether to randomize the instances order when running a trial.
Should probably be set to False if report_cumulated_fitness is set to True.
report_cumulated_fitness: whether to report the cumulated fitness instead of the individual fitness for each problem instance.
Should be set to False when using WilcoxonPruner.
callbacks: list of callbacks to plug into solvers' solve(). By default, use
`ObjectiveLogger(step_verbosity_level=logging.INFO, end_verbosity_level=logging.INFO)`.
Returns:
The created optuna study, after all trials have run.
"""
# default parameters
if kwargs_fixed_by_solver is None:
kwargs_fixed_by_solver = defaultdict(dict)
if suggest_optuna_kwargs_by_name_by_solver is None:
suggest_optuna_kwargs_by_name_by_solver = defaultdict(dict)
if additional_hyperparameters_by_solver is None:
additional_hyperparameters_by_solver = defaultdict(list)
if sampler is None:
sampler = optuna.samplers.TPESampler(seed=seed)
if pruner is None:
n_startup_steps = math.ceil(prop_startup_instances * len(problems))
pruner = optuna.pruners.WilcoxonPruner(
p_threshold=0.1, n_startup_steps=n_startup_steps
)
if callbacks is None:
callbacks = [
ObjectiveLogger(
step_verbosity_level=logging.INFO,
end_verbosity_level=logging.INFO,
)
]
# cumulative fitness + Wilcoxon = incompatible
if isinstance(pruner, optuna.pruners.WilcoxonPruner) and report_cumulated_fitness:
raise ValueError(
"`report_cumulated_fitness` cannot be true when using a WilcoxonPruner."
)
# study name
suffix = f"-{time.time()}" if create_another_study else ""
study_name = f"{study_basename}{suffix}"
# we need to map the classes to a unique string, to be seen as a categorical hyperparameter by optuna
# by default, we use the class name, but if there are identical names, f"{cls.__module__}.{cls.__name__}" could be used.
solvers_by_name: dict[str, type[SolverDO]] = {
cls.__name__: cls for cls in solvers_to_test
}
# sense of optimization
direction = problems[0].get_optuna_study_direction()
# add new user-defined hyperparameters to the solvers
for (
solver_cls,
additional_hyperparameters,
) in additional_hyperparameters_by_solver.items():
solver_cls.hyperparameters = (
list(solver_cls.hyperparameters) + additional_hyperparameters
)
# objective definition
def objective(trial: Trial):
# hyperparameters to test
# first parameter: solver choice
solver_name: str = trial.suggest_categorical("solver", choices=solvers_by_name)
solver_class = solvers_by_name[solver_name]
# hyperparameters for the chosen solver
suggested_hyperparameters_kwargs = (
solver_class.suggest_hyperparameters_with_optuna(
trial=trial,
prefix=solver_name + ".",
kwargs_by_name=suggest_optuna_kwargs_by_name_by_solver.get(
solver_class, None
), # options to restrict the choices of some hyperparameter
fixed_hyperparameters=kwargs_fixed_by_solver.get(solver_class, None),
)
)
# drop trial if corresponding to a previous trial (it may happen that the sampler reproposes the same params)
drop_already_tried_hyperparameters(trial)
logger.info(f"Launching trial {trial.number} with parameters: {trial.params}")
# construct kwargs for __init__, init_model, and solve
kwargs = dict(suggested_hyperparameters_kwargs) # copy
if solver_class in kwargs_fixed_by_solver:
kwargs.update(kwargs_fixed_by_solver[solver_class])
# loop on problem instances
fitnesses = []
cumulated_fitness = 0.0
i_cumulated_fitness = 0
# For best results, shuffle the evaluation order in each trial.
if randomize_instances:
instance_ids = np.random.permutation(len(problems))
else:
instance_ids = list(range(len(problems)))
for instance_id in instance_ids:
instance_id = int(instance_id) # convert np.int64 into python int
problem = problems[instance_id]
try:
# solver init
solver = solver_class(problem=problem, **kwargs)
solver.init_model(**kwargs)
callbacks_for_trial = callbacks + kwargs.pop("callbacks", [])
# solve
res = solver.solve(
callbacks=callbacks_for_trial,
**kwargs,
)
except Exception as e:
# Store exception message as trial user attribute
msg = f"{e.__class__}: {e}"
trial.set_user_attr("Error", msg)
trial.set_user_attr("pruned", True)
raise optuna.TrialPruned(msg) # show failed
# store result for this instance and report it as an intermediate value (=> dashboard + pruning)
if check_satisfy:
# accept only satisfying solutions
try:
sol, fit = res.get_best_solution_fit(satisfying=problem)
except Exception as e:
# Store exception message as trial user attribute
msg = f"{e.__class__}: {e}"
trial.set_user_attr("pruned", True)
trial.set_user_attr("Error", msg)
raise optuna.TrialPruned(msg) # show failed
else:
# take the solution with best fit regardless of satisfaction
sol, fit = res.get_best_solution_fit()
if fit is None:
# no solution found
if check_satisfy:
msg = f"No solution found satisfying problem #{instance_id}."
else:
msg = f"No solution found for problem #{instance_id}."
trial.set_user_attr("pruned", True)
trial.set_user_attr("Error", msg)
raise optuna.TrialPruned(msg) # show failed
else:
fitnesses.append(fit)
if report_cumulated_fitness:
cumulated_fitness += fit
trial.report(cumulated_fitness, i_cumulated_fitness)
i_cumulated_fitness += 1
else:
trial.report(fit, instance_id)
current_average = sum(fitnesses) / len(fitnesses)
trial.set_user_attr("current_fitness_average", current_average)
if trial.should_prune():
# return current average instead of raising TrialPruned,
# else optuna dashboard thinks that last intermediate fitness is the value for the trial
trial.set_user_attr("pruned", True)
trial.set_user_attr(
"Error", f"Pruned by pruner at problem instance #{instance_id}."
)
return current_average
trial.set_user_attr("pruned", False)
return sum(fitnesses) / len(fitnesses)
# create study + database to store it
storage = JournalStorage(JournalFileStorage(storage_path))
if overwrite_study:
try:
optuna.delete_study(study_name=study_name, storage=storage)
except Exception:  # the study does not exist yet: nothing to delete
pass
study = optuna.create_study(
study_name=study_name,
direction=direction,
sampler=sampler,
pruner=pruner,
storage=storage,
load_if_exists=not overwrite_study,
)
study.optimize(objective, n_trials=n_trials, catch=TrialDropped)
return study
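# Illustrative sketch (not part of the library API): tuning on a benchmark made of
# several instances with the helper above. `load_my_instances`, `MySolverA` and
# `MySolverB` are hypothetical placeholders for a Problem-list factory and SolverDO
# subclasses.
def _example_multiproblem_study() -> optuna.Study:
    problems: list[Problem] = load_my_instances()  # noqa: F821  # hypothetical factory
    return generic_optuna_experiment_multiproblem(
        problems=problems,
        solvers_to_test=[MySolverA, MySolverB],  # noqa: F821  # hypothetical solvers
        n_trials=100,
        seed=0,
        # default WilcoxonPruner: ~20% of instances solved before pruning is allowed
        prop_startup_instances=0.2,
        study_basename="example-multi-study",
        storage_path="./optuna-journal.log",
    )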