# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# Optal model for the preemptive rcpsp problem.
from __future__ import annotations
from typing import Any, Iterable, Optional
import numpy as np
from discrete_optimization.generic_tools.do_problem import (
ParamsObjectiveFunction,
)
from discrete_optimization.generic_tools.hub_solver.optal.optalcp_tools import (
OptalCpSolver,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
from discrete_optimization.rcpsp.problem_preemptive import (
PreemptiveRcpspProblem,
PreemptiveRcpspSolution,
)
from discrete_optimization.rcpsp.utils import create_fake_tasks
# optalcp is an optional dependency: remember whether the import worked so
# that this module can still be imported when the package is missing.
try:
    import optalcp as cp

    optalcp_available = True
except ImportError:
    # Keep the name `cp` bound so that later references resolve.
    cp = None
    optalcp_available = False
[docs]
class OptalPreemptiveRcpspSolver(OptalCpSolver):
    """OptalCP-based solver for the preemptive RCPSP problem."""

    # Problem instance handled by this solver.
    problem: PreemptiveRcpspProblem

    def __init__(
        self,
        problem: PreemptiveRcpspProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs,
    ) -> None:
        """Initialise the base solver and the empty model bookkeeping.

        Args:
            problem: preemptive RCPSP instance to solve.
            params_objective_function: forwarded unchanged to the parent
                constructor.
            **kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(problem, params_objective_function, **kwargs)
        # Last solution returned by `retrieve_solution` (none yet).
        self.last_sol_do = None
        # Model variables and expressions, filled while building the model.
        self.variables = {}
[docs]
def implements_lexico_api(self) -> bool:
    """Report that this solver provides the lexicographic-objective methods."""
    supports_lexico = True
    return supports_lexico
[docs]
def get_lexico_objective_value(self, obj: str, res: ResultStorage) -> float:
    """Evaluate one objective on the last solution stored in ``res``.

    Args:
        obj: objective name, either ``"makespan"`` or ``"nb_preemption"``.
        res: result storage; only its last entry ``res[-1][0]`` is read.

    Returns:
        For ``"makespan"`` the solution's ``get_max_end_time()``; for
        ``"nb_preemption"`` the sum of ``get_number_of_part(task)`` over
        ``self.problem.tasks_list``.

    Raises:
        ValueError: if ``obj`` is not one of the two supported names
            (previously the method silently returned ``None``).
    """
    if obj not in ("makespan", "nb_preemption"):
        raise ValueError(
            f"Unknown objective {obj!r}; expected 'makespan' or 'nb_preemption'."
        )
    sol: PreemptiveRcpspSolution = res[-1][0]
    if obj == "makespan":
        return sol.get_max_end_time()
    return sum(sol.get_number_of_part(task) for task in self.problem.tasks_list)
[docs]
def set_lexico_objective(self, obj: str) -> None:
    """Make ``obj`` the objective minimised by the model.

    When a warm-start solution is set (truthy), its objective value is
    also updated with the value recorded for ``obj`` on the last
    retrieved solution (``self.last_sol_do._intern_obj``).

    Args:
        obj: key of the objective in ``self.variables["objectives"]``.
    """
    target = self.variables["objectives"][obj]
    self.cp_model.minimize(target)
    warm_start = self.warm_start_solution
    if not warm_start:
        return
    warm_start.set_objective(self.last_sol_do._intern_obj[obj])
[docs]
def get_lexico_objectives_available(self) -> list[str]:
    """List the objective names registered in ``self.variables["objectives"]``."""
    return [name for name in self.variables["objectives"]]
[docs]
def add_lexico_constraint(self, obj: str, value: float) -> Iterable[Any]:
    """Enforce ``objective <= value`` for the objective named ``obj``.

    Args:
        obj: key of the objective in ``self.variables["objectives"]``.
        value: upper bound imposed on that objective.

    Note:
        Nothing is returned explicitly, so the call evaluates to ``None``
        despite the ``Iterable`` annotation.
    """
    upper_bound = self.variables["objectives"][obj] <= value
    self.cp_model.enforce(upper_bound)
[docs]
def init_model(self, **kwargs: Any) -> None:
    """Build the OptalCP model: variables, constraints and objectives.

    Two objective expressions are stored in ``self.variables["objectives"]``:
    ``"makespan"`` (end of the sink task's main interval) and
    ``"nb_preemption"`` (sum of the presence of every part interval).
    The makespan is the one minimised by default.

    Args:
        **kwargs: only ``max_nb_preemption`` is read; it is forwarded to
            ``create_preempt_variables`` (``None`` when absent).
    """
    self.cp_model = cp.Model()
    self.create_preempt_variables(max_nb_preemption=kwargs.get("max_nb_preemption"))
    # Remaining build steps, run in this exact order.
    build_steps = (
        self.constraint_convention_variables,
        self.create_modes_variables,
        self.create_resource_consumption_variables,
        self.create_duration_variables,
        self.constraint_variable_to_duration,
        self.constraint_precedence,
        self.constraint_resource,
    )
    for build_step in build_steps:
        build_step()

    model = self.cp_model
    part_presences = [
        model.presence(part)
        for parts in self.variables["intervals"].values()
        for part in parts
    ]
    total_parts = model.sum(part_presences)
    sink_interval = self.variables["main_interval"][self.problem.sink_task]
    objectives = {"makespan": model.end(sink_interval)}
    objectives["nb_preemption"] = total_parts
    self.variables["objectives"] = objectives
    model.minimize(objectives["makespan"])
[docs]
def create_modes_variables(self):
    """Create the mode-selection indicators of every task.

    A task with a single mode gets the constant ``1`` for that mode.
    Otherwise one boolean variable named ``mode_{task}_{mode}`` is created
    per mode and their sum is constrained to equal 1.
    The result is stored in ``self.variables["modes"]`` as
    ``{task: {mode: indicator}}``.
    """
    mode_selectors = {}
    for task in self.problem.tasks_list:
        available_modes = list(self.problem.mode_details[task])
        if len(available_modes) == 1:
            # The only mode is always selected: no variable needed.
            mode_selectors[task] = {available_modes[0]: 1}
            continue
        flags = {
            mode: self.cp_model.bool_var(name=f"mode_{task}_{mode}")
            for mode in available_modes
        }
        mode_selectors[task] = flags
        self.cp_model.enforce(self.cp_model.sum(list(flags.values())) == 1)
    self.variables["modes"] = mode_selectors
[docs]
def create_resource_consumption_variables(self):
    """Create the per-task, per-resource consumption values.

    Reads ``self.variables["modes"]`` and stores
    ``self.variables["resource_consumption"]`` as ``{task: {resource: value}}``:

    * single-mode task: the constant consumption, only for resources
      consumed in a strictly positive amount;
    * multi-mode task: an integer variable named
      ``resource_consumption_{task}_{resource}`` for every resource used by
      at least one mode, tied to each mode's consumption through an
      implication on the mode indicator.
    """
    mode_flags = self.variables["modes"]
    consumption = {}
    for task in self.problem.tasks_list:
        details = self.problem.mode_details[task]
        mode_ids = list(details)
        per_resource = consumption[task] = {}
        if len(mode_ids) == 1:
            only_mode = details[mode_ids[0]]
            for res in self.problem.resources_list:
                need = only_mode.get(res, 0)
                if need > 0:
                    per_resource[res] = need
            continue
        # Resources consumed by at least one mode of this task.
        used_resources = {
            res
            for res in self.problem.resources_list
            if any(details[m].get(res, 0) > 0 for m in mode_ids)
        }
        for res in used_resources:
            levels = [details[m].get(res, 0) for m in mode_ids]
            per_resource[res] = self.cp_model.int_var(
                min=min(levels),
                max=max(levels),
                name=f"resource_consumption_{task}_{res}",
            )
        for mode in mode_flags[task]:
            for res in used_resources:
                need = details[mode].get(res, 0)
                self.cp_model.enforce(
                    self.cp_model.implies(
                        mode_flags[task][mode], per_resource[res] == need
                    )
                )
    self.variables["resource_consumption"] = consumption
[docs]
def create_duration_variables(self):
    """Create the duration value of every task.

    Reads ``self.variables["modes"]`` and stores
    ``self.variables["duration"]`` as ``{task: value}``: the constant
    duration for a single-mode task, otherwise an integer variable named
    ``duration_{task}`` bounded by the smallest and largest mode duration
    and tied to each mode's duration by an implication on its indicator.
    """
    mode_flags = self.variables["modes"]
    durations = {}
    for task in self.problem.tasks_list:
        details = self.problem.mode_details[task]
        mode_ids = list(details)
        if len(mode_ids) == 1:
            durations[task] = details[mode_ids[0]]["duration"]
            continue
        candidates = {details[m]["duration"] for m in mode_ids}
        duration_var = self.cp_model.int_var(
            min=min(candidates),
            max=max(candidates),
            name=f"duration_{task}",
        )
        durations[task] = duration_var
        for m, flag in mode_flags[task].items():
            mode_duration = details[m]["duration"]
            self.cp_model.enforce(
                self.cp_model.implies(flag, duration_var == mode_duration)
            )
    self.variables["duration"] = durations
[docs]
def create_preempt_variables(self, max_nb_preemption: int | None = None) -> None:
    """Create the part intervals of every task and the interval spanning them.

    For each task this creates a list of optional intervals
    (``interval_{task}_{i}``), one mandatory interval (``interval_{task}``)
    linked to them through ``cp_model.span``, and the presence expression
    of every part. Results go to ``self.variables`` under the keys
    ``"presences"``, ``"intervals"`` and ``"main_interval"``.

    Args:
        max_nb_preemption: optional cap on the number of part intervals
            created per task. Values below 1 are raised to 1: with zero
            parts the mandatory main interval would have nothing to span.
            ``None`` keeps the default of ``max_duration + 1`` parts.
    """
    model = self.cp_model
    window = (0, self.problem.horizon)
    presences, intervals, main_interval = {}, {}, {}
    for t in self.problem.tasks_list:
        possible_durations = [
            details["duration"] for details in self.problem.mode_details[t].values()
        ]
        max_duration = max(possible_durations)
        # Naive bound on the number of parts a task can be split into.
        nb_parts = max_duration + 1
        if max_nb_preemption is not None:
            nb_parts = max(1, min(max_nb_preemption, nb_parts))
        # A part may only be empty when the task itself never takes time.
        min_part_length = 0 if max_duration == 0 else 1
        intervals[t] = [
            model.interval_var(
                start=window,
                end=window,
                length=(min_part_length, max_duration),
                optional=True,
                name=f"interval_{t}_{i}",
            )
            for i in range(nb_parts)
        ]
        main_interval[t] = model.interval_var(
            start=window,
            end=window,
            length=(min(possible_durations), None),
            optional=False,
            name=f"interval_{t}",
        )
        model.span(main_interval[t], intervals[t])
        presences[t] = [model.presence(part) for part in intervals[t]]
    self.variables["presences"] = presences
    self.variables["intervals"] = intervals
    self.variables["main_interval"] = main_interval
[docs]
def constraint_convention_variables(self):
    """Post the ordering conventions between the parts of each task.

    For every task the parts are passed to ``cp_model._itv_presence_chain``
    and ``cp_model.no_overlap``; then, for each pair of consecutive parts,
    the earlier one must end before the later one starts and its presence
    must be greater than or equal to the presence of the later one.

    NOTE(review): ``_itv_presence_chain`` is a private OptalCP helper; it
    presumably already orders the presences, which would make the explicit
    presence inequality redundant — confirm before removing either.
    """
    model = self.cp_model
    for parts in self.variables["intervals"].values():
        model._itv_presence_chain(parts)
        model.no_overlap(parts)
        for earlier, later in zip(parts, parts[1:]):
            model.end_before_start(earlier, later)
            model.enforce(model.presence(earlier) >= model.presence(later))
[docs]
def constraint_variable_to_duration(self):
    """Tie the lengths of a task's parts to the task duration.

    For every task, the sum over its parts of
    ``guard(length(part), 0)`` is constrained to equal
    ``self.variables["duration"][task]``.
    """
    model = self.cp_model
    for task, flags in self.variables["presences"].items():
        parts = self.variables["intervals"][task]
        # One term per presence entry, indexing the matching part interval.
        worked = [model.guard(model.length(parts[k]), 0) for k, _ in enumerate(flags)]
        model.enforce(model.sum(worked) == self.variables["duration"][task])
[docs]
def constraint_precedence(self):
    """Require each task's main interval to end before its successors start."""
    for predecessor, followers in self.problem.successors.items():
        for follower in followers:
            first = self.variables["main_interval"][predecessor]
            then = self.variables["main_interval"][follower]
            self.cp_model.end_before_start(first, then)
[docs]
def constraint_resource(self):
    """Post one capacity constraint per resource of the problem.

    Resources listed in ``problem.non_renewable_resources`` get the
    non-renewable constraint; all others get the cumulative one, which also
    receives the fake tasks built by ``create_fake_tasks``.
    """
    fake_tasks = create_fake_tasks(self.problem)
    for r in self.problem.resources:
        if r in self.problem.non_renewable_resources:
            self.constraint_resource_non_renewable(resource=r)
        else:
            self.constraint_resource_cumulative(resource=r, fake_tasks=fake_tasks)
[docs]
def constraint_resource_cumulative(
    self, resource: str, fake_tasks: list[dict[str, int]]
):
    """Limit the cumulated usage of a renewable resource.

    Args:
        resource: name of the resource to constrain.
        fake_tasks: dicts with ``"start"`` and ``"duration"`` keys plus
            optional per-resource amounts; those with a positive amount for
            ``resource`` become fixed, mandatory intervals consuming it.

    One pulse is created per part interval of every task that has an entry
    for ``resource`` in ``self.variables["resource_consumption"]``; the sum
    of task and fake-task pulses must not exceed
    ``problem.get_max_resource_capacity(resource)``.
    """
    model = self.cp_model
    consumption = self.variables["resource_consumption"]
    task_pulses = [
        model.pulse(part, consumption[t][resource])
        for t in consumption
        if resource in consumption[t]
        for part in self.variables["intervals"][t]
    ]
    calendar_pulses = []
    for f in fake_tasks:
        blocked = f.get(resource, 0)
        if blocked > 0:
            slot = model.interval_var(
                start=f["start"],
                end=f["start"] + f["duration"],
                length=f["duration"],
                optional=False,
            )
            calendar_pulses.append(model.pulse(interval=slot, height=blocked))
    capa = self.problem.get_max_resource_capacity(resource)
    model.enforce(model.sum(task_pulses + calendar_pulses) <= capa)
[docs]
def constraint_resource_non_renewable(self, resource: str):
    """Limit the total consumption of a non-renewable resource.

    The consumption values of all tasks having an entry for ``resource``
    in ``self.variables["resource_consumption"]`` are summed and bounded by
    ``problem.get_max_resource_capacity(resource)``.

    Args:
        resource: name of the resource to constrain.
    """
    consumption = self.variables["resource_consumption"]
    amounts = [
        per_resource[resource]
        for per_resource in consumption.values()
        if resource in per_resource
    ]
    capa = self.problem.get_max_resource_capacity(resource)
    total = self.cp_model.sum(amounts)
    self.cp_model.enforce(total <= capa)
[docs]
def retrieve_solution(self, result: cp.SolveResult) -> PreemptiveRcpspSolution:
    """Convert an OptalCP result into a ``PreemptiveRcpspSolution``.

    The raw solver solution is kept as warm start, the present parts of
    each task give its ``starts`` / ``ends`` lists, and the mode of each
    task is the one whose indicator is a constant or evaluates truthy.
    The objective values are cached on the solution as ``_intern_obj`` and
    the solution is remembered in ``self.last_sol_do``.

    Args:
        result: solve result whose ``solution`` attribute is read.

    Returns:
        The rebuilt solution.
    """
    solution = result.solution
    self.warm_start_solution = solution
    modes_dict, schedule = {}, {}
    for t, parts in self.variables["intervals"].items():
        # Each value is indexed as (start, end).
        spans = [solution.get_value(part) for part in parts if solution.is_present(part)]
        schedule[t] = {
            "starts": [span[0] for span in spans],
            "ends": [span[1] for span in spans],
        }
        for m, flag in self.variables["modes"][t].items():
            # Integer flags are the constants used for single-mode tasks.
            if isinstance(flag, int) or solution.get_value(flag):
                modes_dict[t] = m
    modes = [modes_dict[t] for t in self.problem.tasks_list_non_dummy]
    sol = PreemptiveRcpspSolution(
        problem=self.problem, rcpsp_modes=modes, rcpsp_schedule=schedule
    )
    cached_objectives = {}
    for obj in self.get_lexico_objectives_available():
        storage = self.create_result_storage([(sol, 0)])
        cached_objectives[obj] = self.get_lexico_objective_value(obj, storage)
    sol._intern_obj = cached_objectives
    self.last_sol_do = sol
    return sol
[docs]
def compute_binary_calendar_per_tasks(
    problem: PreemptiveRcpspProblem,
) -> tuple[dict[tuple, np.ndarray], dict[tuple[int, int], tuple]]:
    """Compute, for every (task, mode), the boolean calendar of time steps
    at which all the resources it needs are sufficiently available.

    Args:
        problem: problem providing ``resources_list``, ``tasks_list``,
            ``mode_details``, ``horizon`` and
            ``get_resource_availability_array``.

    Returns:
        A pair ``(resource_calendar_dict, task_mode_to_calendar)``:

        * ``resource_calendar_dict`` maps a calendar key to a boolean array.
          Keys are a resource name (``availability > 0``), a
          ``(resource, consumption)`` pair for modes using one resource, a
          tuple of such pairs for modes using several, and the empty tuple
          ``()`` for modes using none (always available).
        * ``task_mode_to_calendar`` maps every ``(task, mode)`` to its key.

    Modes that use no resource used to be left out of
    ``task_mode_to_calendar``, so any later lookup for them raised
    ``KeyError``; they now point to the always-available ``()`` calendar.
    """
    availability = {
        res: np.array(problem.get_resource_availability_array(res))
        for res in problem.resources_list
    }
    resource_calendar_dict = {res: levels > 0 for res, levels in availability.items()}
    task_mode_to_calendar = {}
    for task in problem.tasks_list:
        for mode, details in problem.mode_details[task].items():
            needs = tuple(
                (res, details[res])
                for res in problem.resources_list
                if details.get(res, 0) > 0
            )
            if not needs:
                key = ()
                if key not in resource_calendar_dict:
                    # Same length as the availability arrays when there are
                    # any; otherwise fall back on the problem horizon.
                    n_steps = max(
                        (len(levels) for levels in availability.values()),
                        default=problem.horizon + 1,
                    )
                    resource_calendar_dict[key] = np.ones(max(n_steps, 1), dtype=bool)
            elif len(needs) == 1:
                # One resource pool is used.
                key = needs[0]
                res, amount = key
                resource_calendar_dict[key] = availability[res] >= amount
            else:
                key = needs
                if key not in resource_calendar_dict:
                    # Available only when every needed resource is.
                    resource_calendar_dict[key] = np.logical_and.reduce(
                        [availability[res] >= amount for res, amount in needs]
                    )
            task_mode_to_calendar[task, mode] = key
    return resource_calendar_dict, task_mode_to_calendar
[docs]
class OptalCalendarPreemptiveRcpspSolver(OptalCpSolver):
    """OptalCP-based solver for the preemptive RCPSP using per-task availability calendars."""

    # Problem instance handled by this solver.
    problem: PreemptiveRcpspProblem

    def __init__(
        self,
        problem: PreemptiveRcpspProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs,
    ) -> None:
        """Initialise the base solver and precompute the task calendars.

        Args:
            problem: preemptive RCPSP instance to solve.
            params_objective_function: forwarded unchanged to the parent
                constructor.
            **kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(problem, params_objective_function, **kwargs)
        # Model variables and expressions, filled while building the model.
        self.variables = {}
        # Last solution returned by `retrieve_solution` (none yet).
        self.last_sol_do = None
        # Step functions per calendar key, built with the model.
        self.calendar_step_functions = {}
        calendars, task_mode_to_calendar = compute_binary_calendar_per_tasks(
            self.problem
        )
        self.resource_calendar_dict = calendars
        self.task_mode_to_calendar = task_mode_to_calendar
[docs]
def create_calendar_step_function(self):
    """Build one model step function per distinct calendar key.

    Each boolean calendar is encoded as a list of ``(time, value)``
    breakpoints: the value at time 0, then one entry at every index where
    the calendar changes. The step functions are stored in
    ``self.calendar_step_functions`` keyed by calendar key.
    """
    step_functions = {}
    self.calendar_step_functions = step_functions
    for calendar_key in self.task_mode_to_calendar.values():
        if calendar_key in step_functions:
            continue
        flags = self.resource_calendar_dict[calendar_key]
        breakpoints = [(0, int(flags[0]))]
        # Indices t >= 1 where the calendar differs from the previous step.
        change_points = np.flatnonzero(flags[1:] != flags[:-1]) + 1
        breakpoints.extend((int(t), int(flags[t])) for t in change_points)
        step_functions[calendar_key] = self.cp_model.step_function(breakpoints)
[docs]
def create_main_variables(self):
    """Create the main interval of each task and its per-mode alternatives.

    Every task gets a mandatory interval ``intervals_{task}`` whose length
    is fixed to 0 when all its modes have zero duration, and is otherwise
    only bounded below by the shortest mode duration. A single-mode task
    reuses that interval for its mode; a multi-mode task gets one optional
    interval ``intervals_{task}_{mode}`` per mode, linked to the main one
    through ``cp_model.alternative``.
    Results go to ``self.variables["intervals"]`` and
    ``self.variables["opt_intervals"]``.
    """
    model = self.cp_model
    horizon = self.problem.horizon
    intervals, opt_intervals = {}, {}
    for t in self.problem.tasks_list:
        details = self.problem.mode_details[t]
        durations = [details[m]["duration"] for m in details]
        shortest = min(durations)
        never_takes_time = shortest == max(durations) == 0
        intervals[t] = model.interval_var(
            start=(0, horizon),
            end=(shortest, horizon),
            length=0 if never_takes_time else (shortest, None),
            optional=False,
            name=f"intervals_{t}",
        )
        mode_ids = list(details)
        if len(mode_ids) == 1:
            opt_intervals[t] = {mode_ids[0]: intervals[t]}
            continue
        opt_intervals[t] = {
            m: model.interval_var(
                start=(0, horizon),
                end=(shortest, horizon),
                length=(details[m]["duration"], None),
                optional=True,
                name=f"intervals_{t}_{m}",
            )
            for m in details
        }
        model.alternative(intervals[t], list(opt_intervals[t].values()))
    self.variables["intervals"] = intervals
    self.variables["opt_intervals"] = opt_intervals
[docs]
def create_precedence(self):
    """Require each task's interval to end before its successors start."""
    for predecessor, followers in self.problem.successors.items():
        for follower in followers:
            first = self.variables["intervals"][predecessor]
            then = self.variables["intervals"][follower]
            self.cp_model.end_before_start(first, then)
[docs]
def constraint_duration_integral(self):
    """Tie each mode interval to the mode duration through its calendar.

    For every (task, mode) with a strictly positive duration, the integral
    of the mode's calendar step function over the mode interval must equal
    the duration; ``guard`` supplies the duration itself as the default
    value for the expression.

    A mode with no entry in ``self.task_mode_to_calendar`` has no calendar
    to integrate; instead of failing with ``KeyError`` the length of its
    interval is constrained to the duration.
    """
    model = self.cp_model
    for t in self.problem.tasks_list:
        for m, details in self.problem.mode_details[t].items():
            dur = details["duration"]
            if dur <= 0:
                continue
            mode_interval = self.variables["opt_intervals"][t][m]
            if (t, m) in self.task_mode_to_calendar:
                calendar_key = self.task_mode_to_calendar[(t, m)]
                function = self.calendar_step_functions[calendar_key]
                worked = model.integral(function, mode_interval)
            else:
                # No calendar restriction: every time step counts.
                worked = model.length(mode_interval)
            model.enforce(model.guard(worked, dur) == dur)
[docs]
def init_model(self, **kwargs: Any) -> None:
    """Build the OptalCP model and set the makespan as objective.

    The makespan, i.e. the end of the sink task's interval, is stored in
    ``self.variables["objectives"]["makespan"]`` and minimised.

    Args:
        **kwargs: accepted but not read by this method.
    """
    self.cp_model = cp.Model()
    # Build steps, run in this exact order.
    build_steps = (
        self.create_calendar_step_function,
        self.create_main_variables,
        self.create_precedence,
        self.constraint_duration_integral,
        self.constraint_resource,
    )
    for build_step in build_steps:
        build_step()
    sink_interval = self.variables["intervals"][self.problem.sink_task]
    makespan = self.cp_model.end(sink_interval)
    self.variables["objectives"] = {"makespan": makespan}
    self.cp_model.minimize(makespan)
[docs]
def constraint_resource(self):
    """Dispatch every resource to its capacity constraint.

    Non-renewable resources (per ``problem.non_renewable_resources``) use
    ``constraint_resource_non_renewable``; the others use
    ``constraint_resource_cumulative`` together with the fake tasks
    returned by ``create_fake_tasks``.
    """
    fake_tasks = create_fake_tasks(self.problem)
    for r in self.problem.resources:
        is_renewable = r not in self.problem.non_renewable_resources
        if not is_renewable:
            self.constraint_resource_non_renewable(resource=r)
            continue
        self.constraint_resource_cumulative(resource=r, fake_tasks=fake_tasks)
[docs]
def constraint_resource_cumulative(
    self, resource: str, fake_tasks: list[dict[str, int]]
):
    """Limit the cumulated usage of a renewable resource.

    One capacity constraint is posted per "level", a level being 0 or one
    of the distinct positive amounts of ``resource`` taken by the fake
    tasks. The constraint of a level sums:

    * the pulses of the (task, mode) intervals whose consumption ``q``
      satisfies ``q + level <= max_capacity``;
    * the pulses of the fake tasks whose amount is at most the level.

    Level 0 is the baseline (all consuming tasks, no fake task). Without
    it, no capacity constraint at all was posted when ``fake_tasks`` had
    no entry for the resource, or when every fake-task amount exceeded
    ``max_capacity - q`` for all tasks.

    NOTE(review): the baseline counts a task over its whole interval, so
    it may be conservative when reduced-availability periods fall inside
    that interval — confirm it matches the intended calendar semantics.

    Args:
        resource: name of the resource to constrain.
        fake_tasks: dicts with ``"start"`` and ``"duration"`` keys plus
            optional per-resource amounts.
    """
    model = self.cp_model
    max_capacity = self.problem.get_max_resource_capacity(resource)
    consumers = []
    for t, by_mode in self.variables["opt_intervals"].items():
        for m in by_mode:
            q = self.problem.mode_details[t][m].get(resource, 0)
            if q > 0:
                consumers.append((t, m, q))
    blockers = [f for f in fake_tasks if f.get(resource, 0) > 0]
    levels = {0} | {f[resource] for f in blockers}
    for level in sorted(levels):
        task_pulse = [
            model.pulse(interval=self.variables["opt_intervals"][t][m], height=q)
            for t, m, q in consumers
            if q + level <= max_capacity
        ]
        if not task_pulse:
            # No task fits at this level: nothing to constrain.
            continue
        calendar_pulse = [
            model.pulse(
                interval=model.interval_var(
                    start=f["start"],
                    end=f["start"] + f["duration"],
                    length=f["duration"],
                    optional=False,
                ),
                height=f[resource],
            )
            for f in blockers
            if f[resource] <= level
        ]
        model.enforce(model.sum(task_pulse + calendar_pulse) <= max_capacity)
[docs]
def constraint_resource_non_renewable(self, resource: str):
    """Limit the total consumption of a non-renewable resource.

    Each (task, mode) interval consuming a positive amount ``q`` of
    ``resource`` contributes ``presence(interval) * q``; the sum of these
    terms is bounded by ``problem.get_max_resource_capacity(resource)``.

    Args:
        resource: name of the resource to constrain.
    """
    consumers = []
    for t, by_mode in self.variables["opt_intervals"].items():
        for m in by_mode:
            q = self.problem.mode_details[t][m].get(resource, 0)
            if q > 0:
                consumers.append((t, m, q))
    capa = self.problem.get_max_resource_capacity(resource)
    weighted_presences = [
        self.cp_model.presence(self.variables["opt_intervals"][t][m]) * q
        for t, m, q in consumers
    ]
    self.cp_model.enforce(self.cp_model.sum(weighted_presences) <= capa)
[docs]
def retrieve_solution(self, result: cp.SolveResult) -> PreemptiveRcpspSolution:
    """Convert an OptalCP result into a ``PreemptiveRcpspSolution``.

    The raw solver solution is kept as warm start. Each task is reported
    as a single part running from the start to the end of its main
    interval. The mode of a single-mode task is that mode; otherwise it is
    the mode whose optional interval is present.

    Args:
        result: solve result whose ``solution`` attribute is read.

    Returns:
        The rebuilt solution, also stored in ``self.last_sol_do``.
    """
    solution = result.solution
    self.warm_start_solution = solution
    modes_dict, schedule = {}, {}
    for t, main_interval in self.variables["intervals"].items():
        st, end = solution.get_value(main_interval)
        schedule[t] = {"starts": [st], "ends": [end]}
        candidate_modes = list(self.problem.mode_details[t])
        if len(candidate_modes) == 1:
            modes_dict[t] = candidate_modes[0]
            continue
        for m, mode_interval in self.variables["opt_intervals"][t].items():
            if solution.is_present(mode_interval):
                modes_dict[t] = m
    modes = [modes_dict[t] for t in self.problem.tasks_list_non_dummy]
    sol = PreemptiveRcpspSolution(
        problem=self.problem, rcpsp_modes=modes, rcpsp_schedule=schedule
    )
    self.last_sol_do = sol
    return sol