# Source code for discrete_optimization.rcalbp_l.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

from typing import Any

from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    AllocationOptalSolver,
    SchedulingOptalSolver,
)
from discrete_optimization.rcalbp_l.problem import (
    RCALBPLProblem,
    RCALBPLSolution,
    Task,
    WorkStation,
)

try:
    import optalcp as cp
except ImportError:
    pass


class OptalRCALBPLSolver(
    AllocationOptalSolver[Task, WorkStation], SchedulingOptalSolver[Task]
):
    """OptalCP solver for the RCALBP-L problem.

    Each (task, period) gets a mandatory "main" interval plus one optional
    interval per workstation, linked by an ``alternative`` constraint; the
    presence of an optional interval encodes the task's allocation.
    """

    # Problem instance being solved.
    problem: RCALBPLProblem
    # All CP variables created by init_model, keyed by name.
    variables: dict
    def init_model(self, **kwargs: Any) -> None:
        """Build the full CP model: variables, then constraints, then objective.

        Order matters: the variable-creation methods must run before the
        constraint-posting methods that read ``self.variables``.
        """
        super().init_model(**kwargs)
        self.cp_model = cp.Model(name="RCALBPL")
        self.variables = {}
        # 1. Create variables
        self.create_main_unfolded_intervals()
        self.create_resource_dispatch()
        self.create_cycle_time_variables()
        # 2. Post constraints
        self.constraint_only_one_station_allocation()
        self.create_cumulative_resource_constraint()
        self.create_zone_blocking()
        self.create_precedence_constraints()
        # self.create_heuristic_target_reached_constraints(apply_heuristic=False)
        # 3. Post objective
        self.objective_value()
    def create_main_unfolded_intervals(self):
        """Create, for every (task, period), a mandatory main interval plus one
        optional interval per workstation, linked via ``alternative``.

        All intervals live in the common window [0, c_max]; workstations run
        in parallel inside that window. The presence literal of an optional
        interval is the allocation variable, and allocation is forced to be
        identical across periods (tied back to period 0).
        """
        dict_main_intervals = {}
        dict_opt_intervals = {}
        allocations = {}
        max_horizon = self.problem.c_max
        # Iterate over the periods list (not just integer)
        for p in self.problem.periods:
            # Unfold time ONLY by period. All workstations run in parallel within this window.
            p_lb_start = 0
            p_ub_start = max_horizon
            for task in self.problem.tasks:
                possible_durations = [
                    self.problem.get_duration(task, p, w)
                    for w in self.problem.stations
                ]
                min_dur = min(possible_durations)
                max_dur = max(possible_durations)
                dict_main_intervals[(task, p)] = self.cp_model.interval_var(
                    start=(p_lb_start, p_ub_start),
                    end=(p_lb_start, p_ub_start),
                    length=(min_dur, max_dur),
                    optional=False,
                    name=f"{task}_{p}_interval_unfolded",
                )
                for w in self.problem.stations:
                    dur = self.problem.get_duration(task, p, w)
                    # Opt intervals share the exact same time window, allowing
                    # parallel workstation overlap.
                    opt_var = self.cp_model.interval_var(
                        start=(p_lb_start, p_ub_start),
                        end=(p_lb_start, p_ub_start),
                        length=dur,
                        optional=True,
                        name=f"{task}_{p}_{w}_interval_unfolded",
                    )
                    dict_opt_intervals[(task, p, w)] = opt_var
                    # Populate allocations dictionary with presence variables.
                    allocations[(task, p, w)] = self.cp_model.presence(opt_var)
                    if p >= 1:
                        # Allocation is constant over periods: tie to period 0.
                        self.cp_model.enforce(
                            allocations[(task, p, w)] == allocations[(task, 0, w)]
                        )
                # Link main interval with its workstation alternatives.
                self.cp_model.alternative(
                    dict_main_intervals[(task, p)],
                    [dict_opt_intervals[(task, p, w)] for w in self.problem.stations],
                )
        self.variables["main_intervals"] = dict_main_intervals
        self.variables["opt_intervals"] = dict_opt_intervals
        self.variables["allocations"] = allocations
