Source code for discrete_optimization.rcalbp_l.problem

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
# Implementation of the problem described in
# https://drops.dagstuhl.de/storage/00lipics/lipics-vol340-cp2025/LIPIcs.CP.2025.25/LIPIcs.CP.2025.25.pdf
from typing import Dict, List, Optional, Tuple

import numpy as np
from matplotlib import colors as mcolors
from matplotlib import pyplot as plt
from matplotlib.widgets import Slider

from discrete_optimization.generic_tasks_tools.allocation import (
    AllocationProblem,
    AllocationSolution,
    UnaryResource,
)
from discrete_optimization.generic_tasks_tools.scheduling import (
    SchedulingProblem,
    SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Problem,
    Solution,
    TypeObjective,
)

TaskUnit = int
Period = int
Task = Tuple[TaskUnit, Period]
ResourceCumulative = int
WorkStation = int


[docs] class RCALBPLSolution(AllocationSolution[Task, WorkStation], SchedulingSolution[Task]): """ Solution representation for the RC-ALBP/L problem. """ problem: "RCALBPLProblem" def __init__( self, problem: "RCALBPLProblem", wks: Dict[TaskUnit, WorkStation], raw: Dict[Tuple[ResourceCumulative, WorkStation], int], start: Dict[Tuple[TaskUnit, Period], int], cyc: Dict[Period, int], ramp_up_duration: Optional[float] = None, nb_adjustments: Optional[int] = None, ): super().__init__(problem=problem) # wks[t]: workstation assigned to task t self.wks = wks # raw[(r, w)]: amount of resource r allocated to workstation w self.raw = raw # start[(t, p)]: start time of task t in period p self.start = start # cyc[p]: cycle time for period p self.cyc = cyc # Evaluations self.ramp_up_duration = ramp_up_duration self.nb_adjustments = nb_adjustments
[docs] def is_allocated(self, task: Task, unary_resource: WorkStation) -> bool: return self.wks[task[0]] == unary_resource
[docs] def get_end_time(self, task: Task) -> int: return self.get_start_time(task) + self.problem.get_duration( task=task[0], p=task[1], w=self.wks[task[0]] )
[docs] def get_start_time(self, task: Task) -> int: if task in self.start: return self.start[task] else: return 0
[docs] def copy(self) -> "RCALBPLSolution": return RCALBPLSolution( problem=self.problem, wks=dict(self.wks), raw=dict(self.raw), start=dict(self.start), cyc=dict(self.cyc), ramp_up_duration=self.ramp_up_duration, nb_adjustments=self.nb_adjustments, )
[docs] def lazy_copy(self) -> "RCALBPLSolution": return RCALBPLSolution( problem=self.problem, wks=self.wks, raw=self.raw, start=self.start, cyc=self.cyc, ramp_up_duration=self.ramp_up_duration, nb_adjustments=self.nb_adjustments, )
[docs] def change_problem(self, new_problem: Problem) -> None: super().change_problem(new_problem=new_problem) self.ramp_up_duration = None self.nb_adjustments = None
[docs] class RCALBPLVectorSolution(RCALBPLSolution): def __init__( self, problem: "RCALBPLProblem", allocation_task: list[WorkStation], permutation_task: list[TaskUnit], resource: list[int], ): Solution.__init__(self, problem) self.problem = problem target_time = {permutation_task[i]: i for i in range(len(permutation_task))} wks = { self.problem.tasks[i]: allocation_task[i] for i in range(len(allocation_task)) } res = {} i = 0 for i_r in range(self.problem.nb_resources): for i_w in range(self.problem.nb_stations): res[(self.problem.resources[i_r], self.problem.stations[i_w])] = ( resource[i] ) i += 1 sol = self.problem.build_full_solution( wks=wks, raw=res, target_starts=target_time ) self.wks = wks self.start = sol.start self.cyc = sol.cyc self.raw = sol.raw
[docs] class RCALBPLProblem(SchedulingProblem[Task], AllocationProblem[Task, WorkStation]): """ Problem definition for Resource-Constrained Assembly Line Balancing with Learning Effect (RC-ALBP/L). """
[docs] def get_makespan_upper_bound(self) -> int: return self.c_max
@property def unary_resources_list(self) -> list[UnaryResource]: return self.stations @property def tasks_list(self) -> list[Task]: return [(t, p) for t in self.tasks for p in self.periods] def __init__( self, c_target: int, c_max: int, nb_stations: int, nb_periods: int, nb_tasks: int, precedences: List[Tuple[Task, Task]], durations: List[List[int]], nb_resources: int, capa_resources: List[int], cons_resources: List[List[int]], nb_zones: int, capa_zones: List[int], cons_zones: List[List[int]], neutr_zones: List[List[int]], p_start: int = 0, p_end: Optional[int] = None, ): self.c_target = c_target self.c_max = c_max self.nb_stations = nb_stations self.nb_periods = nb_periods self.nb_tasks = nb_tasks self.precedences = precedences self.durations = durations self.nb_resources = nb_resources self.capa_resources = capa_resources self.cons_resources = cons_resources self.nb_zones = nb_zones self.capa_zones = capa_zones self.cons_zones = cons_zones self.neutr_zones = neutr_zones # Derived properties self.tasks = list(range(self.nb_tasks)) self.stations = list(range(self.nb_stations)) self.periods = list(range(self.nb_periods)) self.resources = list(range(self.nb_resources)) self.zones = list(range(self.nb_zones)) # Period start/end of interest. self.p_start = p_start self.p_end = p_end if p_end is not None else self.nb_periods self.periods = list(range(self.p_start, self.p_end)) # Restricted to the window
[docs] def get_objective_register(self) -> ObjectiveRegister: # Multi-objective optimization: ramp-up duration and line adjustments dict_objective = { "ramp_up_duration": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1.0 ), "nb_adjustments": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1.0 ), # Penalty objectives for constraints violations "violation_precedence": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=100000.0 ), "violation_capacity": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=100000.0 ), "violation_cycle_time": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=100000.0 ), } return ObjectiveRegister( objective_sense=ModeOptim.MINIMIZATION, objective_handling=ObjectiveHandling.AGGREGATE, dict_objective_to_doc=dict_objective, )
[docs] def get_duration(self, task: TaskUnit, p: Period, w: WorkStation) -> int: n = max(-1, p - w) # durations array has 0 at index 0 (n=-1), so n+1 aligns perfectly if n < 0: return 0 return self.durations[task][n]
[docs] def evaluate(self, variable: RCALBPLSolution) -> Dict[str, float]: ramp_up_duration = 0 nb_adjustments = 0 violation_precedence = 0.0 violation_capacity = 0.0 violation_cycle_time = 0.0 # 1. Ramp-up criteria computation costs = {} for p in self.periods: if p < self.nb_stations: costs[p] = variable.cyc[p] # Unstable period else: costs[p] = ( variable.cyc[p] if variable.cyc[p] > self.c_target else 0 ) # Stable period ramp_up_duration = sum(costs[p] for p in self.periods) for p in self.periods: if p >= self.nb_stations and (p - 1) in costs: if costs[p - 1] != costs[p]: nb_adjustments += 1 # 2. Evaluate cycle time validity for p in self.periods: cyc_p = variable.cyc[p] if cyc_p < self.c_target or cyc_p > self.c_max: print(cyc_p, self.c_max, self.c_target) print("lower than c-target or upper than cmax") violation_cycle_time += 1 # Unstable periods cannot change their cycle time if self.nb_stations > p >= 1: if p - 1 in variable.cyc: if variable.cyc[p] != variable.cyc[p - 1]: print("didn't respect the frozen") violation_cycle_time += 1 # Check if cycle time is respected by all tasks scheduled in p for t in self.tasks: w = variable.wks[t] dur = self.get_duration(t, p, w) end_t = variable.start[(t, p)] + dur if end_t > cyc_p: print("Exceeded the cycle at some point") violation_cycle_time += end_t - cyc_p # 3. Evaluate Precedence Constraints for a, b in self.precedences: wa = variable.wks[a] wb = variable.wks[b] if wa > wb: print("Violation station") violation_precedence += wa - wb # If in the same workstation, check temporal precedence if wa == wb: for p in self.periods: dur_a = self.get_duration(a, p, wa) end_a = variable.start[(a, p)] + dur_a if end_a > variable.start[(b, p)]: print("Violation precedence, ", end_a, variable.start[(b, p)]) violation_precedence += end_a - variable.start[(b, p)] # 4. Evaluate Global & Cumulative Resource / Zone Capacities for r in self.resources: total_r_allocated = sum(variable.raw.get((r, w), 0) for w in self.stations) if total_r_allocated > self.capa_resources[r]: # 0-indexed capacity arrays violation_capacity += total_r_allocated - self.capa_resources[r] print("too much capa ?") # Sweep-line algorithm for cumulative checking per period and workstation for p in self.periods: for w in self.stations: events = [] tasks_in_w = [t for t in self.tasks if variable.wks[t] == w] if not tasks_in_w: continue for t in tasks_in_w: s_time = variable.start[(t, p)] dur = self.get_duration(t, p, w) if dur > 0: events.append((s_time, "start", t)) events.append((s_time + dur, "end", t)) events.sort(key=lambda x: (x[0], 0 if x[1] == "end" else 1)) active_tasks = set() for time, ev_type, t in events: if ev_type == "start": active_tasks.add(t) # 4a. Check Cumulative Resources for r in self.resources: usage = sum( self.cons_resources[r][tsk] for tsk in active_tasks ) allocated = variable.raw.get((r, w), 0) if usage > allocated: violation_capacity += usage - allocated # 4b. Check Cumulative Zones for z in self.zones: z_usage = sum( self.cons_zones[z][tsk] for tsk in active_tasks ) if z_usage > self.capa_zones[z]: violation_capacity += z_usage - self.capa_zones[z - 1] # 4c. Check disabled zones (conflict check) [cite: 285, 291] disabled_zones = set() for tsk in active_tasks: for dz in self.neutr_zones[tsk]: disabled_zones.add(dz) for tsk in active_tasks: for z in self.zones: if self.cons_zones[z][tsk] > 0 and z in disabled_zones: violation_capacity += 1 else: active_tasks.remove(t) variable.ramp_up_duration = ramp_up_duration variable.nb_adjustments = nb_adjustments return { "ramp_up_duration": ramp_up_duration, "nb_adjustments": nb_adjustments, "violation_precedence": violation_precedence, "violation_capacity": violation_capacity, "violation_cycle_time": violation_cycle_time, }
[docs] def compute_actual_cycle_time_per_period( self, solution: RCALBPLSolution ) -> dict[Period, int]: dict_cycle_time_per_period = {} for p in self.periods: task_this_period = [t for t in solution.start if t[1] == p] if len(task_this_period) > 0: dict_cycle_time_per_period[p] = max( [solution.get_end_time(t) for t in task_this_period] ) else: dict_cycle_time_per_period[p] = None return dict_cycle_time_per_period
[docs] def satisfy(self, variable: RCALBPLSolution) -> bool: evals = self.evaluate(variable) return ( evals["violation_precedence"] == 0 and evals["violation_capacity"] == 0 and evals["violation_cycle_time"] == 0 )
[docs] def get_dummy_solution(self) -> RCALBPLSolution: """ Creates a trivial dummy solution (likely invalid). Assigns all tasks sequentially to the first workstation. """ wks = {t: 1 for t in self.tasks} raw = { (r, w): self.capa_resources[r] if w == 1 else 0 for r in self.resources for w in self.stations } cyc = {p: self.c_max for p in self.periods} start = {} for p in self.periods: current_time = 0 for t in self.tasks: start[(t, p)] = current_time n = max(-1, p - wks[t]) dur = self.durations[t][n + 1] current_time += dur return RCALBPLSolution(self, wks, raw, start, cyc)
[docs] def get_solution_type(self) -> type[Solution]: return RCALBPLSolution
[docs] def build_sgs_schedule_for_period_slow( self, wks: Dict[int, int], raw: Dict[Tuple[int, int], int], target_starts: Dict[int, int], period: int, ) -> Tuple[Dict[int, int], int]: """ Robust Serial Generation Scheme (SGS) to compute a feasible schedule. Uses a dynamic eligible set to strictly guarantee Precedence constraints, and uses 'target_starts' (from an optimal future period) to guide the packing. """ start_times = {} end_times = {} scheduled_tasks = [] unscheduled = set(self.tasks) while unscheduled: # 1. Identify eligible tasks (all same-station predecessors are already scheduled) eligible = [] for t in unscheduled: is_eligible = True for pred, succ in self.precedences: if succ == t and wks[pred] == wks[t] and pred not in start_times: is_eligible = False break if is_eligible: eligible.append(t) # 2. Pick the best eligible task guided by the optimal future schedule # Primary: Earliest target start time # Secondary: Longest Processing Time (LPT) to pack big tasks first # Tertiary: Task ID for stable tie-breaking t = min( eligible, key=lambda x: ( target_starts.get(x, 0), -self.get_duration(x, period, wks[x]), x, ), ) unscheduled.remove(t) w = wks[t] dur = self.get_duration(t, period, w) # 3. Earliest Start Time (EST) based on Precedences est = 0 for pred, succ in self.precedences: if succ == t and wks[pred] == w: est = max(est, end_times[pred]) # 4. Collect candidate times candidates = {est} for ot in scheduled_tasks: if end_times[ot] >= est: candidates.add(end_times[ot]) candidates = sorted(list(candidates)) best_start = est for c in candidates: if dur == 0: best_start = c break overlaps = [ ot for ot in scheduled_tasks if start_times[ot] < c + dur and end_times[ot] > c ] if not overlaps: best_start = c break events = {c, c + dur} for ot in overlaps: events.add(max(c, start_times[ot])) events.add(min(c + dur, end_times[ot])) events = sorted(list(events)) is_valid = True for i in range(len(events) - 1): t_eval = events[i] active = [ ot for ot in overlaps if start_times[ot] <= t_eval < end_times[ot] ] # A. Global Resources for r in self.resources: if self.cons_resources[r][t] > 0: usage = ( sum(self.cons_resources[r][ot] for ot in active) + self.cons_resources[r][t] ) if usage > self.capa_resources[r]: is_valid = False break if not is_valid: break # B. Local Resources active_w = [ot for ot in active if wks[ot] == w] for r in self.resources: if self.cons_resources[r][t] > 0: usage = ( sum(self.cons_resources[r][ot] for ot in active_w) + self.cons_resources[r][t] ) if usage > raw.get((r, w), 0): is_valid = False break if not is_valid: break # C. Zones and Zone Blocking disabled_zones_by_active = { dz for ot in active_w for dz in self.neutr_zones[ot] } disabled_zones_by_t = set(self.neutr_zones[t]) for z in self.zones: req_t = self.cons_zones[z][t] req_active = sum(self.cons_zones[z][ot] for ot in active_w) if req_t + req_active > self.capa_zones[z]: is_valid = False break # If t disables z, it cannot overlap with any task that CONSUMES z if z in disabled_zones_by_t and req_active > 0: is_valid = False break # If t consumes z, it cannot overlap with any task that DISABLES z if req_t > 0 and z in disabled_zones_by_active: is_valid = False break if not is_valid: break if is_valid: best_start = c break start_times[t] = best_start end_times[t] = best_start + dur scheduled_tasks.append(t) cycle_time = max(end_times.values()) if end_times else 0 return start_times, cycle_time
[docs] def build_sgs_schedule_for_period( self, wks: Dict[int, int], raw: Dict[Tuple[int, int], int], target_starts: Dict[int, int], period: int, ) -> Tuple[Dict[int, int], int]: """ Highly Optimized Serial Generation Scheme (SGS). Uses 1D timeline arrays and slice mathematics to evaluate capacities in a fraction of a millisecond per task. """ start_times = {} end_times = {} # --- 1. Precompute Task Data (Avoids dict lookups in tight loops) --- t_res = { t: [(r, self.cons_resources[r][t]) for r in self.resources] for t in self.tasks } t_res_active = { t: [(r, req) for r, req in t_res[t] if req > 0] for t in self.tasks } t_zones = { t: [(z, self.cons_zones[z][t]) for z in self.zones] for t in self.tasks } t_zones_active = { t: [(z, req) for z, req in t_zones[t] if req > 0] for t in self.tasks } t_disabled_zones = {t: self.neutr_zones[t] for t in self.tasks} # --- 2. Initialize Dependency Graph for O(1) Eligibility --- local_in_degree = {t: 0 for t in self.tasks} local_successors = {t: [] for t in self.tasks} est = {t: 0 for t in self.tasks} # Earliest Start Time tracking for pred, succ in self.precedences: if wks[pred] == wks[succ]: local_in_degree[succ] += 1 local_successors[pred].append(succ) eligible = [t for t in self.tasks if local_in_degree[t] == 0] # --- 3. Initialize High-Performance Timeline Arrays --- # Instead of sweeping intervals, we maintain the exact usage at each time unit. # We start with an array length of c_max. It will expand dynamically if needed. current_horizon = self.c_max + 1000 res_usage = [[0] * current_horizon for _ in self.resources] res_w_usage = { w: [[0] * current_horizon for _ in self.resources] for w in self.stations } zone_usage = { w: [[0] * current_horizon for _ in self.zones] for w in self.stations } zone_disabled = { w: [[0] * current_horizon for _ in self.zones] for w in self.stations } end_events = {0} # Track interesting candidate start times # --- 4. Main Scheduling Loop --- for _ in range(self.nb_tasks): # Pick the best eligible task t = min( eligible, key=lambda x: ( target_starts.get(x, 0), -self.get_duration(x, period, wks[x]), # LPT tie-breaker x, ), ) eligible.remove(t) w = wks[t] dur = self.get_duration(t, period, w) # Find candidate times strictly >= Earliest Start Time candidates = sorted([c for c in end_events if c >= est[t]]) best_start = est[t] for c in candidates: if dur == 0: best_start = c break end = c + dur # Dynamically expand arrays if the schedule exceeds the expected horizon if end >= current_horizon: extend_len = max(1000, end - current_horizon + 1000) for r in self.resources: res_usage[r].extend([0] * extend_len) for w_idx in self.stations: res_w_usage[w_idx][r].extend([0] * extend_len) for z in self.zones: for w_idx in self.stations: zone_usage[w_idx][z].extend([0] * extend_len) zone_disabled[w_idx][z].extend([0] * extend_len) current_horizon += extend_len valid = True # A. Fast Global & Local Resource Check for r, req in t_res_active[t]: # The max() over a slice is evaluated in C and is lightning fast if max(res_usage[r][c:end]) + req > self.capa_resources[r]: valid = False break if max(res_w_usage[w][r][c:end]) + req > raw.get((r, w), 0): valid = False break if not valid: continue # B. Fast Zone Capacity & Zone Blocking Check for z, req in t_zones_active[t]: if max(zone_usage[w][z][c:end]) + req > self.capa_zones[z]: valid = False break # RULE 2: If consuming, cannot overlap with disabled if max(zone_disabled[w][z][c:end]) > 0: valid = False break if not valid: continue # RULE 1: If disabling, cannot overlap with any consumption for dz in t_disabled_zones[t]: if max(zone_usage[w][dz][c:end]) > 0: valid = False break if not valid: continue # If we pass all checks, this time slot is mathematically perfect! best_start = c break # --- 5. Commit the Task --- start_times[t] = best_start end_t = best_start + dur end_times[t] = end_t end_events.add(end_t) # Update the timelines for the committed interval if dur > 0: for r, req in t_res_active[t]: for time_idx in range(best_start, end_t): res_usage[r][time_idx] += req res_w_usage[w][r][time_idx] += req for z, req in t_zones_active[t]: for time_idx in range(best_start, end_t): zone_usage[w][z][time_idx] += req for dz in t_disabled_zones[t]: for time_idx in range(best_start, end_t): zone_disabled[w][dz][time_idx] += 1 # --- 6. Update Dependencies --- for succ in local_successors[t]: local_in_degree[succ] -= 1 est[succ] = max(est[succ], end_t) if local_in_degree[succ] == 0: eligible.append(succ) cycle_time = max(end_times.values()) if end_times else 0 return start_times, cycle_time
[docs] def build_full_solution( self, wks: Dict[int, int], raw: Dict[Tuple[int, int], int], target_starts: Dict[int, int], ): start_times = {} cycle_time = {} for p in self.periods: st, cyc = self.build_sgs_schedule_for_period(wks, raw, target_starts, p) start_times.update({(t, p): st[t] for t in st}) cycle_time[p] = cyc return RCALBPLSolution( problem=self, wks=wks, raw=raw, start=start_times, cyc=cycle_time )
[docs] def plot_rcalbpl_dashboard(problem: RCALBPLProblem, solution: RCALBPLSolution): """ Creates an interactive matplotlib dashboard to visualize RC-ALBP/L solutions. - Top plot: Gantt chart of the assembly line for a selected period. - Bottom plot: Evolution of the Cycle Times (Target, Chosen, Real) across all periods. """ # 1. Prepare global data for the Cycle Time evolution chart periods = problem.periods chosen_cycs = [solution.cyc[p] for p in periods] real_cycs = [] for p in periods: max_end = 0 for t in problem.tasks: w = solution.wks[t] dur = problem.get_duration(t, p, w) if dur > 0: end_t = solution.start.get((t, p), 0) + dur if end_t > max_end: max_end = end_t real_cycs.append(max_end) target_cycs = [problem.c_target] * len(periods) # 2. Setup Figure and Grid fig, (ax_gantt, ax_line) = plt.subplots( 2, 1, figsize=(14, 10), gridspec_kw={"height_ratios": [2.5, 1]} ) plt.subplots_adjust(bottom=0.15, hspace=0.35) # 3. Create a stable color map for tasks cmap = plt.get_cmap("tab20") task_colors = {t: cmap(i % 20) for i, t in enumerate(problem.tasks)} # --- BOTTOM PLOT: Cycle Time Evolution --- ax_line.plot( periods, chosen_cycs, color="red", linestyle="-", label="Chosen Cycle Time", linewidth=2, ) ax_line.plot( periods, real_cycs, color="blue", linestyle=":", label="Real Cycle Time", linewidth=2, ) ax_line.plot( periods, target_cycs, color="green", linestyle="--", label="Target Cycle Time", linewidth=2, ) # Highlight the boundary between unstable (fill-up) and stable periods [cite: 170, 171] boundary = problem.nb_stations - 0.5 ax_line.axvline( boundary, color="grey", linestyle="-.", label="Stable Period Boundary" ) ax_line.text( boundary, max(chosen_cycs) * 0.9, " Ramp-up", color="grey", verticalalignment="top", ) # Marker for the currently selected period vline_current_period = ax_line.axvline( 0, color="orange", linewidth=4, alpha=0.5, label="Current Period" ) ax_line.set_xlabel("Periods") ax_line.set_ylabel("Time") ax_line.set_title("Cycle Time Evolution (Learning Curve Effect)") ax_line.legend(loc="upper right") ax_line.grid(True, linestyle="--", alpha=0.6) ax_line.set_xticks(periods) if len(periods) > 20: ax_line.set_xticks( periods[::5] ) # Clean up x-ticks if there are too many periods # --- TOP PLOT: Interactive Gantt Chart --- def draw_gantt(p: int): ax_gantt.clear() stations = problem.stations ax_gantt.set_yticks(stations) ax_gantt.set_yticklabels([f"WS {w}" for w in stations]) ax_gantt.set_ylim(min(stations) - 0.5, max(stations) + 0.5) # Compute maximum time limit to anchor the X-axis across all frames ax_gantt.set_xlim(0, max(max(chosen_cycs), max(real_cycs)) * 1.05) ax_gantt.set_xlabel("Time") ax_gantt.set_ylabel("Workstations") period_type = "Unstable (Fill-up)" if p < problem.nb_stations else "Stable" ax_gantt.set_title(f"Assembly Line Schedule | Period: {p} ({period_type})") # Plot tasks as horizontal bars for t in problem.tasks: w = solution.wks[t] start_t = solution.start.get((t, p), 0) dur_t = problem.get_duration(t, p, w) if dur_t > 0: # Task is active ax_gantt.barh( w, dur_t, left=start_t, color=task_colors[t], edgecolor="black", height=0.5, alpha=0.8, ) # Center text inside the bar text_color = ( "white" if np.mean(mcolors.to_rgb(task_colors[t])[:3]) < 0.5 else "black" ) ax_gantt.text( start_t + dur_t / 2, w, f"T{t}", ha="center", va="center", color=text_color, fontsize=9, fontweight="bold", ) # Plot vertical limit lines ax_gantt.axvline( problem.c_target, color="green", linestyle="--", linewidth=2, label=f"Target ({problem.c_target})", ) ax_gantt.axvline( solution.cyc[p], color="red", linestyle="-", linewidth=2, label=f"Chosen ({solution.cyc[p]})", ) ax_gantt.axvline( real_cycs[p], color="blue", linestyle=":", linewidth=2, label=f"Real ({real_cycs[p]})", ) ax_gantt.legend(loc="upper right") ax_gantt.grid(axis="x", linestyle="--", alpha=0.5) # Initial draw for period 0 draw_gantt(periods[0]) ax_slider = plt.axes([0.15, 0.04, 0.7, 0.03], facecolor="lightgray") slider = Slider( ax=ax_slider, label="Select Period", valmin=min(periods), valmax=max(periods), valinit=min(periods), valstep=1, ) # Update function called when slider moves def update(val): p = int(slider.val) draw_gantt(p) vline_current_period.set_xdata([p]) fig.canvas.draw_idle() slider.on_changed(update) # Return the slider object to prevent it from being garbage collected by Python plt.show() return fig, slider