Source code for discrete_optimization.generic_tools.callbacks.early_stoppers

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import time
from datetime import datetime
from typing import Optional

from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)

logger = logging.getLogger(__name__)


class TimerStopper(Callback):
    """Callback to stop the optimization after a given time.

    Stops the optimization process once the time elapsed since
    `on_solve_start` reaches `total_seconds`. The elapsed time is only
    checked on steps that are a multiple of `check_nb_steps`.

    Time is measured with `time.monotonic()`, so the measured duration is
    not affected by adjustments of the system clock during the solve.

    """

    def __init__(self, total_seconds: int, check_nb_steps: int = 1):
        """

        Args:
            total_seconds: Total time in seconds allowed to solve
            check_nb_steps: Number of steps to wait before next time check

        """
        self.total_seconds = total_seconds
        self.check_nb_steps = check_nb_steps

    def on_solve_start(self, solver: SolverDO):
        """Record the instant the solve starts.

        Args:
            solver: Solver being run (not used here)

        """
        # Monotonic timestamp in seconds: only differences between two
        # readings are meaningful, which is all `on_step_end` needs.
        self.initial_training_time = time.monotonic()

    def on_step_end(
        self, step: int, res: ResultStorage, solver: SolverDO
    ) -> Optional[bool]:
        """Request a stop once the time budget is spent.

        Args:
            step: Index of the step that just ended
            res: Current result storage (not used here)
            solver: Solver being run (not used here)

        Returns:
            True if at least `total_seconds` seconds elapsed since
            `on_solve_start` (checked only when `step` is a multiple of
            `check_nb_steps`), False otherwise.

        """
        # Skip the clock read on steps that are not check points.
        if step % self.check_nb_steps != 0:
            return False
        difference_seconds = time.monotonic() - self.initial_training_time
        logger.debug(f"{difference_seconds} seconds elapsed since solve start.")
        if difference_seconds >= self.total_seconds:
            logger.info(f"{self.__class__.__name__} callback met its criteria")
            return True
        return False
class NbIterationStopper(Callback):
    """Callback to stop the optimization after a given number of steps.

    Each call to `on_step_end` counts as one iteration. For solvers that
    end a step each time a solution is found, this amounts to stopping
    after a given number of solutions (presumably the intended use --
    depends on the solver).

    The counter is reset at each solve start, so the same instance can be
    reused for several successive solves.

    """

    def __init__(self, nb_iteration_max: int):
        """

        Args:
            nb_iteration_max: Number of `on_step_end` calls after which
                a stop is requested

        """
        self.nb_iteration_max = nb_iteration_max
        self.nb_iteration = 0

    def on_solve_start(self, solver: SolverDO):
        """Reset the iteration counter for a new solve.

        Without this reset, iterations counted during a previous solve
        would make a reused instance stop the next solve too early.

        Args:
            solver: Solver being run (not used here)

        """
        self.nb_iteration = 0

    def on_step_end(
        self, step: int, res: ResultStorage, solver: SolverDO
    ) -> Optional[bool]:
        """Count one iteration and request a stop when the maximum is reached.

        Args:
            step: Index of the step that just ended (not used: the callback
                keeps its own counter)
            res: Current result storage (not used here)
            solver: Solver being run (not used here)

        Returns:
            True once `nb_iteration_max` iterations have been counted,
            False otherwise.

        """
        self.nb_iteration += 1
        if self.nb_iteration >= self.nb_iteration_max:
            logger.info(
                f"{self.__class__.__name__} callback met its criteria: max number of iterations reached"
            )
            return True
        return False
class ObjectiveGapCpSatSolver(Callback):
    """
    Stop the cpsat solver according to some classical convergence criteria

    The gap between the current objective value and the best objective
    bound, both read from `solver.clb`, is compared with an absolute
    and/or a relative threshold.

    It could be done differently (playing with parameters of cpsat directly)
    """

    def __init__(
        self,
        objective_gap_rel: Optional[float] = None,
        objective_gap_abs: Optional[float] = None,
    ):
        """

        Args:
            objective_gap_rel: Threshold on `|bound - objective| / |bound|`;
                None disables the relative criterion
            objective_gap_abs: Threshold on `|bound - objective|`;
                None disables the absolute criterion

        """
        self.objective_gap_rel = objective_gap_rel
        self.objective_gap_abs = objective_gap_abs

    def on_step_end(
        self, step: int, res: ResultStorage, solver: OrtoolsCpSatSolver
    ) -> Optional[bool]:
        """Request a stop when one of the enabled gap criteria is met.

        Args:
            step: Index of the step that just ended (not used here)
            res: Current result storage (not used here)
            solver: Solver whose `clb` attribute exposes `ObjectiveValue()`
                and `BestObjectiveBound()` (presumably the CP-SAT solution
                callback -- confirm against OrtoolsCpSatSolver)

        Returns:
            True if the absolute or the relative gap is within its
            threshold, False otherwise.

        """
        best_sol = solver.clb.ObjectiveValue()
        bound = solver.clb.BestObjectiveBound()
        gap_abs = abs(bound - best_sol)
        if self.objective_gap_abs is not None and gap_abs <= self.objective_gap_abs:
            logger.debug(
                f"Stopping search, absolute gap {gap_abs}<={self.objective_gap_abs}"
            )
            return True
        # The relative gap is normalized by the bound, hence undefined when
        # the bound is 0: the relative criterion is skipped in that case.
        if self.objective_gap_rel is not None and bound != 0:
            gap_rel = gap_abs / abs(bound)
            if gap_rel <= self.objective_gap_rel:
                logger.debug(
                    f"Stopping search, relative gap {gap_rel}<={self.objective_gap_rel}"
                )
                return True
        # Explicit "keep going" answer rather than an implicit None.
        return False