Source code for discrete_optimization.rcalbp_l.solvers.cpsat

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

import logging
from typing import Any, Iterable

from ortools.sat.python.cp_model import (
    CpSolverSolutionCallback,
    Domain,
    LinearExprT,
)

from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.solvers.cpsat import (
    AllocationCpSatSolver,
    SchedulingCpSatSolver,
)
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.rcalbp_l.problem import (
    RCALBPLProblem,
    RCALBPLSolution,
    Task,
    WorkStation,
)

logger = logging.getLogger(__name__)


[docs] class CpSatRCALBPLSolver( AllocationCpSatSolver[Task, WorkStation], SchedulingCpSatSolver[Task], WarmstartMixin, ): problem: RCALBPLProblem variables: dict @property def subset_tasks_of_interest(self) -> Iterable[Task]: """Subset of tasks of interest for the allocation...""" return [t for t in self.problem.tasks_list if t[1] == 0]
[docs] def get_task_start_or_end_variable( self, task: Task, start_or_end: StartOrEnd ) -> LinearExprT: if start_or_end == StartOrEnd.START: return self.variables["starts"][task] elif start_or_end == StartOrEnd.END: return self.variables["ends"][task]
[docs] def init_model( self, minimize_used_cycle_time: bool = False, add_heuristic_constraint: bool = True, **kwargs: Any, ) -> None: super().init_model(**kwargs) self.variables = {} # 1. Create variables self.create_main_unfolded_intervals() self.create_resource_dispatch() self.create_cycle_time_variables() # 2. Post constraints self.constraint_only_one_station_allocation() self.create_cumulative_resource_constraint() self.create_zone_blocking() self.create_precedence_constraints() # 3. Post objective self.objective_value(minimize_used_cycle_time) if add_heuristic_constraint: self.create_heuristic_target_reached_constraints(True)
[docs] def create_main_unfolded_intervals(self): starts = {} ends = {} durations = {} dict_main_intervals = {} dict_opt_intervals = {} allocations = {} max_horizon = self.problem.c_max for p in self.problem.periods: # Time bounds isolating this period relative to 0 p_lb_start = 0 p_ub_start = max_horizon for task in self.problem.tasks: possible_durations = [ self.problem.get_duration(task, p, w) for w in self.problem.stations ] starts[(task, p)] = self.cp_model.NewIntVar( lb=p_lb_start, ub=p_ub_start, name=f"start_{task}_{p}" ) ends[(task, p)] = self.cp_model.NewIntVar( lb=p_lb_start, ub=p_ub_start, name=f"end_{task}_{p}" ) durations[(task, p)] = self.cp_model.NewIntVarFromDomain( domain=Domain.FromValues(possible_durations), name=f"duration_{task}_{p}", ) dict_main_intervals[(task, p)] = self.cp_model.NewIntervalVar( start=starts[(task, p)], end=ends[(task, p)], size=durations[(task, p)], name=f"interval_{task}_{p}", ) for w in self.problem.stations: # alloc should be fixed. dur = self.problem.get_duration(task, p, w) allocations[(task, p, w)] = self.cp_model.NewBoolVar( name=f"allocation_{task}_{p}_{w}" ) opt_var = self.cp_model.NewOptionalFixedSizeIntervalVar( start=starts[(task, p)], size=dur, is_present=allocations[(task, p, w)], name=f"interval_{task}_{p}_{w}", ) dict_opt_intervals[(task, p, w)] = opt_var if p > self.problem.periods[0]: self.cp_model.Add( allocations[(task, p, w)] == allocations[(task, self.problem.periods[0], w)] ) self.cp_model.add(dur == durations[task, p]).only_enforce_if( allocations[(task, p, w)] ) self.cp_model.add_exactly_one( [allocations[(task, p, w)] for w in self.problem.stations] ) self.variables["starts"] = starts self.variables["ends"] = ends self.variables["durations"] = durations self.variables["main_intervals"] = dict_main_intervals self.variables["opt_intervals"] = dict_opt_intervals self.variables["allocations"] = allocations
[docs] def constraint_only_one_station_allocation(self): for task in self.problem.tasks: for w in self.problem.stations: allocated_to_station = [ self.variables["allocations"][task, p, w] for p in self.problem.periods ] for i in range(1, len(allocated_to_station)): self.cp_model.add( allocated_to_station[i] == allocated_to_station[i - 1] )
[docs] def create_resource_dispatch(self): resource = {} for r in self.problem.resources: capa = self.problem.capa_resources[r] for w in self.problem.stations: resource[(r, w)] = self.cp_model.NewIntVar( lb=0, ub=capa, name=f"capa_{r}_station_{w}" ) self.cp_model.add( sum([resource[r, w] for w in self.problem.stations]) <= capa ) self.variables["resource_dispatch"] = resource
[docs] def create_cumulative_resource_constraint(self): for r in self.problem.resources: capa = self.problem.capa_resources[r] for p in self.problem.periods: # 1. Global capacity bound across all workstations pulses_global = [] for task in self.problem.tasks: req = self.problem.cons_resources[r][task] if req > 0: pulses_global.append( (self.variables["main_intervals"][(task, p)], req) ) if pulses_global: self.cp_model.add_cumulative( intervals=[x[0] for x in pulses_global], demands=[x[1] for x in pulses_global], capacity=capa, ) # 2. Local capacity bound per workstation (dispatch limit) for w in self.problem.stations: pulses_local = [] for task in self.problem.tasks: req = self.problem.cons_resources[r][task] if req > 0: pulses_local.append( (self.variables["opt_intervals"][(task, p, w)], req) ) if pulses_local: self.cp_model.add_cumulative( [x[0] for x in pulses_local], [x[1] for x in pulses_local], self.variables["resource_dispatch"][(r, w)], ) # 3. Zone capacities (local to workstations) for z in self.problem.zones: capa = self.problem.capa_zones[z] for p in self.problem.periods: for w in self.problem.stations: pulses_zone = [] for task in self.problem.tasks: req = self.problem.cons_zones[z][task] if req > 0: pulses_zone.append( (self.variables["opt_intervals"][(task, p, w)], req) ) if pulses_zone: if capa == 1: self.cp_model.add_no_overlap([x[0] for x in pulses_zone]) else: self.cp_model.add_cumulative( [x[0] for x in pulses_zone], [x[1] for x in pulses_zone], capa, )
[docs] def create_zone_blocking(self): """Be careful on the fact that this constraint only is valid with capa==1 for now""" for z in self.problem.zones: tasks_blocking = [ t for t in self.problem.tasks if z in self.problem.neutr_zones[t] ] tasks_consuming = [ t for t in self.problem.tasks if self.problem.cons_zones[z][t] > 0 if t not in tasks_blocking ] if self.problem.capa_zones[z] == 1 and tasks_blocking: for p in self.problem.periods: for w in self.problem.stations: pulses = [ (self.variables["opt_intervals"][(t, p, w)], 1) for t in tasks_blocking ] pulses.extend( [ ( self.variables["opt_intervals"][(t, p, w)], len(tasks_blocking), ) for t in tasks_consuming ] ) if pulses: self.cp_model.add_cumulative( [x[0] for x in pulses], [x[1] for x in pulses], len(tasks_blocking), )
[docs] def create_precedence_constraints(self): self.variables["same_station"] = {} for t1, t2 in self.problem.precedences: wks_t1 = sum( [ w * self.variables["allocations"][(t1, self.problem.periods[0], w)] for w in self.problem.stations ] ) wks_t2 = sum( [ w * self.variables["allocations"][(t2, self.problem.periods[0], w)] for w in self.problem.stations ] ) self.cp_model.add(wks_t1 <= wks_t2) same_station = self.cp_model.NewBoolVar(name=f"same_station_{t1}_{t2}") self.variables["same_station"][(t1, t2)] = same_station self.cp_model.add(wks_t1 == wks_t2).only_enforce_if(same_station) self.cp_model.add(wks_t2 != wks_t1).only_enforce_if(~same_station) for p in self.problem.periods: self.cp_model.add( self.variables["ends"][t1, p] <= self.variables["starts"][t2, p] ).only_enforce_if(same_station)
[docs] def create_cycle_time_variables(self): cycle_time_used = {} cycle_time_chosen = {} self.variables["is_decreasing"] = {} self.variables["is_lower_than_ctarget"] = {} max_horizon = self.problem.c_max for p in self.problem.periods: cycle_time_chosen[p] = self.cp_model.new_int_var( lb=self.problem.c_target, ub=max_horizon, name=f"cycle_time_chosen_{p}" ) cycle_time_used[p] = self.cp_model.new_int_var( lb=0, ub=max_horizon, name=f"cycle_time_used_{p}" ) for task in self.problem.tasks: self.cp_model.add(cycle_time_used[p] >= self.variables["ends"][task, p]) self.cp_model.add(cycle_time_chosen[p] >= cycle_time_used[p]) self.variables["is_lower_than_ctarget"][p] = self.cp_model.new_bool_var( f"is_lower_than_ctarget_{p}" ) self.cp_model.add( cycle_time_used[p] <= self.problem.c_target ).only_enforce_if(self.variables["is_lower_than_ctarget"][p]) self.cp_model.add( cycle_time_used[p] > self.problem.c_target ).only_enforce_if(~self.variables["is_lower_than_ctarget"][p]) if p >= self.problem.nb_stations: self.cp_model.add( cycle_time_chosen[p] == self.problem.c_target ).only_enforce_if(self.variables["is_lower_than_ctarget"][p]) self.variables["cycle_time_used"] = cycle_time_used self.variables["cycle_time_chosen"] = cycle_time_chosen unstable_periods = [ p for p in self.problem.periods if p < self.problem.nb_stations ] if len(unstable_periods) > 0: self.cp_model.add_max_equality( self.variables["cycle_time_chosen"][unstable_periods[0]], [self.variables["cycle_time_used"][uns] for uns in unstable_periods], ) # Stability. for i in range(1, len(unstable_periods)): p = unstable_periods[i] prev_p = unstable_periods[i - 1] self.cp_model.add( self.variables["cycle_time_chosen"][p] == self.variables["cycle_time_chosen"][prev_p] ) # Stable periods logic: Cycle time is monotonically decreasing stable_periods = [ p for p in self.problem.periods if p >= self.problem.nb_stations ] for p in stable_periods: prev_p = p - 1 if ( prev_p in self.problem.periods ): # Only link if previous period is also in the modeled window! self.cp_model.add( self.variables["cycle_time_chosen"][p] <= self.variables["cycle_time_chosen"][prev_p] ) self.cp_model.add( self.variables["cycle_time_used"][p] <= self.variables["cycle_time_used"][prev_p] ) is_decreasing = self.cp_model.NewBoolVar(name=f"is_decreasing_{p}") self.variables["is_decreasing"][p] = is_decreasing self.cp_model.add( self.variables["cycle_time_chosen"][p] < self.variables["cycle_time_chosen"][prev_p] ).only_enforce_if(is_decreasing) self.cp_model.add( self.variables["cycle_time_chosen"][p] >= self.variables["cycle_time_chosen"][prev_p] ).only_enforce_if(~is_decreasing) self.cp_model.add( self.variables["cycle_time_chosen"][p] == self.variables["cycle_time_used"][p] ).only_enforce_if(is_decreasing)
[docs] def fix_allocations(self, wks: dict): """ Hard-fixes the allocation variables to a known configuration. This transforms the complex ALBP problem into a scheduling problem. """ if self.cp_model is None: return for t, w_assigned in wks.items(): for p in self.problem.periods: for w in self.problem.stations: val = 1 if w == w_assigned else 0 self.cp_model.add(self.variables["allocations"][(t, p, w)] == val)
[docs] def objective_value(self, minimize_used_cycle_time: bool = False): if minimize_used_cycle_time: # ALTERNATIVE OBJECTIVE: Squeeze the layout as tight as mathematically possible. # Heavily weight 'cycle_time_used' to minimize the actual maximum end time. # Lightly weight 'cycle_time_chosen' to prevent floating values. obj_terms = [] for p in self.problem.periods: obj_terms.append(self.variables["cycle_time_used"][p]) self.cp_model.minimize(sum(obj_terms)) else: # ORIGINAL OBJECTIVE: Minimizing Ramp-up cost obj_terms = [] self.variables["cost"] = {} self.variables["is_upper_than_ctarget"] = {} for p in self.problem.periods: if p < self.problem.nb_stations: obj_terms.append(self.variables["cycle_time_chosen"][p]) else: cost = self.cp_model.NewIntVar( lb=0, ub=self.problem.c_max, name=f"cost_{p}" ) is_upper_than_ctarget = self.cp_model.NewBoolVar( name=f"is_upper_than_ctarget_{p}" ) self.variables["cost"][p] = cost self.variables["is_upper_than_ctarget"][p] = is_upper_than_ctarget self.cp_model.add( self.variables["cycle_time_chosen"][p] > self.problem.c_target ).only_enforce_if(is_upper_than_ctarget) self.cp_model.add( self.variables["cycle_time_chosen"][p] == self.problem.c_target ).only_enforce_if(~is_upper_than_ctarget) self.cp_model.add( cost == self.variables["cycle_time_chosen"][p] ).only_enforce_if(is_upper_than_ctarget) self.cp_model.add(cost == 0).only_enforce_if(~is_upper_than_ctarget) obj_terms.append(cost) self.cp_model.minimize(sum(obj_terms))
[docs] def create_heuristic_target_reached_constraints(self, apply_heuristic: bool = True): max_horizon = self.problem.c_max c_target = self.problem.c_target # 1. Constraint: If the "used" cycle time <= c_target, clamp the "chosen" cycle time to c_target. for p in self.problem.periods: if p in self.variables["is_upper_than_ctarget"]: self.cp_model.add( (self.variables["cycle_time_chosen"][p] == c_target) ).only_enforce_if(~self.variables["is_upper_than_ctarget"][p]) # 2. Heuristic: Freeze future schedules once target is reached if apply_heuristic: # We only apply this starting from the first stable period periods_after_setup = [ p for p in self.problem.periods if p >= self.problem.nb_stations ][:-1] for p in periods_after_setup: for t in self.problem.tasks: # Isolate the relative start time of task t for period p and p+1 start_p = self.variables["starts"][(t, p)] start_next = self.variables["starts"][(t, p + 1)] # Implication: If target is reached at period p, enforce that the relative # start time in period p+1 is strictly equal to the start time in period p. self.cp_model.add(start_p == start_next).only_enforce_if( ~self.variables["is_upper_than_ctarget"][p] )
[docs] def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: WorkStation ) -> "cp.BoolExpr": return self.variables["allocations"][(task[0], task[1], unary_resource)]
[docs] def retrieve_solution( self, cpsolvercb: CpSolverSolutionCallback ) -> RCALBPLSolution: """ Parses the CP-SAT result back into a native RCALBPLSolution. """ wks = {} raw = {} start = {} cyc = {} # Retrieve allocations for t in self.problem.tasks: for w in self.problem.stations: if cpsolvercb.Value( self.variables["allocations"][(t, self.problem.periods[0], w)] ): wks[t] = w break # Retrieve resource dispatch bounds for r in self.problem.resources: for w in self.problem.stations: raw[(r, w)] = cpsolvercb.Value( self.variables["resource_dispatch"][(r, w)] ) # Retrieve cycle times chosen for p in self.problem.periods: cyc[p] = cpsolvercb.Value(self.variables["cycle_time_chosen"][p]) # Retrieve relative task start times for p in self.problem.periods: for t in self.problem.tasks: start[(t, p)] = cpsolvercb.Value(self.variables["starts"][(t, p)]) return RCALBPLSolution( problem=self.problem, wks=wks, raw=raw, start=start, cyc=cyc )
[docs] def set_warm_start(self, solution: RCALBPLSolution) -> None: """ Injects an almost complete model state as a hint to the CP-SAT solver. """ if self.cp_model is None: return self.cp_model.clear_hints() for key in self.allocation_changes_variables: self.cp_model.add_hint(self.allocation_changes_variables[key], 0) # 1. Hint Allocations for t in self.problem.tasks: w_assigned = solution.wks.get(t, -1) for p in self.problem.periods: for w in self.problem.stations: if w == w_assigned: self.cp_model.AddHint( self.variables["allocations"][(t, p, w)], 1 ) else: self.cp_model.AddHint( self.variables["allocations"][(t, p, w)], 0 ) # 2. Hint Precedence booleans (same_station) if "same_station" in self.variables: for t1, t2 in self.problem.precedences: w1 = solution.wks.get(t1, -1) w2 = solution.wks.get(t2, -2) is_same = 1 if w1 == w2 else 0 self.cp_model.AddHint(self.variables["same_station"][(t1, t2)], is_same) # 3. Hint Resource Dispatch for r in self.problem.resources: for w in self.problem.stations: val = solution.raw.get((r, w), 0) self.cp_model.AddHint(self.variables["resource_dispatch"][(r, w)], val) # 4. Hint Cycle times, Task Starts, Ends, Durations, and Objective Flags for p in self.problem.periods: cyc_curr = solution.cyc.get(p, self.problem.c_max) self.cp_model.AddHint(self.variables["cycle_time_chosen"][p], cyc_curr) max_end = 0 for t in self.problem.tasks: if (t, p) in solution.start: st = solution.start[(t, p)] w = solution.wks.get(t, 0) dur = self.problem.get_duration(t, p, w) et = st + dur self.cp_model.AddHint(self.variables["starts"][(t, p)], st) self.cp_model.AddHint(self.variables["ends"][(t, p)], et) self.cp_model.AddHint(self.variables["durations"][(t, p)], dur) if et > max_end: max_end = et if p in self.variables.get("cycle_time_used", {}): self.cp_model.AddHint(self.variables["cycle_time_used"][p], max_end) # Hint for is_decreasing if p >= self.problem.nb_stations and "is_decreasing" in self.variables: prev_cyc = solution.cyc.get(p - 1, self.problem.c_max) is_dec = 1 if cyc_curr < prev_cyc else 0 if p in self.variables["is_decreasing"]: self.cp_model.AddHint(self.variables["is_decreasing"][p], is_dec) # Hint for cost and is_upper_than_ctarget if p >= self.problem.nb_stations: is_upper = 1 if cyc_curr > self.problem.c_target else 0 cost_val = cyc_curr if is_upper else 0 if "is_upper_than_ctarget" in self.variables: self.cp_model.AddHint( self.variables["is_upper_than_ctarget"][p], is_upper ) if "cost" in self.variables: self.cp_model.AddHint(self.variables["cost"][p], cost_val)
[docs] def fix_allocations_and_resources(self, wks: dict, raw: dict): """ Hard-fixes the allocation and resource variables to a known configuration. This transforms the complex ALBP problem into a purely independent scheduling problem. """ if getattr(self, "cp_model", None) is None: return # Lock Task Allocations for t, w_assigned in wks.items(): for p in self.problem.periods: for w in self.problem.stations: val = 1 if w == w_assigned else 0 self.cp_model.Add(self.variables["allocations"][(t, p, w)] == val) # Lock Resource Dispatch if "resource_dispatch" in self.variables: for (r, w), val in raw.items(): self.cp_model.Add(self.variables["resource_dispatch"][(r, w)] == val)
[docs] def add_cycle_time_lower_bound(self, p: int, lower_bound: int): """ Ensures cycle time monotonicity across independent chunk boundaries. """ if ( "cycle_time_chosen" in self.variables and p in self.variables["cycle_time_chosen"] ): self.cp_model.Add(self.variables["cycle_time_chosen"][p] >= lower_bound)