[docs] def constraint_only_one_station_allocation(self): for task in self.problem.tasks: is_allocated = [] for w in self.problem.stations: allocated_to_station = self.cp_model.max( [ self.variables["allocations"][(task, p, w)] for p in self.problem.periods ] ) is_allocated.append(allocated_to_station) self.cp_model.enforce(self.cp_model.sum(is_allocated) == 1)
[docs] def create_resource_dispatch(self): resource = {} for r in self.problem.resources: capa = self.problem.capa_resources[r] for w in self.problem.stations: resource[(r, w)] = self.cp_model.int_var( min=0, max=capa, name=f"capa_{r}_station_{w}" ) self.cp_model.enforce( self.cp_model.sum([resource[r, w] for w in self.problem.stations]) <= capa ) self.variables["resource_dispatch"] = resource
    def create_cumulative_resource_constraint(self):
        """Post cumulative (pulse) constraints for renewable resources and zones.

        Three families of constraints per period:
          1. a global capacity bound per resource across all workstations,
          2. a per-workstation bound honoring the resource-dispatch split via a
             "dummy interval" that consumes the undispatched share,
          3. per-workstation zone-capacity bounds.
        """
        max_horizon = self.problem.c_max
        for r in self.problem.resources:
            capa = self.problem.capa_resources[r]
            for p in self.problem.periods:
                # Time bounds isolating this period.
                p_lb = 0
                p_ub = max_horizon
                # 1. Global capacity bound across all workstations.
                pulses_global = []
                for task in self.problem.tasks:
                    req = self.problem.cons_resources[r][task]  # Indexing is [r][task]
                    if req > 0:
                        pulses_global.append(
                            self.cp_model.pulse(
                                self.variables["main_intervals"][(task, p)], req
                            )
                        )
                if pulses_global:
                    self.cp_model.enforce(self.cp_model.sum(pulses_global) <= capa)
                # 2. Local capacity bound per workstation (dispatch limit).
                for w in self.problem.stations:
                    pulses_local = []
                    for task in self.problem.tasks:
                        req = self.problem.cons_resources[r][task]
                        if req > 0:
                            pulses_local.append(
                                self.cp_model.pulse(
                                    self.variables["opt_intervals"][(task, p, w)], req
                                )
                            )
                    # Dummy-interval trick: a full-length pulse consumes the
                    # portion of the capacity NOT dispatched to this station,
                    # so tasks can only use the dispatched share.
                    unallocated = capa - self.variables["resource_dispatch"][(r, w)]
                    dummy_interval = self.cp_model.interval_var(
                        start=p_lb, end=p_ub, length=max_horizon, optional=False
                    )
                    pulses_local.append(
                        self.cp_model.pulse(dummy_interval, unallocated)
                    )
                    self.cp_model.enforce(self.cp_model.sum(pulses_local) <= capa)
        # 3. Zone capacities (local to workstations).
        for z in self.problem.zones:
            capa = self.problem.capa_zones[z]
            for p in self.problem.periods:
                for w in self.problem.stations:
                    # Zones are evaluated per workstation.
                    pulses_zone = []
                    for task in self.problem.tasks:
                        req = self.problem.cons_zones[z][task]  # Indexing [z][task]
                        if req > 0:
                            pulses_zone.append(
                                self.cp_model.pulse(
                                    self.variables["opt_intervals"][(task, p, w)], req
                                )
                            )
                    if pulses_zone:
                        self.cp_model.enforce(self.cp_model.sum(pulses_zone) <= capa)
