# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import Any, Optional
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_mutation import Mutation
from discrete_optimization.generic_tools.do_problem import (
ModeOptim,
ParamsObjectiveFunction,
Problem,
Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.ls.local_search import (
ModeMutation,
RestartHandler,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ParetoFront,
ResultStorage,
)
logger = logging.getLogger(__name__)
class HillClimber(SolverDO, WarmstartMixin):
    """Hill-climbing local search solver.

    At each iteration the current solution is mutated; the mutant replaces the
    current solution only when it strictly improves the aggregated objective
    (according to the optimization sense). Rejected moves are backtracked.
    The restart handler is notified after every iteration and may restart the
    search from another solution.
    """

    initial_solution: Optional[Solution] = None
    """Initial solution used for warm start."""

    def __init__(
        self,
        problem: Problem,
        mutator: Mutation,
        restart_handler: RestartHandler,
        mode_mutation: ModeMutation,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        store_solution: bool = False,
        **kwargs,
    ):
        """
        Args:
            problem: problem to solve.
            mutator: mutation operator applied at each iteration.
            restart_handler: strategy deciding when/where to restart the search.
            mode_mutation: MUTATE (the solver evaluates the mutant itself) or
                MUTATE_AND_EVALUATE (the mutation also returns objective values).
            params_objective_function: parameters controlling objective
                aggregation and the optimization sense.
            store_solution: if True, store every visited solution in the result
                storage; otherwise only global improvements are stored.
        """
        super().__init__(
            problem=problem, params_objective_function=params_objective_function
        )
        self.mutator = mutator
        self.restart_handler = restart_handler
        self.mode_mutation = mode_mutation
        # Optimization sense (minimization/maximization) used by the accept test.
        self.mode_optim = self.params_objective_function.sense_function
        self.store_solution = store_solution

    def set_warm_start(self, solution: Solution) -> None:
        """Make the solver warm start from the given solution.

        Will be ignored if arg `initial_variable` is set and not None in call to `solve()`.
        """
        self.initial_solution = solution

    def _mutate(self, cur_variable: Solution):
        """Mutate `cur_variable` and return `(mutant, move, aggregated_objective)`.

        Raises:
            ValueError: if `self.mode_mutation` is not a known `ModeMutation`.
        """
        if self.mode_mutation == ModeMutation.MUTATE:
            nv, move = self.mutator.mutate(cur_variable)
            objective = self.aggreg_from_sol(nv)
        elif self.mode_mutation == ModeMutation.MUTATE_AND_EVALUATE:
            nv, move, objective_dict_values = self.mutator.mutate_and_compute_obj(
                cur_variable
            )
            objective = self.aggreg_from_dict(objective_dict_values)
        else:
            # Previously an unrecognized mode fell through and later raised
            # NameError on unbound `nv`/`move`/`objective`; fail fast instead.
            raise ValueError(f"Unknown mode_mutation: {self.mode_mutation}")
        return nv, move, objective

    def solve(
        self,
        nb_iteration_max: int,
        initial_variable: Optional[Solution] = None,
        callbacks: Optional[list[Callback]] = None,
        **kwargs: Any,
    ) -> ResultStorage:
        """Run the hill climbing for at most `nb_iteration_max` iterations.

        Args:
            nb_iteration_max: maximum number of mutation iterations.
            initial_variable: starting solution; if None, falls back on the
                warm-start solution set via `set_warm_start()`.
            callbacks: callbacks notified at solve start/end and after each
                step; a step callback may request early stopping.

        Returns:
            The result storage containing the recorded solutions and fitnesses.

        Raises:
            ValueError: if neither `initial_variable` nor a warm-start solution
                is available, or if `mode_mutation` is unknown.
        """
        callbacks_list = CallbackList(callbacks=callbacks)
        if initial_variable is None:
            if self.initial_solution is None:
                raise ValueError(
                    "initial_variable cannot be None if self.initial_solution is None.\n"
                    "Use set_warm_start() to define it."
                )
            initial_variable = self.initial_solution
        objective = self.aggreg_from_dict(self.problem.evaluate(initial_variable))
        cur_variable = initial_variable.copy()
        store = self.create_result_storage(
            [(initial_variable, objective)],
        )
        cur_objective = objective
        cur_best_objective = objective
        self.restart_handler.best_fitness = objective
        self.restart_handler.solution_best = initial_variable.copy()
        iteration = 0
        # start of solve callback
        callbacks_list.on_solve_start(solver=self)
        while iteration < nb_iteration_max:
            accept = False
            local_improvement = False
            global_improvement = False
            nv, move, objective = self._mutate(cur_variable)
            # Accept only strictly improving moves (hill climbing).
            if self.mode_optim == ModeOptim.MINIMIZATION and objective < cur_objective:
                accept = True
                local_improvement = True
                global_improvement = objective < cur_best_objective
            elif (
                self.mode_optim == ModeOptim.MAXIMIZATION and objective > cur_objective
            ):
                accept = True
                local_improvement = True
                global_improvement = objective > cur_best_objective
            if accept:
                cur_objective = objective
                cur_variable = nv
            else:
                # Rejected move: undo it to restore the previous solution.
                cur_variable = move.backtrack_local_move(nv)
            if self.store_solution:
                store.append((nv.copy(), objective))
            if global_improvement:
                logger.debug(f"iter {iteration}")
                logger.debug(f"new obj {objective} better than {cur_best_objective}")
                cur_best_objective = objective
                if not self.store_solution:
                    store.append((cur_variable.copy(), objective))
            # Let the restart handler see the outcome of this iteration
            self.restart_handler.update(
                nv, objective, global_improvement, local_improvement
            )
            # possibly restart somewhere
            cur_variable, cur_objective = self.restart_handler.restart(
                cur_variable, cur_objective
            )
            iteration += 1
            # end of step callback: stopping?
            stopping = callbacks_list.on_step_end(
                step=iteration, res=store, solver=self
            )
            if stopping:
                break
        # end of solve callback
        callbacks_list.on_solve_end(res=store, solver=self)
        return store
class HillClimberPareto(HillClimber):
    """Hill climber that maintains a Pareto front of visited solutions.

    Unlike the plain `HillClimber`, moves that *equal* the current objective
    are also accepted, and every accepted mutant is appended to the Pareto
    front, which is periodically finalized (pruned to non-dominated points).
    """

    def __init__(
        self,
        problem: Problem,
        mutator: Mutation,
        restart_handler: RestartHandler,
        mode_mutation: ModeMutation,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        store_solution: bool = False,
    ):
        super().__init__(
            problem=problem,
            mutator=mutator,
            restart_handler=restart_handler,
            mode_mutation=mode_mutation,
            params_objective_function=params_objective_function,
            store_solution=store_solution,
        )

    def solve(
        self,
        nb_iteration_max: int,
        initial_variable: Optional[Solution] = None,
        update_iteration_pareto: int = 1000,
        callbacks: Optional[list[Callback]] = None,
        **kwargs: Any,
    ) -> ParetoFront:
        """Run the Pareto hill climbing for at most `nb_iteration_max` iterations.

        Args:
            nb_iteration_max: maximum number of mutation iterations.
            initial_variable: starting solution; if None, falls back on the
                warm-start solution set via `set_warm_start()`.
            update_iteration_pareto: period (in iterations) at which the
                Pareto front is finalized to drop dominated points.
            callbacks: callbacks notified at solve start/end and after each
                step; a step callback may request early stopping.

        Returns:
            The (finalized) Pareto front of recorded solutions.

        Raises:
            ValueError: if neither `initial_variable` nor a warm-start solution
                is available, or if `mode_mutation` is unknown.
        """
        callbacks_list = CallbackList(callbacks=callbacks)
        if initial_variable is None:
            if self.initial_solution is None:
                raise ValueError(
                    "initial_variable cannot be None if self.initial_solution is None.\n"
                    "Use set_warm_start() to define it."
                )
            initial_variable = self.initial_solution
        objective = self.aggreg_from_dict(self.problem.evaluate(initial_variable))
        pareto_front = ParetoFront(
            list_solution_fits=[(initial_variable, objective)],
            mode_optim=self.params_objective_function.sense_function,
        )
        cur_variable = initial_variable.copy()
        cur_objective = objective
        cur_best_objective = objective
        self.restart_handler.best_fitness = objective
        self.restart_handler.solution_best = initial_variable.copy()
        iteration = 0
        # start of solve callback
        callbacks_list.on_solve_start(solver=self)
        while iteration < nb_iteration_max:
            accept = False
            local_improvement = False
            global_improvement = False
            if iteration % update_iteration_pareto == 0:
                # Periodically prune dominated points to keep the front compact.
                pareto_front.finalize()
            if self.mode_mutation == ModeMutation.MUTATE:
                nv, move = self.mutator.mutate(cur_variable)
                objective = self.aggreg_from_sol(nv)
            elif self.mode_mutation == ModeMutation.MUTATE_AND_EVALUATE:
                nv, move, objective_dict_values = self.mutator.mutate_and_compute_obj(
                    cur_variable
                )
                objective = self.aggreg_from_dict(objective_dict_values)
            else:
                # Previously an unrecognized mode fell through and later raised
                # NameError on unbound `nv`/`move`/`objective`; fail fast instead.
                raise ValueError(f"Unknown mode_mutation: {self.mode_mutation}")
            # Accept improving OR equal moves; each accepted mutant enters the
            # front. `global_improvement` is strict vs. best on a strict local
            # improvement, but equality vs. best on an equal-objective move.
            if (
                self.mode_optim == ModeOptim.MINIMIZATION
                and objective <= cur_objective
            ):
                accept = True
                local_improvement = True
                if objective < cur_objective:
                    global_improvement = objective < cur_best_objective
                else:
                    global_improvement = objective == cur_best_objective
                pareto_front.append((nv.copy(), objective))
            elif (
                self.mode_optim == ModeOptim.MAXIMIZATION
                and objective >= cur_objective
            ):
                accept = True
                local_improvement = True
                if objective > cur_objective:
                    global_improvement = objective > cur_best_objective
                else:
                    global_improvement = objective == cur_best_objective
                pareto_front.append((nv.copy(), objective))
            if accept:
                logger.debug(f"Accept : {objective}")
                cur_objective = objective
                cur_variable = nv
            else:
                # Rejected move: undo it to restore the previous solution.
                cur_variable = move.backtrack_local_move(nv)
            if global_improvement:
                logger.debug(f"iter {iteration}")
                logger.debug(f"new obj {objective} better than {cur_best_objective}")
                cur_best_objective = objective
            # Let the restart handler see the outcome of this iteration
            self.restart_handler.update(
                nv, objective, global_improvement, local_improvement
            )
            logger.debug(f"Len pareto : {pareto_front.len_pareto_front()}")
            # possibly restart somewhere
            cur_variable, cur_objective = self.restart_handler.restart(
                cur_variable, cur_objective
            )
            iteration += 1
            # end of step callback: stopping?
            stopping = callbacks_list.on_step_end(
                step=iteration, res=pareto_front, solver=self
            )
            if stopping:
                break
        pareto_front.finalize()
        # end of solve callback
        callbacks_list.on_solve_end(res=pareto_front, solver=self)
        return pareto_front