# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from abc import abstractmethod
from typing import Any, Iterable, Optional
from discrete_optimization.generic_tasks_tools.allocation import (
AllocationCpSolver,
AllocationSolution,
UnaryResource,
)
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.multimode import MultimodeCpSolver
from discrete_optimization.generic_tasks_tools.scheduling import (
SchedulingCpSolver,
Task,
)
from discrete_optimization.generic_tasks_tools.solvers.cpsat import is_a_trivial_zero
from discrete_optimization.generic_tools.cp_tools import SignEnum
from discrete_optimization.generic_tools.hub_solver.optal.optalcp_tools import (
OptalCpSolver,
)
try:
import optalcp as cp
except ImportError:
cp = None
optalcp_available = False
else:
optalcp_available = True
[docs]
class SchedulingOptalSolver(OptalCpSolver, SchedulingCpSolver[Task]):
[docs]
@abstractmethod
def get_task_interval_variable(self, task: Task) -> "cp.IntervalVar":
"""Retrieve the interval variable of given task."""
...
[docs]
def get_task_start_or_end_variable(
self, task: Task, start_or_end: StartOrEnd
) -> "cp.IntExpr":
"""Retrieve the variable storing the start or end time of given task.
Args:
task:
start_or_end:
Returns:
"""
itv = self.get_task_interval_variable(task)
if start_or_end == StartOrEnd.START:
return self.cp_model.start(itv)
if start_or_end == StartOrEnd.END:
return self.cp_model.end(itv)
return None
[docs]
def add_constraint_on_task(
self, task: Task, start_or_end: StartOrEnd, sign: SignEnum, time: int
) -> list["cp.BoolExpr"]:
var = self.get_task_start_or_end_variable(task, start_or_end)
return self.add_bound_constraint(var, sign, time)
[docs]
def add_constraint_chaining_tasks(self, task1: Task, task2: Task) -> list[Any]:
itv1 = self.get_task_interval_variable(task1)
itv2 = self.get_task_interval_variable(task2)
return [self.cp_model.start_at_end(itv2, itv1)]
[docs]
def get_subtasks_makespan_variable(self, subtasks: Iterable[Task]) -> Any:
return self.cp_model.max(
[
self.get_task_start_or_end_variable(
task=task, start_or_end=StartOrEnd.END
)
for task in subtasks
]
)
[docs]
def get_subtasks_sum_end_time_variable(self, subtasks: Iterable[Task]) -> Any:
return self.cp_model.sum(
[
self.get_task_start_or_end_variable(
task=task, start_or_end=StartOrEnd.END
)
for task in subtasks
]
)
[docs]
def get_subtasks_sum_start_time_variable(self, subtasks: Iterable[Task]) -> Any:
return self.cp_model.max(
[
self.get_task_start_or_end_variable(
task=task, start_or_end=StartOrEnd.START
)
for task in subtasks
]
)
[docs]
class MultimodeOptalSolver(OptalCpSolver, MultimodeCpSolver[Task]):
[docs]
@abstractmethod
def get_task_mode_is_present_variable(self, task: Task, mode: int) -> "cp.BoolExpr":
"""Retrieve the 0-1 variable/expression telling if the mode is used for the task.
Args:
task:
mode:
Returns:
"""
...
[docs]
def add_constraint_on_task_mode(self, task: Task, mode: int) -> list[Any]:
possible_modes = self.problem.get_task_modes(task)
if mode not in possible_modes:
raise ValueError(f"Task {task} cannot be done with mode {mode}.")
if len(possible_modes) == 1:
return []
constraints = []
for other_mode in possible_modes:
var = self.get_task_mode_is_present_variable(task=task, mode=other_mode)
if other_mode == mode:
constraints.append(self.cp_model.enforce(var == True))
else:
constraints.append(self.cp_model.enforce(var == False))
return constraints
[docs]
class AllocationOptalSolver(
OptalCpSolver,
AllocationCpSolver[Task, UnaryResource],
):
"""Base class for allocation cp-sat solvers using a binary modelling.
I.e. using 0-1 variables to model allocation status of each couple (task, unary_resource)
This is a more general modelisation thant the integer one as it allows allocation of multiple resources.
"""
allocation_changes_variables_created = False
"""Flag telling whether 'allocation changes variables' have been created"""
allocation_changes_variables: dict[tuple[Task, UnaryResource], "cp.IntExpr"]
"""Variables tracking allocation changes from a given reference."""
used_variables_created = False
"""Flag telling whether 'used variables' have been created"""
used_variables: dict[UnaryResource, cp.BoolVar]
"""Variables tracking whether a unary resource has been used at least once."""
done_variables_created = False
"""Flag telling whether 'done variables' have been created"""
done_variables: dict[Task, cp.BoolExpr]
"""Variables tracking whether a task has at least one unary resource allocated."""
at_most_one_unary_resource_per_task = False
"""Flag telling if the problem accept at most one unary_resource per task.
Default to False, ie several resources allowed per task.
"""
@property
def subset_tasks_of_interest(self) -> Iterable[Task]:
"""Subset of tasks of interest used for the objective.
By default, all tasks.
"""
return self.problem.tasks_list
@property
def subset_unaryresources_allowed(self) -> Iterable[UnaryResource]:
"""Unary resources allowed to solve the problem.
By default, all unary resources.
"""
return self.problem.unary_resources_list
[docs]
def get_default_tasks_n_unary_resources(
self,
tasks: Optional[Iterable[Task]] = None,
unary_resources: Optional[Iterable[UnaryResource]] = None,
) -> tuple[Iterable[Task], Iterable[UnaryResource]]:
if tasks is None:
tasks = self.subset_tasks_of_interest
if unary_resources is None:
unary_resources = self.subset_unaryresources_allowed
return tasks, unary_resources
[docs]
def init_model(self, **kwargs: Any) -> None:
"""Init cp model and reset stored variables if any."""
super().init_model(**kwargs)
self.used_variables_created = False
self.used_variables = {}
self.done_variables_created = False
self.done_variables_created = {}
[docs]
@abstractmethod
def get_task_unary_resource_is_present_variable(
self, task: Task, unary_resource: UnaryResource
) -> "cp.BoolExpr":
"""Return a 0-1 variable/expression telling if the unary_resource is used for the task.
NB: sometimes the given resource is never to be used by a task and the variable has not been created.
The convention is to return 0 in that case.
"""
...
[docs]
def add_constraint_on_task_unary_resource_allocation(
self, task: Task, unary_resource: UnaryResource, used: bool
) -> list[Any]:
var = self.get_task_unary_resource_is_present_variable(
task=task, unary_resource=unary_resource
)
expr = var == used
self.cp_model.enforce(expr)
return [expr]
[docs]
def add_constraint_on_nb_allocation_changes(
self,
ref: AllocationSolution[Task, UnaryResource],
nb_changes: int,
sign: SignEnum = SignEnum.LEQ,
) -> list[Any]:
self.create_allocation_changes_variables()
# constraints so that change variables reflect diff to ref
constraints = self.add_allocation_changes_constraints(ref=ref)
# nb of changes variable
var = sum(self.allocation_changes_variables.values())
constraints += self.add_bound_constraint(var=var, sign=sign, value=nb_changes)
return constraints
[docs]
def add_allocation_changes_constraints(
self, ref: AllocationSolution[Task, UnaryResource]
) -> list[Any]:
"""Add and return constraints so that change variables reflect diff to ref."""
tasks, unary_resources = self.get_default_tasks_n_unary_resources()
constraints = []
for task in tasks:
for unary_resource in unary_resources:
is_present = self.get_task_unary_resource_is_present_variable(
task=task, unary_resource=unary_resource
)
allocation_change = self.allocation_changes_variables[
(task, unary_resource)
]
is_allocated_ref = ref.is_allocated(
task=task, unary_resource=unary_resource
)
if is_a_trivial_zero(is_present):
# can never be allocated: change <=> ref has allocated
self.cp_model.enforce(allocation_change == is_allocated_ref)
constraints.append(allocation_change == is_allocated_ref)
else:
self.cp_model.enforce(
allocation_change == (is_present != is_allocated_ref)
)
constraints.append(
allocation_change == (is_present != is_allocated_ref)
)
return constraints
[docs]
def create_allocation_changes_variables(self):
"""Create variables necessary for constraint on nb of changes."""
if not self.allocation_changes_variables_created:
tasks, unary_resources = self.get_default_tasks_n_unary_resources()
self.allocation_changes_variables = {
(task, unary_resource): self.cp_model.bool_var(
f"change_{task}_{unary_resource}"
)
for task in tasks
for unary_resource in unary_resources
}
self.allocation_changes_variables_created = True
[docs]
def add_constraint_nb_unary_resource_usages(
self,
sign: SignEnum,
target: int,
tasks: Optional[Iterable[Task]] = None,
unary_resources: Optional[Iterable[UnaryResource]] = None,
) -> list[Any]:
tasks, unary_resources = self.get_default_tasks_n_unary_resources(
tasks=tasks, unary_resources=unary_resources
)
var = sum(
is_present
for task in tasks
for unary_resource in unary_resources
# filter out trivial 0's corresponding to incompatible (task, resource)
if not (
is_a_trivial_zero(
is_present := self.get_task_unary_resource_is_present_variable(
task, unary_resource
)
)
)
)
return self.add_bound_constraint(var=var, sign=sign, value=target)
[docs]
def add_constraint_on_total_nb_usages(
self, sign: SignEnum, target: int
) -> list[Any]:
return self.add_constraint_nb_unary_resource_usages(sign=sign, target=target)
[docs]
def add_constraint_on_unary_resource_nb_usages(
self, unary_resource: UnaryResource, sign: SignEnum, target: int
) -> list[Any]:
return self.add_constraint_nb_unary_resource_usages(
sign=sign, target=target, unary_resources=(unary_resource,)
)
[docs]
def create_used_variables(self):
if not self.used_variables_created:
self.used_variables = {}
for unary_resource in self.subset_unaryresources_allowed:
used = self.cp_model.bool_var(f"used_{unary_resource}")
self.used_variables[unary_resource] = used
list_is_present_variables = [
is_present
for task in self.subset_tasks_of_interest
# filter out trivial 0's corresponding to incompatible (task, resource)
if not (
is_a_trivial_zero(
is_present
:= self.get_task_unary_resource_is_present_variable(
task, unary_resource
)
)
)
]
if len(list_is_present_variables) > 0:
self.cp_model.enforce(
used == self.cp_model.max(list_is_present_variables)
)
else:
self.cp_model.enforce(used == 0)
self.used_variables_created = True
[docs]
def create_done_variables(self):
if not self.done_variables_created:
self.done_variables = {}
for task in self.subset_tasks_of_interest:
done = self.cp_model.bool_var(f"{task}_done")
self.done_variables[task] = done
list_is_present_variables = [
is_present
for unary_resource in self.subset_unaryresources_allowed
# filter out trivial 0's corresponding to incompatible (task, resource)
if not (
is_a_trivial_zero(
is_present
:= self.get_task_unary_resource_is_present_variable(
task, unary_resource
)
)
)
]
if len(list_is_present_variables) > 0:
self.cp_model.enforce(
done == self.cp_model.max(list_is_present_variables)
)
else:
self.cp_model.enforce(done == 0)
self.done_variables_created = True
[docs]
def get_nb_tasks_done_variable(self) -> Any:
self.create_done_variables()
return sum(self.done_variables.values())
[docs]
def get_nb_unary_resources_used_variable(self) -> Any:
self.create_used_variables()
return sum(self.used_variables.values())