Source code for discrete_optimization.flex_scheduling.solvers.heuristic_solver

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from typing import Any, Hashable, Optional

import numpy as np

from discrete_optimization.flex_scheduling.problem import (
    FlexProblem,
    ScheduleSolution,
    ScheduleSolutionPreemptive,
)
from discrete_optimization.flex_scheduling.simulator import PostprocessTool
from discrete_optimization.flex_scheduling.solvers.cpsat import CpSatFlexSolver
from discrete_optimization.flex_scheduling.solvers.cpsat_preempt import (
    CPSatFlexSPPreempt,
)
from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.cp_tools import ParametersCp
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.lexico_tools import LexicoSolver
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)


class WSCallback(Callback):
    """Callback that re-seeds a wrapped sub-solver at the end of each step.

    On every ``on_step_end`` call, the first element of the last entry of the
    result storage is passed to ``solver.subsolver.set_warm_start_from_sol``.
    """

    def __init__(self, objectives: list[str]):
        """Keep the objective names on the instance.

        Args:
            objectives: objective names. They are only stored; nothing in this
                class reads them back.
        """
        self.objectives = objectives

    def on_step_end(self, step: int, res: ResultStorage, solver: SolverDO):
        """Warm start ``solver.subsolver`` from the latest stored solution.

        Args:
            step: step index supplied by the caller (unused here).
            res: results accumulated so far. Must hold at least one entry,
                since ``res[-1]`` is read unconditionally.
            solver: solver exposing a ``subsolver`` attribute that provides
                ``set_warm_start_from_sol``.

        Returns:
            Always ``False``.
            NOTE(review): presumably "do not stop the solve" - confirm against
            the ``Callback`` contract.
        """
        # Resolve the hook first, then read the newest entry of the storage.
        apply_warm_start = solver.subsolver.set_warm_start_from_sol
        apply_warm_start(res[-1][0])
        return False