[docs] def create_zone_blocking(self): for z in self.problem.zones: tasks_blocking = [ t for t in self.problem.tasks if z in self.problem.neutr_zones[t] ] tasks_consuming = [ t for t in self.problem.tasks if self.problem.cons_zones[z][t] > 0 if t not in tasks_blocking ] # Since the zones have a capacity of 1 this sum threshold logic works perfectly if self.problem.capa_zones[z] == 1 and tasks_blocking: for p in self.problem.periods: for w in self.problem.stations: pulses = [ self.cp_model.pulse( self.variables["opt_intervals"][(t, p, w)], 1 ) for t in tasks_blocking ] pulses.extend( [ self.cp_model.pulse( self.variables["opt_intervals"][(t, p, w)], len(tasks_blocking), ) for t in tasks_consuming ] ) if pulses: self.cp_model.enforce( self.cp_model.sum(pulses) <= len(tasks_blocking) )
[docs] def create_precedence_constraints(self): for t1, t2 in self.problem.precedences: # 1. Global Station Precedence: wks(t1) <= wks(t2) wks_t1 = self.cp_model.sum( [ w * self.variables["allocations"][(t1, 0, w)] for w in self.problem.stations ] ) wks_t2 = self.cp_model.sum( [ w * self.variables["allocations"][(t2, 0, w)] for w in self.problem.stations ] ) self.cp_model.enforce(wks_t1 <= wks_t2) # 2. Temporal Precedence: Valid ONLY if they share the same workstation for p in self.problem.periods: for w in self.problem.stations: self.cp_model.end_before_start( self.variables["opt_intervals"][(t1, p, w)], self.variables["opt_intervals"][(t2, p, w)], )
    def create_cycle_time_variables(self):
        """Create per-period "chosen" and "used" cycle-time variables.

        The used cycle time of a period is the max end over all main intervals
        (the window already starts at 0, so no offset is subtracted). Periods
        before ``nb_stations`` are "unstable" (constant cycle time); later
        "stable" periods have monotonically decreasing cycle times.
        """
        cycle_time_used = {}
        cycle_time_chosen = {}
        max_horizon = self.problem.c_max
        for p in self.problem.periods:
            cycle_time_chosen[p] = self.cp_model.int_var(
                min=self.problem.c_target,
                max=max_horizon,
                name=f"cycle_time_chosen_{p}",
            )
            # NOTE(review): this int_var is immediately overwritten below by
            # the max expression, so it ends up unused in the model.
            cycle_time_used[p] = self.cp_model.int_var(
                min=0, max=max_horizon, name=f"cycle_time_used_{p}"
            )
            max_ends = []
            for task in self.problem.tasks:
                main_int = self.variables["main_intervals"][(task, p)]
                # All active tasks share the window [0, c_max], so the end
                # value is already relative to the period start.
                relative_end = self.cp_model.end(main_int)
                max_ends.append(relative_end)
            cycle_time_used[p] = self.cp_model.max(max_ends)
            # self.cp_model.enforce(cycle_time_used[p] == self.cp_model.max(max_ends))
            self.cp_model.enforce(cycle_time_chosen[p] >= cycle_time_used[p])
        self.variables["cycle_time_used"] = cycle_time_used
        self.variables["cycle_time_chosen"] = cycle_time_chosen
        # Unstable periods logic: cycle time remains constant [cite: 176, 178, 439]
        if self.problem.nb_stations > 0:
            self.cp_model.enforce(
                self.variables["cycle_time_chosen"][0]
                == self.variables["cycle_time_used"][0]
            )
        for p in range(1, self.problem.nb_stations):
            self.cp_model.enforce(
                self.variables["cycle_time_chosen"][p]
                == self.variables["cycle_time_chosen"][p - 1]
            )
            self.cp_model.enforce(
                self.variables["cycle_time_used"][p]
                == self.variables["cycle_time_used"][p - 1]
            )
        # Stable periods logic: cycle time is monotonically decreasing [cite: 488, 499]
        for p in range(self.problem.nb_stations, self.problem.nb_periods):
            self.cp_model.enforce(
                self.variables["cycle_time_chosen"][p]
                <= self.variables["cycle_time_chosen"][p - 1]
            )
            self.cp_model.enforce(
                self.variables["cycle_time_used"][p]
                <= self.variables["cycle_time_used"][p - 1]
            )
            # If the chosen cycle time strictly decreases, it must stick to
            # the used cycle time (no artificial slack).
            self.cp_model.enforce(
                self.cp_model.implies(
                    self.variables["cycle_time_chosen"][p]
                    < self.variables["cycle_time_chosen"][p - 1],
                    self.variables["cycle_time_chosen"][p]
                    == self.variables["cycle_time_used"][p],
                )
            )
    def objective_value(self):
        """Minimize the sum of chosen cycle times over all periods.

        NOTE(review): for stable periods a ``cost`` variable encoding the
        overshoot above ``c_target`` is created and constrained, but the
        commented-out append means the raw chosen cycle time is summed
        instead (unlike ``OptalRCALBPLSolverV2``) — confirm intent.
        """
        obj_terms = []
        for p in self.problem.periods:
            if p < self.problem.nb_stations:
                # Unstable period: pay the full chosen cycle time.
                obj_terms.append(self.variables["cycle_time_chosen"][p])
            else:
                cost = self.cp_model.int_var(
                    min=0, max=self.problem.c_max, name=f"cost_{p}"
                )
                self.cp_model.enforce(
                    self.cp_model.implies(
                        (
                            self.variables["cycle_time_chosen"][p]
                            > self.problem.c_target
                        ),
                        cost == self.variables["cycle_time_chosen"][p],
                    )
                )
                self.cp_model.enforce(
                    self.cp_model.implies(
                        (
                            self.variables["cycle_time_chosen"][p]
                            == self.problem.c_target
                        ),
                        cost == 0,
                    )
                )
                # obj_terms.append(cost)
                obj_terms.append(self.variables["cycle_time_chosen"][p])
        self.cp_model.minimize(self.cp_model.sum(obj_terms))
