# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
from copy import deepcopy
from typing import Dict, List, Tuple
from discrete_optimization.generic_tasks_tools.allocation import (
AllocationProblem,
AllocationSolution,
UnaryResource,
)
from discrete_optimization.generic_tasks_tools.scheduling import (
SchedulingProblem,
SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
Solution,
TypeObjective,
)
from discrete_optimization.generic_tools.encoding_register import ListInteger
from discrete_optimization.generic_tools.graph_api import Graph
Task = int
Resource = int
[docs]
class SalbpSolution(SchedulingSolution[Task], AllocationSolution[Task, Resource]):
# We can see this as both a scheduling and allocation solution
problem: "SalbpProblem"
[docs]
def get_end_time(self, task: Task) -> int:
return self.allocation_to_station[self.problem.tasks_to_index[task]] + 1
[docs]
def get_start_time(self, task: Task) -> int:
return self.allocation_to_station[self.problem.tasks_to_index[task]]
[docs]
def is_allocated(self, task: Task, unary_resource: UnaryResource) -> bool:
return (
self.allocation_to_station[self.problem.tasks_to_index[task]]
== unary_resource
)
def __init__(self, problem: "SalbpProblem", allocation_to_station: list[int]):
super().__init__(problem)
self.task_alloc = []
self.allocation_to_station = allocation_to_station
self._nb_stations = len(set(self.allocation_to_station))
[docs]
def copy(self) -> "SalbpSolution":
return SalbpSolution(self.problem, deepcopy(self.allocation_to_station))
[docs]
def change_problem(self, new_problem: "Problem") -> "Solution":
return SalbpSolution(new_problem, deepcopy(self.allocation_to_station))
def __str__(self):
return f"SalbpSolution(nb_stations={self._nb_stations})"
def __hash__(self):
# Hash based on the sorted tuple of items to ensure determinism
return hash(tuple(self.allocation_to_station))
def __eq__(self, other):
return self.allocation_to_station == other.allocation_to_station
[docs]
class SalbpProblem(SchedulingProblem[Task], AllocationProblem[Task, Resource]):
[docs]
def get_makespan_upper_bound(self) -> int:
# Max number of station, can be better.
return self.number_of_tasks
@property
def unary_resources_list(self) -> list[UnaryResource]:
return list(range(self.number_of_tasks))
@property
def tasks_list(self) -> list[Task]:
return self.tasks
[docs]
def get_last_tasks(self) -> list[Task]:
return [t for t in self.tasks_list if len(self.adj[t]) == 0]
def __init__(
self,
number_of_tasks: int,
cycle_time: int,
task_times: Dict[int, int],
precedence: List[Tuple[int, int]],
):
self.number_of_tasks = number_of_tasks
self.cycle_time = cycle_time
self.task_times = task_times
self.precedence = precedence
# Build adjacency for easier graph traversal
self.adj = {i: [] for i in task_times.keys()}
self.predecessors = {i: [] for i in task_times.keys()}
for p, s in precedence:
if p in self.adj:
self.adj[p].append(s)
if s in self.predecessors:
self.predecessors[s].append(p)
# Standard ordering of task IDs (assuming 1 to N usually, but we use keys)
self.tasks = sorted(list(task_times.keys()))
self.tasks_to_index = {self.tasks[i]: i for i in range(len(self.tasks))}
[docs]
def evaluate(self, variable: SalbpSolution) -> Dict[str, float]:
"""
Evaluate the solution.
Objective: Minimize the number of stations.
Constraint Penalties: Over-cycle time, Precedence violations.
"""
stations_time_cumul = {}
penalty_overtime = 0
penalty_undertime = 0
penalty_precedence = 0
# 1. Group by station and check Cycle Time
for i in range(self.number_of_tasks):
station_task_i = variable.allocation_to_station[i]
if station_task_i not in stations_time_cumul:
stations_time_cumul[station_task_i] = 0
stations_time_cumul[station_task_i] += self.task_times[self.tasks_list[i]]
for s_id, total_time in stations_time_cumul.items():
penalty_overtime += max(0, total_time - self.cycle_time)
penalty_undertime += max(0, self.cycle_time - total_time)
# 2. Check Precedence: Pred station <= Succ station
for p, s in self.precedence:
station_p = variable.get_start_time(task=p)
station_s = variable.get_end_time(task=s)
# If tasks are missing from solution, that's a structural issue,
# usually assumed complete in a valid solution object.
if station_p is not None and station_s is not None:
if station_p > station_s:
penalty_precedence += 1
nb_stations = len(stations_time_cumul)
# Total cost logic (Soft constraints handling)
# We assume standard objective is just nb_stations,
# but for solvers we often add penalties.
return {
"nb_stations": nb_stations,
"penalty_overtime": penalty_overtime,
"penalty_undertime": penalty_undertime,
"penalty_precedence": penalty_precedence,
}
[docs]
def get_solution_type(self) -> type[Solution]:
return SalbpSolution
[docs]
def satisfy(self, variable: SalbpSolution) -> bool:
"""
Strict check for validity.
"""
eval_res = self.evaluate(variable)
return eval_res["penalty_overtime"] == 0 and eval_res["penalty_precedence"] == 0
[docs]
def get_attribute_register(self) -> EncodingRegister:
"""
Register for standard encoding (optional, but good for GA/CP solvers).
We define the solution as a list of integers (station assignments).
"""
# Note: Bounding the number of stations is tricky without a heuristic.
# A safe upper bound is the number of tasks (1 task per station).
encoding = dict()
encoding[f"allocation_to_station"] = ListInteger(
length=self.number_of_tasks, lows=0, ups=self.number_of_tasks - 1
)
return EncodingRegister(encoding)
[docs]
def get_objective_register(self) -> ObjectiveRegister:
"""
Defines the default objective to minimize.
"""
return ObjectiveRegister(
objective_sense=ModeOptim.MINIMIZATION,
objective_handling=ObjectiveHandling.AGGREGATE,
dict_objective_to_doc={
"nb_stations": ObjectiveDoc(
type=TypeObjective.OBJECTIVE, default_weight=1
),
"penalty_overtime": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=10000
),
"penalty_undertime":
# This is really soft, solver should handle it lexicographically.
ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=0),
"penalty_precedence": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=10000
),
},
)
[docs]
def get_solution_type_member(self) -> SalbpSolution:
# don't satisfy the constraints ;)
return SalbpSolution(self, allocation_to_station=[0] * self.number_of_tasks)
[docs]
def get_graph_precedence(self) -> Graph:
nodes = [(t, {}) for t in self.tasks_list]
edges = [(t, succ, {}) for t in self.adj for succ in self.adj[t]]
graph = Graph(nodes, edges, undirected=False, compute_predecessors=True)
return graph
[docs]
def calculate_salbp_lower_bounds(problem: SalbpProblem) -> int:
"""
Calculates lower bounds for the SALBP-1 problem.
Inspired by: https://github.com/domain-independent-dp/didp-rs
"""
times = [problem.task_times[t] for t in problem.tasks]
c = problem.cycle_time
# Bound 1: Theoretical minimum based on total work
lb1 = math.ceil(sum(times) / c)
# Bound 2: Based on tasks > c/2
# Two tasks > c/2 cannot share a station.
# One task == c/2 needs half a station.
w2_1 = sum(1 for t in times if t > c / 2)
w2_2 = sum(1 for t in times if t == c / 2)
lb2 = w2_1 + math.ceil(w2_2 / 2)
# Bound 3: More complex weighting
# Tasks > 2c/3 count as 1
# Tasks == 2c/3 count as 2/3
# Tasks > c/3 count as 1/2
# Tasks == c/3 count as 1/3
w3 = 0
for t in times:
if t > c * 2 / 3:
w3 += 1.0
elif t == c * 2 / 3:
w3 += 0.666
elif t > c / 3:
w3 += 0.5
elif t == c / 3:
w3 += 0.333
lb3 = math.ceil(w3)
return max(lb1, lb2, lb3)
[docs]
class SalbpProblem_1_2(SalbpProblem):
# Generalisation of SALBP-(1,2) problem,
# where you can vary both cycle time and number of station
[docs]
@staticmethod
def from_salbp1(problem: SalbpProblem):
return SalbpProblem_1_2(
number_of_tasks=problem.number_of_tasks,
task_times=problem.task_times,
precedence=problem.precedence,
)
def __init__(
self,
number_of_tasks: int,
task_times: Dict[int, int],
precedence: List[Tuple[int, int]],
):
super().__init__(
number_of_tasks,
cycle_time=None,
task_times=task_times,
precedence=precedence,
)
[docs]
def evaluate(self, variable: SalbpSolution) -> Dict[str, float]:
"""
Evaluate the solution.
Objective: Minimize the number of stations and cycle time
"""
stations_time_cumul = {}
penalty_precedence = 0
for i in range(self.number_of_tasks):
station_task_i = variable.allocation_to_station[i]
if station_task_i not in stations_time_cumul:
stations_time_cumul[station_task_i] = 0
stations_time_cumul[station_task_i] += self.task_times[self.tasks_list[i]]
for p, s in self.precedence:
station_p = variable.get_start_time(task=p)
station_s = variable.get_end_time(task=s)
if station_p is not None and station_s is not None:
if station_p > station_s:
penalty_precedence += 1
nb_stations = len(stations_time_cumul)
return {
"nb_stations": nb_stations,
"cycle_time": max(stations_time_cumul.values()),
"cycle_time_dispersion": max(stations_time_cumul.values())
- min(stations_time_cumul.values()),
"penalty_precedence": penalty_precedence,
}
[docs]
def satisfy(self, variable: SalbpSolution) -> bool:
"""
Strict check for validity.
"""
eval_res = self.evaluate(variable)
return eval_res["penalty_precedence"] == 0
[docs]
def get_objective_register(self) -> ObjectiveRegister:
"""
Defines the default objective to minimize.
"""
return ObjectiveRegister(
objective_sense=ModeOptim.MINIMIZATION,
objective_handling=ObjectiveHandling.AGGREGATE,
dict_objective_to_doc={
"nb_stations": ObjectiveDoc(
type=TypeObjective.OBJECTIVE, default_weight=1
),
"cycle_time": ObjectiveDoc(
type=TypeObjective.OBJECTIVE, default_weight=1
),
"cycle_time_dispersion":
# This is really soft, solver should handle it lexicographically.
ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=0),
"penalty_precedence": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=10000
),
},
)