Source code for discrete_optimization.fjsp.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

from collections import defaultdict
from typing import Any, Optional

from discrete_optimization.fjsp.problem import FJobShopProblem, FJobShopSolution, Task
from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    SchedulingOptalSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True


[docs] class OptalFJspSolver(SchedulingOptalSolver[Task]): problem: FJobShopProblem def __init__( self, problem: FJobShopProblem, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): super().__init__(problem, params_objective_function, **kwargs) self.variables = {}
[docs] def init_model(self, **args: Any) -> None: self.cp_model = cp.Model() intervals = {} opt_intervals = {} intervals_per_machines = defaultdict(lambda: set()) for i in range(self.problem.n_jobs): for k in range(len(self.problem.list_jobs[i].sub_jobs)): subjob_option = self.problem.list_jobs[i].sub_jobs[k] durations = [opt.processing_time for opt in subjob_option] intervals[(i, k)] = self.cp_model.interval_var( start=(0, self.problem.horizon), end=(0, self.problem.horizon), length=(min(durations), max(durations)), optional=False, name=f"interval_{i}_{k}", ) for opt in subjob_option: opt_intervals[(i, k, opt.machine_id)] = self.cp_model.interval_var( start=(0, self.problem.horizon), end=(0, self.problem.horizon), length=opt.processing_time, optional=True, name=f"interval_{i}_{k}_{opt.machine_id}", ) intervals_per_machines[opt.machine_id].add((i, k, opt.machine_id)) self.cp_model.alternative( intervals[(i, k)], [opt_intervals[(i, k, opt.machine_id)] for opt in subjob_option], ) if k >= 1: self.cp_model.end_before_start( intervals[(i, k - 1)], intervals[(i, k)] ) for machine in intervals_per_machines: self.cp_model.no_overlap( [opt_intervals[x] for x in intervals_per_machines[machine]] ) self.variables["intervals"] = intervals self.variables["opt_intervals"] = opt_intervals self.cp_model.minimize( self.cp_model.max( [ self.cp_model.end(self.variables["intervals"][x]) for x in self.variables["intervals"] ] ) )
[docs] def get_task_interval_variable(self, task: Task) -> cp.IntervalVar: return self.variables["intervals"][task]
[docs] def retrieve_solution(self, result: cp.SolveResult) -> Solution: schedule = [] for i in range(self.problem.n_jobs): sched_i = [] for k in range(len(self.problem.list_jobs[i].sub_jobs)): for index_opt, opt in enumerate(self.problem.list_jobs[i].sub_jobs[k]): if result.solution.is_present( self.variables["opt_intervals"][(i, k, opt.machine_id)] ): st, end = result.solution.get_value( self.variables["opt_intervals"][(i, k, opt.machine_id)] ) sched_i.append((st, end, opt.machine_id, index_opt)) schedule.append(sched_i) return FJobShopSolution(problem=self.problem, schedule=schedule)