[docs] def create_heuristic_target_reached_constraints(self, apply_heuristic: bool = True): max_horizon = self.problem.c_max c_target = self.problem.c_target # 1. Constraint: If the "used" cycle time <= c_target, clamp the "chosen" cycle time to c_target. for p in self.problem.periods: # Link the boolean variable to the condition self.cp_model.enforce( (self.variables["cycle_time_used"][p] <= c_target) == (self.variables["cycle_time_chosen"][p] == c_target) ) # 2. Heuristic: Freeze future schedules once target is reached if apply_heuristic: # We only apply this starting from the first stable period for p in range(self.problem.nb_stations, self.problem.nb_periods - 1): for t in self.problem.tasks: # Isolate the relative start time of task t for period p and p+1 rel_start_p = self.cp_model.start( self.variables["main_intervals"][(t, p)] ) rel_start_next = self.cp_model.start( self.variables["main_intervals"][(t, p + 1)] ) # Implication: If target is reached at period p, enforce that the relative # start time in period p+1 is strictly equal to the start time in period p. self.cp_model.enforce( self.cp_model.implies( self.variables["cycle_time_chosen"][p] == c_target, (rel_start_p == rel_start_next), ) )
[docs] def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: WorkStation ) -> "cp.BoolExpr": return self.variables["allocations"][(task[0], task[1], unary_resource)]
[docs] def get_task_interval_variable(self, task: Task) -> "cp.IntervalVar": return self.variables["main_intervals"][task]
[docs] def retrieve_solution(self, result: "cp.SolveResult") -> RCALBPLSolution: """ Parses the OptalCP result back into a native RCALBPLSolution, translating absolute unfolded times back to relative workstation times. """ wks = {} raw = {} start = {} cyc = {} # Retrieve allocations for t in self.problem.tasks: for w in self.problem.stations: # Task allocation is constant, so we can just check period 0 if result.solution.is_present( self.variables["opt_intervals"][(t, 0, w)] ): wks[t] = w break # Retrieve resource dispatch bounds for r in self.problem.resources: for w in self.problem.stations: raw[(r, w)] = result.solution.get_value( self.variables["resource_dispatch"][(r, w)] ) # Retrieve cycle times chosen for p in self.problem.periods: cyc[p] = result.solution.get_value(self.variables["cycle_time_chosen"][p]) # Retrieve relative task start times for p in self.problem.periods: for t in self.problem.tasks: w = wks[t] # Fetch absolute start time and subtract baseline offset start[(t, p)] = result.solution.get_start( self.variables["opt_intervals"][(t, p, w)] ) return RCALBPLSolution( problem=self.problem, wks=wks, raw=raw, start=start, cyc=cyc )
class OptalRCALBPLSolverV2(
    AllocationOptalSolver[Task, WorkStation], SchedulingOptalSolver[Task]
):
    """Alternative OptalCP model for RCALBP-L.

    Unlike ``OptalRCALBPLSolver``, stations are encoded positionally: the
    unfolded timeline is split into one window of length ``c_max`` per
    (period, station), and the window containing a task's interval decides
    its workstation allocation.
    """

    # Problem instance being solved.
    problem: RCALBPLProblem
    # All CP variables created by init_model, keyed by name.
    variables: dict
    def init_model(self, **kwargs: Any) -> None:
        """Build the full CP model: variables, then constraints, then objective.

        No explicit single-station allocation constraint is posted here:
        allocation is encoded through the interval time windows created in
        ``create_main_unfolded_intervals``.
        """
        super().init_model(**kwargs)
        self.cp_model = cp.Model(name="RCALBPL")
        self.variables = {}
        # 1. Create variables
        self.create_main_unfolded_intervals()
        self.create_resource_dispatch()
        self.create_cycle_time_variables()
        # 2. Post constraints
        self.create_cumulative_resource_constraint()
        self.create_zone_blocking()
        self.create_precedence_constraints()
        # self.create_heuristic_target_reached_constraints(apply_heuristic=False)
        # 3. Post objective
        self.objective_value()
    def create_main_unfolded_intervals(self):
        """Create one mandatory interval per (task, period) on a timeline that
        is unfolded by period AND station: window k (length ``c_max``) inside a
        period corresponds to station k.

        Allocation booleans ``allocations[(task, w)]`` are derived from the
        period-0 interval lying inside station w's window; later periods reuse
        them to pin the interval into the matching window and to select the
        task duration via ``element``.
        """
        dict_main_intervals = {}
        dict_opt_intervals = {}  # NOTE(review): stays empty in this solver.
        allocations = {}
        max_horizon = self.problem.c_max
        # Iterate over the periods list (not just integer)
        for p in self.problem.periods:
            # Window spanning all station slots of this period.
            p_lb_start = max_horizon * (p * self.problem.nb_stations)
            p_ub_start = max_horizon * ((p + 1) * self.problem.nb_stations)
            for task in self.problem.tasks:
                possible_durations = [
                    self.problem.get_duration(task, p, w)
                    for w in self.problem.stations
                ]
                min_dur = min(possible_durations)
                max_dur = max(possible_durations)
                dict_main_intervals[(task, p)] = self.cp_model.interval_var(
                    start=(p_lb_start, p_ub_start),
                    end=(p_lb_start, p_ub_start),
                    length=(min_dur, max_dur),
                    optional=False,
                    name=f"{task}_{p}_interval_unfolded",
                )
                if p == 0:
                    # Allocation literal: the period-0 interval fits entirely
                    # inside station w's [w*c_max, (w+1)*c_max] window.
                    for w in range(self.problem.nb_stations):
                        allocations[(task, w)] = (
                            self.cp_model.start(dict_main_intervals[(task, p)])
                            >= self.problem.c_max * w
                        ) & (
                            self.cp_model.end(dict_main_intervals[(task, p)])
                            <= self.problem.c_max * (w + 1)
                        )
                    # Exactly one station window contains the interval.
                    self.cp_model.enforce(
                        self.cp_model.sum(
                            [
                                allocations[(task, w)]
                                for w in range(self.problem.nb_stations)
                            ]
                        )
                        == 1
                    )
                    # Integer station index of the task (``w`` is rebound).
                    w = self.cp_model.sum(
                        [
                            w * allocations[(task, w)]
                            for w in range(self.problem.nb_stations)
                        ]
                    )
                    # Duration looked up at index (p - w); a negative index
                    # presumably means the task has not reached its station
                    # yet, so the interval has length 0 — TODO confirm.
                    self.cp_model.enforce(
                        self.cp_model.implies(
                            (p - w) >= 0,
                            self.cp_model.length(dict_main_intervals[(task, p)])
                            == self.cp_model.element(
                                self.problem.durations[task], p - w
                            ),
                        )
                    )
                    self.cp_model.enforce(
                        self.cp_model.implies(
                            (p - w) < 0,
                            self.cp_model.length(dict_main_intervals[(task, p)])
                            == 0,
                        )
                    )
                else:
                    # Station index derived from the period-0 allocation.
                    w = self.cp_model.sum(
                        [
                            w * allocations[(task, w)]
                            for w in range(self.problem.nb_stations)
                        ]
                    )
                    self.cp_model.enforce(
                        self.cp_model.implies(
                            (p - w) >= 0,
                            self.cp_model.length(dict_main_intervals[(task, p)])
                            == self.cp_model.element(
                                self.problem.durations[task], p - w
                            ),
                        )
                    )
                    self.cp_model.enforce(
                        self.cp_model.implies(
                            (p - w) < 0,
                            self.cp_model.length(dict_main_intervals[(task, p)])
                            == 0,
                        )
                    )
                    # Pin the interval into its (period, station) window.
                    self.cp_model.enforce(
                        self.cp_model.start(dict_main_intervals[(task, p)])
                        >= (self.problem.nb_stations * p + w) * max_horizon
                    )
                    self.cp_model.enforce(
                        self.cp_model.end(dict_main_intervals[(task, p)])
                        <= (self.problem.nb_stations * p + w + 1) * max_horizon
                    )
        # NOTE(review): hard-coded constraint — at least 30 tasks on station 1.
        # Looks instance-specific / experimental; TODO confirm and parameterize
        # or remove.
        self.cp_model.enforce(
            self.cp_model.sum([allocations[task, 1] for task in self.problem.tasks])
            >= 30
        )
        self.variables["main_intervals"] = dict_main_intervals
        self.variables["opt_intervals"] = dict_opt_intervals
        self.variables["allocations"] = allocations
