# Source code for discrete_optimization.multibatching.solvers.benders

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from copy import copy
from typing import Any, Optional

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.cp_tools import ParametersCp
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    SubBrick,
    SubBrickHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.multibatching.problem import (
    MultibatchingProblem,
    Product,
    TransportLink,
)
from discrete_optimization.multibatching.solvers.cpsat import (
    CpsatMultibatchingSolver,
    ModelingMultiBatch,
)
from discrete_optimization.multibatching.solvers.packing_subproblem import (
    CpsatPackingSubproblem,
    GreedyPackingForMultibatching,
    PackingSubproblemSolver,
    PackingViaBinPacking,
)


class CpsatBendersMultibatchingSolver(SolverDO):
    """Iterative master / subproblem solver for the multibatching problem.

    A CP-SAT master model (:class:`CpsatMultibatchingSolver`, built with the
    ``FLOW`` modeling) proposes a solution. A packing subproblem solver,
    selected through the ``packing_solver`` hyperparameter, is then initialised
    from that solution and solved. Whenever the subproblem reports changes on
    some transport links, cuts are added to the master model and the master is
    solved again.
    """

    hyperparameters = [
        SubBrickHyperparameter(
            name="packing_solver",
            choices=[
                CpsatPackingSubproblem,
                GreedyPackingForMultibatching,
                PackingViaBinPacking,
            ],
            default=SubBrick(cls=GreedyPackingForMultibatching, kwargs=None),
        )
    ]
    problem: MultibatchingProblem

    def __init__(self, problem: MultibatchingProblem, **kwargs: Any):
        """Store the problem and prepare empty solver state.

        Args:
            problem: the multibatching instance to solve.
            **kwargs: forwarded to :class:`SolverDO`; ``scaling_factor``
                (default 1) is also read here and later handed to the master
                solver.
        """
        super().__init__(problem, **kwargs)
        self.variables = {}
        self.base_solution = None
        # Both solvers are created lazily: the master in `init_model`, the
        # subproblem solver at each iteration of `solve`.
        self.solver_master: Optional[CpsatMultibatchingSolver] = None
        self.solver_subproblem: Optional[PackingSubproblemSolver] = None
        self.scaling_factor = kwargs.get("scaling_factor", 1)
        # Number of cuts added so far; also used to name cut indicator variables.
        self.nb_cuts = 0

    def init_model(self, **kwargs: Any) -> None:
        """Build the CP-SAT master model using the ``FLOW`` modeling.

        Args:
            **kwargs: forwarded to ``CpsatMultibatchingSolver.init_model``.
        """
        self.solver_master = CpsatMultibatchingSolver(
            problem=self.problem, scaling_factor=self.scaling_factor
        )
        self.solver_master.init_model(modeling=ModelingMultiBatch.FLOW, **kwargs)

    def update_model_master(
        self,
        changed: list[tuple[TransportLink, dict[Product, int], int, int]],
        best_lb: int,
    ):
        """Add one cut to the master model per changed transport link.

        For each entry of ``changed``, the cut states: if every product flow on
        the link equals the value given in ``flows``, then the number of trips
        on that link must equal ``new_nb_trip``.

        Args:
            changed: tuples ``(link, flows, new_nb_trip, prev_nb_trip)`` as
                returned by the subproblem solver's ``analyse_solution``.
                ``prev_nb_trip`` is not used here.
            best_lb: best lower bound known so far. Currently unused: a
                constraint ``total_obj >= floor(best_lb)`` on the master
                objective was sketched in the original code but left disabled.
        """
        master = self.solver_master
        cp_model = master.cp_model
        for link, flows, new_nb_trip, _prev_nb_trip in changed:
            index_link = self.problem.transport_links_to_index[link]
            link_flow_vars = master.variables["flows"][index_link]
            indicators = []
            for product, flow_value in flows.items():
                # Fully reified equality: ind <=> (flow variable == flow_value).
                ind = cp_model.NewBoolVar(
                    f"flow_{index_link}_{product.id}_equal_{flow_value}"
                )
                var = link_flow_vars[self.problem.product_to_index[product]]
                cp_model.Add(var == flow_value).OnlyEnforceIf(ind)
                cp_model.Add(var != flow_value).OnlyEnforceIf(ind.Not())
                indicators.append(ind)
            all_equal = cp_model.NewBoolVar(
                f"indicator_cut_{index_link}_{self.nb_cuts}"
            )
            # One-directional implication is enough here:
            # (all flows match) => all_equal => nb_trips == new_nb_trip.
            cp_model.Add(all_equal == 1).OnlyEnforceIf(*indicators)
            cp_model.Add(
                master.variables["nb_trips"][index_link] == new_nb_trip
            ).OnlyEnforceIf(all_equal)
            self.nb_cuts += 1

    def solve(
        self,
        callbacks: Optional[list[Callback]] = None,
        time_limit_first_iter: int = 100,
        params_solver_master: Optional[dict] = None,
        nb_iteration: int = 10,
        **kwargs: Any,
    ) -> ResultStorage:
        """Alternate master and packing subproblem solves, adding cuts in between.

        Args:
            callbacks: callbacks forwarded to every master ``solve`` call.
            time_limit_first_iter: value of ``time_limit`` used for the first
                master solve, overriding the one in ``params_solver_master``.
            params_solver_master: keyword arguments for the master ``solve``.
                Defaults to default CP-SAT parameters, ``time_limit=5`` and
                search-progress logging enabled.
            nb_iteration: maximum number of master/subproblem rounds.
            **kwargs: forwarded to ``init_model`` when the master model has not
                been built yet, then completed with default hyperparameters
                (``packing_solver``).

        Returns:
            A result storage gathering the subproblem results of all rounds.
        """
        if self.solver_master is None:
            self.init_model(**kwargs)
        kwargs = self.complete_with_default_hyperparameters(kwargs)
        # NOTE(review): this CallbackList is created without the user
        # callbacks, so the hooks called on it below presumably do nothing;
        # user callbacks are only passed to the master solver. Confirm intent.
        callback = CallbackList()
        callback.on_solve_start(solver=self)
        if params_solver_master is None:
            params_solver_master = dict(
                parameters_cp=ParametersCp.default_cpsat(),
                time_limit=5,
                ortools_cpsat_solver_kwargs={"log_search_progress": True},
            )
        subbrick_packing_solver: SubBrick = kwargs["packing_solver"]
        params_solver_sub = subbrick_packing_solver.kwargs
        if params_solver_sub is None:
            params_solver_sub = dict(
                parameters_cp=ParametersCp.default_cpsat(),
                time_limit=5,
                ortools_cpsat_solver_kwargs={"log_search_progress": True},
            )
        res_merged = self.create_result_storage([])
        current_lbs = []
        for i in range(nb_iteration):
            if i == 0:
                # Only the first master solve gets the dedicated time limit.
                master_params = copy(params_solver_master)
                master_params["time_limit"] = time_limit_first_iter
            else:
                master_params = params_solver_master
                warm_start, _ = res_merged.get_best_solution_fit(
                    satisfying=self.problem
                )
                # No feasible solution may have been collected yet (e.g. the
                # master found nothing so far): skip the warm start then.
                if warm_start is not None:
                    self.solver_master.set_warm_start(warm_start)
            res = self.solver_master.solve(callbacks=callbacks, **master_params)
            if len(res) == 0:
                # The master produced no solution this round; try again.
                continue
            current_lbs.append(
                self.solver_master.get_current_best_internal_objective_bound()
            )
            base_solution = res[-1][0]
            print("Status master : ", self.solver_master.status_solver)
            print(
                "Cur solution",
                self.problem.evaluate(base_solution),
                self.problem.satisfy(base_solution),
            )
            print("Obj ", self.aggreg_from_sol(base_solution))
            subsolver: PackingSubproblemSolver = subbrick_packing_solver.cls(
                problem=self.problem
            )
            subsolver.init_from_solution(solution=base_solution)
            self.solver_subproblem = subsolver
            res_sub_problem = subsolver.solve(**params_solver_sub)
            res_merged.extend(res_sub_problem)
            repaired_solution = res_sub_problem[-1][0]
            print("Status subsolver : ", subsolver.status_solver)
            print(
                "Solution post pro",
                self.problem.evaluate(repaired_solution),
                self.problem.satisfy(repaired_solution),
            )
            print("Obj ", self.aggreg_from_sol(repaired_solution))
            callback.on_step_end(i, res=res_sub_problem, solver=self)
            changes = subsolver.analyse_solution(new_solution=repaired_solution)
            if len(changes) == 0:
                # The subproblem reported no change: nothing to cut, stop.
                print("stop")
                break
            print("Nb changes :", len(changes))
            best_lb = max(current_lbs)
            self.update_model_master(changed=changes, best_lb=best_lb)
            bs, best_ub = res_merged.get_best_solution_fit(satisfying=self.problem)
            print("Best lb", best_lb)
            print("Best Ub", best_ub)
        return res_merged