Source code for discrete_optimization.binpack.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

from typing import Any

from discrete_optimization.binpack.problem import (
    BinPack,
    BinPackProblem,
    BinPackSolution,
    Item,
)
from discrete_optimization.generic_tasks_tools.allocation import UnaryResource
from discrete_optimization.generic_tasks_tools.base import Task
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    AllocationOptalSolver,
    SchedulingOptalSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True


[docs] class OptalBinPackSolver( AllocationOptalSolver[Item, BinPack], SchedulingOptalSolver[Item], ): problem: BinPackProblem def __init__( self, problem: BinPackProblem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs, ) -> None: super().__init__(problem, params_objective_function, **kwargs) self.variables = {}
[docs] def init_model(self, **kwargs: Any) -> None: self.cp_model = cp.Model() upper_bound = min( kwargs.get("upper_bound", self.problem.nb_items), self.problem.nb_items ) intervals = {} for t in self.problem.tasks_list: intervals[t] = self.cp_model.interval_var( start=(0, upper_bound - 1), end=(1, upper_bound), length=1, name=f"interval_item_{t}", ) if self.problem.has_constraint: for i, j in self.problem.incompatible_items: self.cp_model.enforce( self.cp_model.start(intervals[i]) != self.cp_model.start(intervals[j]) ) capacity = self.problem.capacity_bin self.cp_model.enforce( self.cp_model.sum( [ self.cp_model.pulse(intervals[t], self.problem.list_items[t].weight) for t in intervals ] ) <= capacity ) self.variables["intervals"] = intervals self.cp_model.minimize( self.cp_model.max([self.cp_model.end(intervals[t]) for t in intervals]) )
[docs] def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: UnaryResource ) -> "cp.BoolExpr": index = self.problem.get_index_from_unary_resource(unary_resource) return ( self.get_task_start_or_end_variable(task, start_or_end=StartOrEnd.START) == index )
[docs] def get_task_interval_variable(self, task: Task) -> "cp.IntervalVar": return self.variables["intervals"][task]
[docs] def retrieve_solution(self, result: cp.SolveResult) -> Solution: allocation = [ int(result.solution.get_start(self.get_task_interval_variable(t))) for t in self.problem.tasks_list ] return BinPackSolution(problem=self.problem, allocation=allocation)