[docs] def create_resource_dispatch(self): resource = {} for r in self.problem.resources: capa = self.problem.capa_resources[r] for w in self.problem.stations: resource[(r, w)] = self.cp_model.int_var( min=0, max=capa, name=f"capa_{r}_station_{w}" ) self.cp_model.enforce( self.cp_model.sum([resource[r, w] for w in self.problem.stations]) <= capa ) self.variables["resource_dispatch"] = resource
    def create_cumulative_resource_constraint(self):
        """Post cumulative constraints on the station-unfolded timeline.

        For each resource, a single cumulative covers all (period, station)
        windows: task pulses on the main intervals plus, per window, a fixed
        pulse consuming the capacity NOT dispatched to that station — so tasks
        inside a window can only use that station's dispatched share.

        NOTE(review): indentation reconstructed from flattened source — the
        single ``enforce`` per resource aggregates pulses over all periods,
        which matches the pulse lists being initialized once per resource.
        """
        max_horizon = self.problem.c_max
        for r in self.problem.resources:
            capa = self.problem.capa_resources[r]
            pulses_resources = []
            pulses_global = []
            for p in self.problem.periods:
                for w in self.problem.stations:
                    lb = (p * self.problem.nb_stations + w) * max_horizon
                    ub = (p * self.problem.nb_stations + w + 1) * max_horizon
                    # Fixed pulse filling this window with the undispatched share.
                    pulses_resources.append(
                        self.cp_model.pulse(
                            self.cp_model.interval_var(start=lb, end=ub),
                            capa - self.variables["resource_dispatch"][r, w],
                        )
                    )
                # 1. Global capacity bound across all workstations.
                for task in self.problem.tasks:
                    req = self.problem.cons_resources[r][task]  # Indexing is [r][task]
                    if req > 0:
                        pulses_global.append(
                            self.cp_model.pulse(
                                self.variables["main_intervals"][(task, p)], req
                            )
                        )
            if pulses_global:
                self.cp_model.enforce(
                    self.cp_model.sum(pulses_global + pulses_resources) <= capa
                )
        # 3. Zone capacities (local to workstations).
        for z in self.problem.zones:
            capa = self.problem.capa_zones[z]
            for p in self.problem.periods:
                pulses_zone = []
                for task in self.problem.tasks:
                    req = self.problem.cons_zones[z][task]  # Indexing [z][task]
                    if req > 0:
                        pulses_zone.append(
                            self.cp_model.pulse(
                                self.variables["main_intervals"][(task, p)], req
                            )
                        )
                if pulses_zone:
                    self.cp_model.enforce(self.cp_model.sum(pulses_zone) <= capa)