class HeuristicSolverFlexProblem(SolverDO, WarmstartMixin):
    """Two-phase heuristic solver for a :class:`FlexProblem`.

    ``solve`` proceeds as follows:

    1. The resource calendars are temporarily simplified in place (see
       ``modify_calendars``) and a lexicographic optimisation is run with
       ``CpSatFlexSolver``.
    2. The original calendars are restored, the last solution found is
       post-processed with ``PostprocessTool.post_process_left`` and used as
       warm start for a lexicographic optimisation with
       ``CPSatFlexSPPreempt``.
    """

    hyperparameters = []

    # Objectives optimised, in this order, when the caller does not give any.
    _DEFAULT_OBJECTIVES = ("tardiness", "earliness", "resource_cost", "makespan")
    # ``nb_process`` set on the CP parameters that ``solve`` builds by default.
    _DEFAULT_NB_PROCESS = 16

    def __init__(
        self,
        problem: FlexProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs: Any,
    ):
        """Initialise the solver.

        Args:
            problem: the problem to solve. Its resource calendars are modified
                in place during ``solve`` and restored afterwards.
            params_objective_function: forwarded to the parent constructor.
            **kwargs: forwarded to the parent constructor.
        """
        super().__init__(
            problem, params_objective_function=params_objective_function, **kwargs
        )
        self.problem: FlexProblem = self.problem
        # Copies of the original calendars, keyed by resource id.
        self.previous_calendar: dict[Hashable, np.ndarray] = {}
        # Optional warm start for the first phase (see ``set_warm_start``).
        self.ws: Optional[ScheduleSolution] = None

    @classmethod
    def _default_parameters_cp(cls) -> ParametersCp:
        """Build the CP parameters used when the caller provides none."""
        parameters_cp = ParametersCp.default_cpsat()
        parameters_cp.nb_process = cls._DEFAULT_NB_PROCESS
        return parameters_cp

    def modify_calendars(self):
        """Simplify every resource calendar in place, keeping a backup copy.

        For each resource whose calendar maximum is at least 2, every slot
        whose availability is strictly positive but different from that
        maximum is set to 0. A copy of the untouched calendar is stored in
        ``self.previous_calendar`` so that ``put_back_calendar`` can undo the
        change. ``update_data_placeholders`` is called on the problem at the
        end.
        """
        fsp: FlexProblem = self.problem
        for resource in fsp.resources:
            calendar = resource.calendar_availability
            # Save first: whatever happens next, the original can be restored.
            self.previous_calendar[resource.id] = np.copy(calendar)
            if np.size(calendar) == 0:
                # np.max raises ValueError on an empty array; nothing to cut.
                continue
            max_value = np.max(calendar)
            # max_value >= 2 already implies that a non-zero entry exists.
            if max_value >= 2:
                # if 0 < res_availability < max_capacity, cut to 0 !
                partially_available = np.logical_and(
                    calendar > 0, calendar != max_value
                )
                calendar[partially_available] = 0
        fsp.update_data_placeholders()

    def put_back_calendar(self):
        """Restore the calendars saved by ``modify_calendars``.

        Resources without a saved calendar are left untouched, which makes
        this method safe to call from a ``finally`` clause even when
        ``modify_calendars`` did not run to completion.
        ``update_data_placeholders`` is called on the problem at the end.
        """
        for resource in self.problem.resources:
            saved_calendar = self.previous_calendar.get(resource.id)
            if saved_calendar is not None:
                resource.calendar_availability = saved_calendar
        self.problem.update_data_placeholders()

    def set_warm_start(self, solution: Solution) -> None:
        """Store ``solution``; ``solve`` hands it to the first-phase solver."""
        self.ws = solution

    def solve(
        self,
        callbacks: Optional[list[Callback]] = None,
        params_cp_first_solve: Optional[ParametersCp] = None,
        params_cp_second_solve: Optional[ParametersCp] = None,
        time_limit_per_objectives_first_solve: int = 150,
        time_limit_per_objectives_second_solve: int = 150,
        objectives_first_solve: Optional[list[str]] = None,
        objectives_second_solve: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> ResultStorage:
        """Run the two-phase heuristic and return all solutions found.

        Args:
            callbacks: accepted for interface compatibility.
                NOTE(review): this argument is currently not forwarded to any
                sub-solver; only an internal ``WSCallback`` is used.
            params_cp_first_solve: CP parameters for the first phase. Defaults
                to ``ParametersCp.default_cpsat()`` with ``nb_process = 16``.
            params_cp_second_solve: same, for the second phase.
            time_limit_per_objectives_first_solve: forwarded as ``time_limit``
                to the first lexicographic solve.
            time_limit_per_objectives_second_solve: forwarded as
                ``time_limit`` to the second lexicographic solve.
            objectives_first_solve: objective names for the first phase, in
                lexicographic order. Defaults to tardiness, earliness,
                resource_cost, makespan.
            objectives_second_solve: same, for the second phase.
            **kwargs: only ``subsolver_callbacks`` is read; it is forwarded to
                the second lexicographic solve.

        Returns:
            The result storage of the first phase, extended with the
            solutions of the second phase.

        Raises:
            IndexError: if the first phase returns no solution (raised by
                ``res[-1]``, as before).
        """
        if objectives_first_solve is None:
            objectives_first_solve = list(self._DEFAULT_OBJECTIVES)
        if objectives_second_solve is None:
            objectives_second_solve = list(self._DEFAULT_OBJECTIVES)
        if params_cp_first_solve is None:
            params_cp_first_solve = self._default_parameters_cp()
        if params_cp_second_solve is None:
            params_cp_second_solve = self._default_parameters_cp()

        # Phase 1: solve with the simplified calendars. The try/finally
        # guarantees that the shared problem gets its real calendars back even
        # when model building or the solve itself raises.
        try:
            self.modify_calendars()
            subsolver = CpSatFlexSolver(self.problem)
            subsolver.init_model()
            if self.ws is not None:
                subsolver.set_warm_start_from_sol(self.ws)
            lexico_solver = LexicoSolver(subsolver=subsolver, problem=self.problem)
            res = lexico_solver.solve(
                callbacks=[WSCallback(objectives_first_solve)],
                objectives=objectives_first_solve,
                subsolver_callbacks=[],
                time_limit=time_limit_per_objectives_first_solve,
                parameters_cp=params_cp_first_solve,
                ortools_cpsat_solver_kwargs={"log_search_progress": True},
            )
        finally:
            # Put back real calendar
            self.put_back_calendar()
        sol = res[-1][0]

        # Post process schedule left.
        pptool = PostprocessTool(flex_problem=self.problem, solution=sol)
        schedule = pptool.post_process_left(
            flex_problem=self.problem,
            solution=sol,
            keep_min_time=False,
            keep_strict_order_task=False,
        )
        sol_ = ScheduleSolutionPreemptive(
            problem=self.problem,
            schedule=[
                schedule[self.problem.index_to_task_id[i]]
                for i in range(self.problem.nb_tasks)
            ],
            modes=np.ones(self.problem.nb_tasks),
        )

        # Phase 2: solve with the real calendars, warm started from the
        # post-processed phase-1 schedule.
        solver = CPSatFlexSPPreempt(problem=self.problem)
        solver.init_model()
        solver.set_warm_start_from_sol(sol_)
        lexico_solver = LexicoSolver(subsolver=solver, problem=self.problem)
        res_preemptive = lexico_solver.solve(
            callbacks=[WSCallback(objectives_second_solve)],
            objectives=objectives_second_solve,
            subsolver_callbacks=kwargs.get("subsolver_callbacks", []),
            time_limit=time_limit_per_objectives_second_solve,
            parameters_cp=params_cp_second_solve,
            ortools_cpsat_solver_kwargs=dict(
                log_search_progress=True,
                # probing_deterministic_time_limit=0,
                # use_sat_inprocessing=False,
                # linearization_level=0,
                # fix_variables_to_their_hinted_value=False,
                debug_crash_on_bad_hint=False,
            ),
        )
        res.extend(res_preemptive.list_solution_fits)
        return res