Source code for discrete_optimization.rcpsp.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

from typing import Any, Optional

from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    MultimodeOptalSolver,
    SchedulingOptalSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.rcpsp.problem import RcpspProblem, RcpspSolution, Task
from discrete_optimization.rcpsp.solvers import RcpspSolver
from discrete_optimization.rcpsp.utils import create_fake_tasks

# optalcp is treated as an optional dependency: a failed import must not
# prevent this module from being imported.
try:
    import optalcp as cp
except ImportError:
    # Keep the name `cp` defined (as None) and record the failure in
    # `optalcp_available`.
    cp = None
    optalcp_available = False
else:
    optalcp_available = True


class OptalRcpspSolver(
    SchedulingOptalSolver[Task], MultimodeOptalSolver[Task], RcpspSolver, WarmstartMixin
):
    """RCPSP solver that builds and solves an ``optalcp`` model.

    Every task gets one mandatory interval variable; a task with several
    modes additionally gets one optional interval per mode, tied to the
    mandatory one through an ``alternative`` constraint. The objective is to
    minimize the end of the sink task's interval.

    All model variables are stored in ``self.variables_dict`` under the keys
    ``"intervals"`` (task -> interval) and ``"opt_intervals"``
    (task -> mode -> interval).
    """

    problem: RcpspProblem

    def __init__(
        self,
        problem: RcpspProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs,
    ) -> None:
        """Forward all arguments to the parent classes and reset the variable store.

        :param problem: the RCPSP instance to solve.
        :param params_objective_function: passed through to the parent constructor.
        :param kwargs: passed through to the parent constructor.
        """
        super().__init__(problem, params_objective_function, **kwargs)
        self.variables_dict = {}

    def create_fake_tasks(self) -> list[dict[str, int]]:
        """Create tasks representing the variable resource availability.

        :return: the result of the module-level ``create_fake_tasks`` utility
            when ``self.problem.is_calendar`` is truthy, an empty list otherwise.
        """
        if not self.problem.is_calendar:
            return []
        # Inside a method body the bare name resolves to the imported
        # module-level function, not to this method.
        return create_fake_tasks(rcpsp_problem=self.problem)

    def init_model(self, **args: Any) -> None:
        """Build the full model: variables, constraints, then the objective.

        :param args: accepted for interface compatibility; not read here.
        """
        self.cp_model = cp.Model()
        self.create_interval_vars()
        self.create_precedence_constraints()
        self.create_cumulative_constraints()
        # Objective: finish the sink task as early as possible.
        sink_interval = self.variables_dict["intervals"][self.problem.sink_task]
        self.cp_model.minimize(self.cp_model.end(sink_interval))

    def create_interval_vars(self):
        """Create the interval variables of every task and store them.

        A single-mode task gets one mandatory interval of fixed length, which
        is also registered as its only entry in ``"opt_intervals"``. Any other
        task gets a mandatory interval of free length plus one optional
        interval per mode, linked by an ``alternative`` constraint.
        """
        intervals = {}
        opt_intervals = {}
        for t in self.problem.tasks_list:
            task_modes = self.problem.mode_details[t]
            modes_keys = list(task_modes)
            has_single_mode = len(modes_keys) == 1
            window = (0, self.problem.horizon)
            intervals[t] = self.cp_model.interval_var(
                start=window,
                end=window,
                # The length is only known up front when there is no mode choice.
                length=task_modes[modes_keys[0]]["duration"] if has_single_mode else None,
                optional=False,
                name=f"itv_{t}",
            )
            if has_single_mode:
                # Alias the mandatory interval so that mode-indexed lookups
                # work uniformly for every task.
                opt_intervals[t] = {modes_keys[0]: intervals[t]}
                continue
            opt_intervals[t] = {
                m: self.cp_model.interval_var(
                    start=window,
                    end=window,
                    length=task_modes[m]["duration"],
                    optional=True,
                    name=f"itv_{t}_{m}",
                )
                for m in modes_keys
            }
            self.cp_model.alternative(intervals[t], list(opt_intervals[t].values()))
        self.variables_dict["intervals"] = intervals
        self.variables_dict["opt_intervals"] = opt_intervals

    def create_precedence_constraints(self):
        """Add an ``end_before_start`` constraint for every (task, successor) pair."""
        successors = self.problem.successors
        precedence_pairs = (
            (pred, succ) for pred in successors for succ in successors[pred]
        )
        for pred, succ in precedence_pairs:
            task_intervals = self.variables_dict["intervals"]
            self.cp_model.end_before_start(task_intervals[pred], task_intervals[succ])

    def create_cumulative_constraints(self):
        """Add one capacity constraint per resource.

        Resources not listed in ``non_renewable_resources`` are constrained
        through a sum of pulses (fixed calendar intervals first, then every
        task mode with a positive demand). Resources listed there are
        constrained through a sum of ``presence * demand`` terms.
        """

        def positive_demands(resource):
            """Yield (interval, demand) for each task mode using ``resource``."""
            for t, by_mode in self.variables_dict["opt_intervals"].items():
                for m, mode_interval in by_mode.items():
                    demand = self.problem.mode_details[t][m].get(resource, 0)
                    if demand > 0:
                        yield mode_interval, demand

        calendar_tasks = self.create_fake_tasks()
        for res in self.problem.resources_list:
            capa = self.problem.get_max_resource_capacity(res)
            if res in self.problem.non_renewable_resources:
                list_conso = [
                    self.cp_model.presence(mode_interval) * demand
                    for mode_interval, demand in positive_demands(res)
                ]
                self.cp_model.enforce(self.cp_model.sum(list_conso) <= capa)
                continue
            # The calendar virtual intervals: fixed intervals with a pulse of
            # the height given by the fake task for this resource.
            list_pulse = []
            for x in calendar_tasks:
                if x.get(res, 0) > 0:
                    fixed_interval = self.cp_model.interval_var(
                        start=x["start"],
                        end=x["start"] + x["duration"],
                        optional=False,
                    )
                    list_pulse.append(
                        self.cp_model.pulse(interval=fixed_interval, height=x[res])
                    )
            list_pulse.extend(
                self.cp_model.pulse(interval=mode_interval, height=demand)
                for mode_interval, demand in positive_demands(res)
            )
            self.cp_model.enforce(self.cp_model.sum(list_pulse) <= capa)

    def get_task_interval_variable(self, task: Task) -> cp.IntervalVar:
        """Return the mandatory interval variable stored for ``task``."""
        task_intervals = self.variables_dict["intervals"]
        return task_intervals[task]

    def get_task_mode_is_present_variable(self, task: Task, mode: int) -> cp.BoolExpr:
        """Return the presence expression of the interval stored for (``task``, ``mode``)."""
        mode_interval = self.variables_dict["opt_intervals"][task][mode]
        return self.cp_model.presence(mode_interval)

    def set_warm_start(self, solution: RcpspSolution) -> None:
        """Translate ``solution`` into an ``optalcp`` solution used as warm start.

        For each task, the start/end of ``solution`` are assigned to the
        task's mandatory interval and to the interval of the mode it selects.
        The objective is set to ``solution.get_max_end_time()``.

        :param solution: the schedule to start the search from.
        """
        solution_optal = cp.Solution()
        for task in self.problem.tasks_list:
            st = solution.get_start_time(task)
            end = solution.get_end_time(task)
            mode = solution.get_mode(task)
            solution_optal.set_value(self.get_task_interval_variable(task), st, end)
            if len(self.problem.get_task_modes(task)) == 0:
                continue
            chosen_interval = self.variables_dict["opt_intervals"][task][mode]
            solution_optal.set_value(chosen_interval, st, end)
        solution_optal.set_objective(solution.get_max_end_time())
        self.warm_start_solution = solution_optal
        self.use_warm_start = True

    def retrieve_solution(self, result: cp.SolveResult) -> Solution:
        """Convert a solve result into an ``RcpspSolution``.

        :param result: the object returned by the solver.
        :return: the schedule and modes read from ``result.solution``, or
            ``None`` when ``result.solution`` is ``None``.
        """
        found = result.solution
        if found is None:
            return None
        schedule = {}
        modes_dict = {}
        for t, task_interval in self.variables_dict["intervals"].items():
            schedule[t] = {
                "start_time": found.get_start(task_interval),
                "end_time": found.get_end(task_interval),
            }
            for m, mode_interval in self.variables_dict["opt_intervals"][t].items():
                if found.is_present(mode_interval):
                    modes_dict[t] = m
        selected_modes = [modes_dict[t] for t in self.problem.tasks_list_non_dummy]
        return RcpspSolution(
            problem=self.problem,
            rcpsp_schedule=schedule,
            rcpsp_modes=selected_modes,
        )