[docs] def create_zone_blocking(self): for z in self.problem.zones: tasks_blocking = [ t for t in self.problem.tasks if z in self.problem.neutr_zones[t] ] tasks_consuming = [ t for t in self.problem.tasks if self.problem.cons_zones[z][t] > 0 if t not in tasks_blocking ] # Since the zones have a capacity of 1 this sum threshold logic works perfectly if self.problem.capa_zones[z] == 1 and tasks_blocking: for p in self.problem.periods: pulses = [ self.cp_model.pulse(self.variables["main_intervals"][(t, p)], 1) for t in tasks_blocking ] pulses.extend( [ self.cp_model.pulse( self.variables["main_intervals"][(t, p)], len(tasks_blocking), ) for t in tasks_consuming ] ) if pulses: self.cp_model.enforce( self.cp_model.sum(pulses) <= len(tasks_blocking) )
[docs] def create_precedence_constraints(self): for t1, t2 in self.problem.precedences: # 2. Temporal Precedence: Valid ONLY if they share the same workstation for p in self.problem.periods: self.cp_model.end_before_start( self.variables["main_intervals"][(t1, p)], self.variables["main_intervals"][(t2, p)], )
    def create_cycle_time_variables(self):
        """Create per-period "chosen" and "used" cycle-time variables.

        The used cycle time is the max, over tasks, of the task's end time
        relative to the start of its own (period, station) window. Periods
        before ``nb_stations`` are "unstable" (constant cycle time); later
        "stable" periods have monotonically decreasing cycle times.
        """
        cycle_time_used = {}
        cycle_time_chosen = {}
        max_horizon = self.problem.c_max
        for p in self.problem.periods:
            cycle_time_chosen[p] = self.cp_model.int_var(
                min=self.problem.c_target,
                max=max_horizon,
                name=f"cycle_time_chosen_{p}",
            )
            # NOTE(review): this int_var is immediately overwritten below by
            # the max expression, so it ends up unused in the model.
            cycle_time_used[p] = self.cp_model.int_var(
                min=0, max=max_horizon, name=f"cycle_time_used_{p}"
            )
            max_ends = []
            for task in self.problem.tasks:
                # Integer station index from the allocation booleans.
                allocation = self.cp_model.sum(
                    [
                        k * self.variables["allocations"][task, k]
                        for k in range(self.problem.nb_stations)
                    ]
                )
                # Baseline offset: start of the task's (period, station) window.
                init_time = (
                    self.problem.nb_stations * p + allocation
                ) * self.problem.c_max
                main_int = self.variables["main_intervals"][(task, p)]
                relative_end = self.cp_model.end(main_int) - init_time
                max_ends.append(relative_end)
            cycle_time_used[p] = self.cp_model.max(max_ends)
            # self.cp_model.enforce(cycle_time_used[p] == self.cp_model.max(max_ends))
            self.cp_model.enforce(cycle_time_chosen[p] >= cycle_time_used[p])
        self.variables["cycle_time_used"] = cycle_time_used
        self.variables["cycle_time_chosen"] = cycle_time_chosen
        # Unstable periods logic: cycle time remains constant [cite: 176, 178, 439]
        if self.problem.nb_stations > 0:
            self.cp_model.enforce(
                self.variables["cycle_time_chosen"][0]
                == self.variables["cycle_time_used"][0]
            )
        for p in range(1, self.problem.nb_stations):
            self.cp_model.enforce(
                self.variables["cycle_time_chosen"][p]
                == self.variables["cycle_time_chosen"][p - 1]
            )
            self.cp_model.enforce(
                self.variables["cycle_time_used"][p]
                == self.variables["cycle_time_used"][p - 1]
            )
        # Stable periods logic: cycle time is monotonically decreasing [cite: 488, 499]
        for p in range(self.problem.nb_stations, self.problem.nb_periods):
            self.cp_model.enforce(
                self.variables["cycle_time_chosen"][p]
                <= self.variables["cycle_time_chosen"][p - 1]
            )
            self.cp_model.enforce(
                self.variables["cycle_time_used"][p]
                <= self.variables["cycle_time_used"][p - 1]
            )
            # If the chosen cycle time strictly decreases, it must stick to
            # the used cycle time (no artificial slack).
            self.cp_model.enforce(
                self.cp_model.implies(
                    self.variables["cycle_time_chosen"][p]
                    < self.variables["cycle_time_chosen"][p - 1],
                    self.variables["cycle_time_chosen"][p]
                    == self.variables["cycle_time_used"][p],
                )
            )
