# Source code for discrete_optimization.binpack.solvers.cpsat

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
from collections.abc import Hashable
from enum import Enum
from typing import Any, Iterable, Optional, Union

from ortools.sat.python.cp_model import (
    CpSolverSolutionCallback,
    IntVar,
    LinearExpr,
    LinearExprT,
)

from discrete_optimization.binpack.problem import (
    BinPack,
    BinPackProblem,
    BinPackSolution,
    Item,
)
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.solvers.cpsat import (
    AllocationCpSatSolver,
    SchedulingCpSatSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    EnumHyperparameter,
)

logger = logging.getLogger(__name__)


class ModelingBinPack(Enum):
    """CP-SAT formulations available for the bin packing solver."""

    # One boolean variable per (item, bin) pair.
    BINARY = 0
    # One integer "start" variable per item, whose value is the bin index.
    SCHEDULING = 1
class ModelingError(Exception):
    """Exception raised when calling for variables not existing for chosen modeling."""
class CpSatBinPackSolver(
    AllocationCpSatSolver[Item, BinPack], SchedulingCpSatSolver[Item], WarmstartMixin
):
    """CP-SAT solver for the bin packing problem.

    Two formulations are available, selected by the ``modeling`` hyperparameter:

    - ``ModelingBinPack.BINARY``: one boolean variable per (item, bin) pair,
      minimizing the number of used bins.
    - ``ModelingBinPack.SCHEDULING``: each item is a size-1 interval whose start
      is its bin index; a cumulative constraint limits the total weight sharing
      a start value to the bin capacity, and the makespan is minimized.

    ``init_model`` must be called before any method that reads ``self.modeling``
    or ``self.variables``.
    """

    hyperparameters = [
        EnumHyperparameter(
            name="modeling", enum=ModelingBinPack, default=ModelingBinPack.BINARY
        )
    ]
    problem: BinPackProblem
    modeling: ModelingBinPack

    def __init__(
        self,
        problem: BinPackProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs: Any,
    ):
        """Store the problem and initialise empty variable storage.

        Args:
            problem: bin packing instance to solve.
            params_objective_function: forwarded unchanged to the parent constructor.
            **kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(problem, params_objective_function, **kwargs)
        self.variables: dict[str, Union[IntVar, dict[Hashable, IntVar]]] = {}
        self.upper_bound = self.problem.nb_items  # upper bound on nb of bin packs

    def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution:
        """Build a ``BinPackSolution`` from the values held by the solver callback.

        Args:
            cpsolvercb: callback giving access to the current variable values.

        Returns:
            A solution whose ``allocation[i]`` is the bin index of item ``i``
            (``None`` for an item no variable assigns).
        """
        logger.info(
            f"Obj={cpsolvercb.objective_value}, Bound={cpsolvercb.best_objective_bound}"
        )
        allocation: list[Optional[int]] = [None] * self.problem.nb_items
        if self.modeling == ModelingBinPack.BINARY:
            for (i, bin_), var in self.variables["allocation"].items():
                if cpsolvercb.Value(var) == 1:
                    allocation[i] = bin_
        elif self.modeling == ModelingBinPack.SCHEDULING:
            for i, start in self.variables["starts"].items():
                allocation[i] = cpsolvercb.Value(start)
        return BinPackSolution(problem=self.problem, allocation=allocation)

    def init_model(self, **args: Any) -> None:
        """Create the CP-SAT model for the requested modeling.

        Args:
            **args: hyperparameters; missing ones are filled with their defaults.
                ``modeling`` selects the formulation. The optional ``upper_bound``
                caps the number of bins and is itself clipped to
                ``problem.nb_items``.

        Raises:
            ValueError: if ``modeling`` is not a known ``ModelingBinPack`` member.
        """
        args = self.complete_with_default_hyperparameters(args)
        # store modeling and upper_bound (=> allowed resources)
        self.modeling = args["modeling"]
        self.upper_bound = min(
            args.get("upper_bound", self.problem.nb_items), self.problem.nb_items
        )
        if self.modeling == ModelingBinPack.BINARY:
            self.init_model_binary(**args)
        elif self.modeling == ModelingBinPack.SCHEDULING:
            self.init_model_scheduling(**args)
        else:
            # Fail loudly instead of silently leaving the model unbuilt.
            raise ValueError(f"Unknown modeling: {self.modeling!r}")

    @property
    def subset_unaryresources_allowed(self) -> Iterable[BinPack]:
        """Bin indices usable by the model: ``range(self.upper_bound)``."""
        return range(self.upper_bound)

    def init_model_binary(self, **args: Any):
        """Build the boolean (item, bin) allocation model.

        Args:
            **args: forwarded unchanged to the parent ``init_model``.
        """
        super().init_model(**args)
        nb_items = self.problem.nb_items
        bins = range(self.upper_bound)

        # boolean allocation variables
        variables_allocation = {
            (i, bin_): self.cp_model.NewBoolVar(f"alloc_{i}_{bin_}")
            for i in range(nb_items)
            for bin_ in bins
        }
        self.variables["allocation"] = variables_allocation

        # item in one bin
        for i in range(nb_items):
            self.cp_model.AddExactlyOne(
                [variables_allocation[(i, bin_)] for bin_ in bins]
            )

        # max capacity of a bin (weights do not depend on the bin: build them once)
        weights = [self.problem.list_items[i].weight for i in range(nb_items)]
        for bin_ in bins:
            self.cp_model.Add(
                LinearExpr.weighted_sum(
                    [variables_allocation[(i, bin_)] for i in range(nb_items)],
                    weights,
                )
                <= self.problem.capacity_bin
            )

        # bin used variable
        self.create_used_variables()
        self.variables["used"] = self.used_variables

        # constraints on bin ordering: a bin can be used only if the previous one is
        for bin_ in range(1, self.upper_bound):
            self.cp_model.Add(
                self.used_variables[bin_] <= self.used_variables[bin_ - 1]
            )

        # incompatible items cannot share a bin
        if self.problem.has_constraint:
            for i, j in self.problem.incompatible_items:
                for bin_ in bins:
                    self.cp_model.AddForbiddenAssignments(
                        [
                            variables_allocation[(i, bin_)],
                            variables_allocation[(j, bin_)],
                        ],
                        [(1, 1)],
                    )
        self.cp_model.Minimize(self.get_nb_unary_resources_used_variable())

    def get_task_unary_resource_is_present_variable(
        self, task: Item, unary_resource: BinPack
    ) -> LinearExprT:
        """Return the boolean variable telling whether ``task`` is in ``unary_resource``.

        Raises:
            ModelingError: if the current modeling is not ``ModelingBinPack.BINARY``.
        """
        if self.modeling == ModelingBinPack.BINARY:
            return self.variables["allocation"][(task, unary_resource)]
        raise ModelingError(f"No allocation variable with {self.modeling}")

    def init_model_scheduling(self, **args: Any):
        """Build the scheduling model: the start of item ``i`` is its bin index.

        Args:
            **args: forwarded unchanged to the parent ``init_model``.
        """
        super().init_model(**args)
        nb_items = self.problem.nb_items
        starts = {}
        intervals = {}
        for i in range(nb_items):
            # NOTE(review): the domain [0, upper_bound] allows upper_bound + 1
            # distinct bin indices, whereas the binary model uses
            # range(upper_bound) -- confirm whether this slack is intended.
            starts[i] = self.cp_model.NewIntVar(
                lb=0, ub=self.upper_bound, name=f"bin_{i}"
            )
            intervals[i] = self.cp_model.NewFixedSizeIntervalVar(
                start=starts[i], size=1, name=f"interval_{i}"
            )
        # Items sharing a start value (same bin) must fit within the bin capacity.
        self.cp_model.AddCumulative(
            [intervals[i] for i in range(nb_items)],
            [self.problem.list_items[i].weight for i in range(nb_items)],
            self.problem.capacity_bin,
        )
        if self.problem.has_constraint:
            for i, j in self.problem.incompatible_items:
                self.cp_model.Add(starts[i] != starts[j])
        self.variables["starts"] = starts
        makespan = self.get_global_makespan_variable()
        self.variables["makespan"] = makespan
        self.cp_model.Minimize(makespan)

    def get_makespan_upper_bound(self) -> int:
        """Return ``self.upper_bound + 1`` as the makespan upper bound."""
        return self.upper_bound + 1

    def get_task_start_or_end_variable(
        self, task: Item, start_or_end: StartOrEnd
    ) -> LinearExprT:
        """Return the start of ``task``, or start + 1 when the end is requested.

        Raises:
            ModelingError: if the current modeling is not ``ModelingBinPack.SCHEDULING``.
        """
        if self.modeling != ModelingBinPack.SCHEDULING:
            raise ModelingError(f"No start or end variable with {self.modeling}")
        var = self.variables["starts"][task]
        if start_or_end == StartOrEnd.END:
            # every interval has size 1
            var = var + 1
        return var

    def set_warm_start(self, solution: BinPackSolution) -> None:
        """Replace the model hints with the values taken from ``solution``.

        Args:
            solution: solution whose ``allocation[i]`` is the bin of item ``i``.
        """
        if self.modeling == ModelingBinPack.SCHEDULING:
            self.cp_model.ClearHints()
            starts = self.variables["starts"]
            for i in range(self.problem.nb_items):
                self.cp_model.AddHint(starts[i], solution.allocation[i])
            self.cp_model.AddHint(
                self.variables["makespan"], max(solution.allocation) + 1
            )
        elif self.modeling == ModelingBinPack.BINARY:
            self.cp_model.ClearHints()
            for (i, bin_), var in self.variables["allocation"].items():
                self.cp_model.AddHint(var, 1 if solution.allocation[i] == bin_ else 0)
            used_bins = set(solution.allocation)
            for bin_, var in self.variables["used"].items():
                self.cp_model.AddHint(var, 1 if bin_ in used_bins else 0)