Source code for discrete_optimization.generic_tools.ls.simulated_annealing

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
from abc import abstractmethod
from collections.abc import Callable
from typing import Any, Optional

import numpy as np

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.do_mutation import Mutation
from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ObjectiveHandling,
    ParamsObjectiveFunction,
    Problem,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.ls.local_search import (
    ModeMutation,
    RestartHandler,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)

logger = logging.getLogger(__name__)


[docs] class TemperatureScheduling: nb_iteration: int restart_handler: RestartHandler temperature: float
[docs] @abstractmethod def next_temperature(self) -> float: ...
[docs] class SimulatedAnnealing(SolverDO, WarmstartMixin): aggreg_from_sol: Callable[[Solution], float] aggreg_from_dict: Callable[[dict[str, float]], float] initial_solution: Optional[Solution] = None """Initial solution used for warm start.""" def __init__( self, problem: Problem, mutator: Mutation, restart_handler: RestartHandler, temperature_handler: TemperatureScheduling, mode_mutation: ModeMutation, params_objective_function: Optional[ParamsObjectiveFunction] = None, store_solution: bool = False, **kwargs, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.mutator = mutator self.restart_handler = restart_handler self.temperature_handler = temperature_handler self.mode_mutation = mode_mutation if ( self.params_objective_function.objective_handling == ObjectiveHandling.MULTI_OBJ ): raise NotImplementedError( "SimulatedAnnealing is not implemented for multi objective optimization." ) self.mode_optim = self.params_objective_function.sense_function self.store_solution = store_solution
[docs] def set_warm_start(self, solution: Solution) -> None: """Make the solver warm start from the given solution. Will be ignored if arg `initial_variable` is set and not None in call to `solve()`. """ self.initial_solution = solution
[docs] def solve( self, nb_iteration_max: int, initial_variable: Optional[Solution] = None, callbacks: Optional[list[Callback]] = None, **kwargs: Any, ) -> ResultStorage: callbacks_list = CallbackList(callbacks=callbacks) if initial_variable is None: if self.initial_solution is None: raise ValueError( "initial_variable cannot be None if self.initial_solution is None.\n" "Use set_warm_start() to define it." ) else: initial_variable = self.initial_solution objective = self.aggreg_from_dict(self.problem.evaluate(initial_variable)) cur_variable = initial_variable.copy() cur_objective = objective cur_best_objective = objective store = self.create_result_storage( [(initial_variable, objective)], ) self.restart_handler.best_fitness = objective self.restart_handler.solution_best = initial_variable.copy() iteration = 0 # start of solve callback callbacks_list.on_solve_start(solver=self) while iteration < nb_iteration_max: local_improvement = False global_improvement = False if self.mode_mutation == ModeMutation.MUTATE: nv, move = self.mutator.mutate(cur_variable) objective = self.aggreg_from_dict(self.problem.evaluate(nv)) else: # self.mode_mutation == ModeMutation.MUTATE_AND_EVALUATE: nv, move, objective_dict_values = self.mutator.mutate_and_compute_obj( cur_variable ) objective = self.aggreg_from_dict(objective_dict_values) logger.debug( f"{iteration} / {nb_iteration_max} {objective} {cur_objective}" ) if self.mode_optim == ModeOptim.MINIMIZATION and objective < cur_objective: accept = True local_improvement = True global_improvement = objective < cur_best_objective elif ( self.mode_optim == ModeOptim.MAXIMIZATION and objective > cur_objective ): accept = True local_improvement = True global_improvement = objective > cur_best_objective else: r = random.random() fac = 1 if self.mode_optim == ModeOptim.MAXIMIZATION else -1 p = np.exp( fac * (objective - cur_objective) / self.temperature_handler.temperature ) accept = p > r if accept: cur_objective = objective cur_variable = nv logger.debug(f"iter accepted {iteration}") logger.debug(f"acceptance {objective}") else: cur_variable = move.backtrack_local_move(nv) if self.store_solution: store.append((nv.copy(), objective)) if global_improvement: logger.info(f"iter {iteration}") logger.info(f"new obj {objective} better than {cur_best_objective}") cur_best_objective = objective if not self.store_solution: store.append((cur_variable.copy(), objective)) self.temperature_handler.next_temperature() # Update the temperature self.restart_handler.update( nv, objective, global_improvement, local_improvement ) # Update info in restart handler cur_variable, cur_objective = self.restart_handler.restart( # type: ignore cur_variable, cur_objective ) # possibly restart somewhere iteration += 1 # end of step callback: stopping? stopping = callbacks_list.on_step_end( step=iteration, res=store, solver=self ) if stopping: break # end of solve callback callbacks_list.on_solve_end(res=store, solver=self) return store
[docs] class TemperatureSchedulingFactor(TemperatureScheduling): def __init__( self, temperature: float, restart_handler: RestartHandler, coefficient: float = 0.99, ): self.temperature = temperature self.restart_handler = restart_handler self.coefficient = coefficient
[docs] def next_temperature(self) -> float: self.temperature *= self.coefficient return self.temperature