[docs] def objective_value(self): obj_terms = [] for p in self.problem.periods: if p < self.problem.nb_stations: obj_terms.append(self.variables["cycle_time_chosen"][p]) else: cost = self.cp_model.int_var( min=0, max=self.problem.c_max, name=f"cost_{p}" ) self.cp_model.enforce( self.cp_model.implies( ( self.variables["cycle_time_chosen"][p] > self.problem.c_target ), cost == self.variables["cycle_time_chosen"][p], ) ) self.cp_model.enforce( self.cp_model.implies( ( self.variables["cycle_time_chosen"][p] == self.problem.c_target ), cost == 0, ) ) obj_terms.append(cost) self.cp_model.minimize(self.cp_model.sum(obj_terms))
    def create_heuristic_target_reached_constraints(self, apply_heuristic: bool = True):
        """Clamp chosen cycle times to the target; the schedule-freezing
        heuristic below is currently disabled.

        NOTE(review): ``if False and apply_heuristic`` makes branch 2 dead
        code, so the ``apply_heuristic`` parameter currently has no effect —
        confirm whether this disable switch is intentional.
        """
        max_horizon = self.problem.c_max
        c_target = self.problem.c_target
        # 1. Constraint: chosen cycle time equals c_target exactly when the
        # used cycle time is within the target.
        for p in self.problem.periods:
            self.cp_model.enforce(
                (self.variables["cycle_time_used"][p] <= c_target)
                == (self.variables["cycle_time_chosen"][p] == c_target)
            )
        # 2. Heuristic: freeze future schedules once the target is reached.
        if False and apply_heuristic:  # dead code — see NOTE in docstring
            # Only applied from the first stable period onward.
            for p in range(self.problem.nb_stations, self.problem.nb_periods - 1):
                for t in self.problem.tasks:
                    # Relative start times of task t at periods p and p + 1.
                    rel_start_p = self.cp_model.start(
                        self.variables["main_intervals"][(t, p)]
                    ) - (p * max_horizon)
                    rel_start_next = self.cp_model.start(
                        self.variables["main_intervals"][(t, p + 1)]
                    ) - ((p + 1) * max_horizon)
                    # If the target is reached at period p, period p + 1 must
                    # replay the same relative start time.
                    self.cp_model.enforce(
                        (self.variables["cycle_time_chosen"][p] == c_target)
                        <= (rel_start_p == rel_start_next)
                    )
