Source code for discrete_optimization.salbp.problem

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import math
from copy import deepcopy
from typing import Dict, List, Tuple

from discrete_optimization.generic_tasks_tools.allocation import (
    AllocationProblem,
    AllocationSolution,
    UnaryResource,
)
from discrete_optimization.generic_tasks_tools.scheduling import (
    SchedulingProblem,
    SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
    EncodingRegister,
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Problem,
    Solution,
    TypeObjective,
)
from discrete_optimization.generic_tools.encoding_register import ListInteger
from discrete_optimization.generic_tools.graph_api import Graph

Task = int
Resource = int


[docs] class SalbpSolution(SchedulingSolution[Task], AllocationSolution[Task, Resource]): # We can see this as both a scheduling and allocation solution problem: "SalbpProblem"
[docs] def get_end_time(self, task: Task) -> int: return self.allocation_to_station[self.problem.tasks_to_index[task]] + 1
[docs] def get_start_time(self, task: Task) -> int: return self.allocation_to_station[self.problem.tasks_to_index[task]]
[docs] def is_allocated(self, task: Task, unary_resource: UnaryResource) -> bool: return ( self.allocation_to_station[self.problem.tasks_to_index[task]] == unary_resource )
def __init__(self, problem: "SalbpProblem", allocation_to_station: list[int]): super().__init__(problem) self.task_alloc = [] self.allocation_to_station = allocation_to_station self._nb_stations = len(set(self.allocation_to_station))
[docs] def copy(self) -> "SalbpSolution": return SalbpSolution(self.problem, deepcopy(self.allocation_to_station))
[docs] def change_problem(self, new_problem: "Problem") -> "Solution": return SalbpSolution(new_problem, deepcopy(self.allocation_to_station))
def __str__(self): return f"SalbpSolution(nb_stations={self._nb_stations})" def __hash__(self): # Hash based on the sorted tuple of items to ensure determinism return hash(tuple(self.allocation_to_station)) def __eq__(self, other): return self.allocation_to_station == other.allocation_to_station
[docs] class SalbpProblem(SchedulingProblem[Task], AllocationProblem[Task, Resource]):
[docs] def get_makespan_upper_bound(self) -> int: # Max number of station, can be better. return self.number_of_tasks
@property def unary_resources_list(self) -> list[UnaryResource]: return list(range(self.number_of_tasks)) @property def tasks_list(self) -> list[Task]: return self.tasks
[docs] def get_last_tasks(self) -> list[Task]: return [t for t in self.tasks_list if len(self.adj[t]) == 0]
def __init__( self, number_of_tasks: int, cycle_time: int, task_times: Dict[int, int], precedence: List[Tuple[int, int]], ): self.number_of_tasks = number_of_tasks self.cycle_time = cycle_time self.task_times = task_times self.precedence = precedence # Build adjacency for easier graph traversal self.adj = {i: [] for i in task_times.keys()} self.predecessors = {i: [] for i in task_times.keys()} for p, s in precedence: if p in self.adj: self.adj[p].append(s) if s in self.predecessors: self.predecessors[s].append(p) # Standard ordering of task IDs (assuming 1 to N usually, but we use keys) self.tasks = sorted(list(task_times.keys())) self.tasks_to_index = {self.tasks[i]: i for i in range(len(self.tasks))}
[docs] def evaluate(self, variable: SalbpSolution) -> Dict[str, float]: """ Evaluate the solution. Objective: Minimize the number of stations. Constraint Penalties: Over-cycle time, Precedence violations. """ stations_time_cumul = {} penalty_overtime = 0 penalty_undertime = 0 penalty_precedence = 0 # 1. Group by station and check Cycle Time for i in range(self.number_of_tasks): station_task_i = variable.allocation_to_station[i] if station_task_i not in stations_time_cumul: stations_time_cumul[station_task_i] = 0 stations_time_cumul[station_task_i] += self.task_times[self.tasks_list[i]] for s_id, total_time in stations_time_cumul.items(): penalty_overtime += max(0, total_time - self.cycle_time) penalty_undertime += max(0, self.cycle_time - total_time) # 2. Check Precedence: Pred station <= Succ station for p, s in self.precedence: station_p = variable.get_start_time(task=p) station_s = variable.get_end_time(task=s) # If tasks are missing from solution, that's a structural issue, # usually assumed complete in a valid solution object. if station_p is not None and station_s is not None: if station_p > station_s: penalty_precedence += 1 nb_stations = len(stations_time_cumul) # Total cost logic (Soft constraints handling) # We assume standard objective is just nb_stations, # but for solvers we often add penalties. return { "nb_stations": nb_stations, "penalty_overtime": penalty_overtime, "penalty_undertime": penalty_undertime, "penalty_precedence": penalty_precedence, }
[docs] def get_solution_type(self) -> type[Solution]: return SalbpSolution
[docs] def satisfy(self, variable: SalbpSolution) -> bool: """ Strict check for validity. """ eval_res = self.evaluate(variable) return eval_res["penalty_overtime"] == 0 and eval_res["penalty_precedence"] == 0
[docs] def get_attribute_register(self) -> EncodingRegister: """ Register for standard encoding (optional, but good for GA/CP solvers). We define the solution as a list of integers (station assignments). """ # Note: Bounding the number of stations is tricky without a heuristic. # A safe upper bound is the number of tasks (1 task per station). encoding = dict() encoding[f"allocation_to_station"] = ListInteger( length=self.number_of_tasks, lows=0, ups=self.number_of_tasks - 1 ) return EncodingRegister(encoding)
[docs] def get_objective_register(self) -> ObjectiveRegister: """ Defines the default objective to minimize. """ return ObjectiveRegister( objective_sense=ModeOptim.MINIMIZATION, objective_handling=ObjectiveHandling.AGGREGATE, dict_objective_to_doc={ "nb_stations": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "penalty_overtime": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=10000 ), "penalty_undertime": # This is really soft, solver should handle it lexicographically. ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=0), "penalty_precedence": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=10000 ), }, )
[docs] def get_solution_type_member(self) -> SalbpSolution: # don't satisfy the constraints ;) return SalbpSolution(self, allocation_to_station=[0] * self.number_of_tasks)
[docs] def get_graph_precedence(self) -> Graph: nodes = [(t, {}) for t in self.tasks_list] edges = [(t, succ, {}) for t in self.adj for succ in self.adj[t]] graph = Graph(nodes, edges, undirected=False, compute_predecessors=True) return graph
[docs] def calculate_salbp_lower_bounds(problem: SalbpProblem) -> int: """ Calculates lower bounds for the SALBP-1 problem. Inspired by: https://github.com/domain-independent-dp/didp-rs """ times = [problem.task_times[t] for t in problem.tasks] c = problem.cycle_time # Bound 1: Theoretical minimum based on total work lb1 = math.ceil(sum(times) / c) # Bound 2: Based on tasks > c/2 # Two tasks > c/2 cannot share a station. # One task == c/2 needs half a station. w2_1 = sum(1 for t in times if t > c / 2) w2_2 = sum(1 for t in times if t == c / 2) lb2 = w2_1 + math.ceil(w2_2 / 2) # Bound 3: More complex weighting # Tasks > 2c/3 count as 1 # Tasks == 2c/3 count as 2/3 # Tasks > c/3 count as 1/2 # Tasks == c/3 count as 1/3 w3 = 0 for t in times: if t > c * 2 / 3: w3 += 1.0 elif t == c * 2 / 3: w3 += 0.666 elif t > c / 3: w3 += 0.5 elif t == c / 3: w3 += 0.333 lb3 = math.ceil(w3) return max(lb1, lb2, lb3)
[docs] class SalbpProblem_1_2(SalbpProblem): # Generalisation of SALBP-(1,2) problem, # where you can vary both cycle time and number of station
[docs] @staticmethod def from_salbp1(problem: SalbpProblem): return SalbpProblem_1_2( number_of_tasks=problem.number_of_tasks, task_times=problem.task_times, precedence=problem.precedence, )
def __init__( self, number_of_tasks: int, task_times: Dict[int, int], precedence: List[Tuple[int, int]], ): super().__init__( number_of_tasks, cycle_time=None, task_times=task_times, precedence=precedence, )
[docs] def evaluate(self, variable: SalbpSolution) -> Dict[str, float]: """ Evaluate the solution. Objective: Minimize the number of stations and cycle time """ stations_time_cumul = {} penalty_precedence = 0 for i in range(self.number_of_tasks): station_task_i = variable.allocation_to_station[i] if station_task_i not in stations_time_cumul: stations_time_cumul[station_task_i] = 0 stations_time_cumul[station_task_i] += self.task_times[self.tasks_list[i]] for p, s in self.precedence: station_p = variable.get_start_time(task=p) station_s = variable.get_end_time(task=s) if station_p is not None and station_s is not None: if station_p > station_s: penalty_precedence += 1 nb_stations = len(stations_time_cumul) return { "nb_stations": nb_stations, "cycle_time": max(stations_time_cumul.values()), "cycle_time_dispersion": max(stations_time_cumul.values()) - min(stations_time_cumul.values()), "penalty_precedence": penalty_precedence, }
[docs] def satisfy(self, variable: SalbpSolution) -> bool: """ Strict check for validity. """ eval_res = self.evaluate(variable) return eval_res["penalty_precedence"] == 0
[docs] def get_objective_register(self) -> ObjectiveRegister: """ Defines the default objective to minimize. """ return ObjectiveRegister( objective_sense=ModeOptim.MINIMIZATION, objective_handling=ObjectiveHandling.AGGREGATE, dict_objective_to_doc={ "nb_stations": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "cycle_time": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "cycle_time_dispersion": # This is really soft, solver should handle it lexicographically. ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=0), "penalty_precedence": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=10000 ), }, )