[docs] def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: WorkStation ) -> "cp.BoolExpr": return self.variables["allocations"][(task[0], task[1], unary_resource)]
[docs] def get_task_interval_variable(self, task: Task) -> "cp.IntervalVar": return self.variables["main_intervals"][task]
[docs] def retrieve_solution(self, result: "cp.SolveResult") -> RCALBPLSolution: """ Parses the OptalCP result back into a native RCALBPLSolution, translating absolute unfolded times back to relative workstation times. """ wks = {} raw = {} start = {} cyc = {} # Retrieve allocations for t in self.problem.tasks: # Task allocation is constant, so we can just check period 0 start_ = result.solution.get_start(self.variables["main_intervals"][(t, 0)]) wks[t] = start_ // self.problem.c_max print(start_, self.problem.c_max) print(wks[t]) # Retrieve resource dispatch bounds for r in self.problem.resources: for w in self.problem.stations: raw[(r, w)] = result.solution.get_value( self.variables["resource_dispatch"][(r, w)] ) # Retrieve cycle times chosen for p in self.problem.periods: cyc[p] = result.solution.get_value(self.variables["cycle_time_chosen"][p]) # Retrieve relative task start times for p in self.problem.periods: # Baseline offset for period (reflecting the new unfolded timeline) for t in self.problem.tasks: # Fetch absolute start time and subtract baseline offset abs_start = result.solution.get_start( self.variables["main_intervals"][(t, p)] ) start[(t, p)] = abs_start % self.problem.c_max return RCALBPLSolution( problem=self.problem, wks=wks, raw=raw, start=start, cyc=cyc )