Source code for discrete_optimization.generic_tasks_tools.solvers.cpsat

from abc import abstractmethod
from enum import Enum
from typing import Any, Iterable, Optional

from ortools.sat.python.cp_model import IntVar, LinearExprT

from discrete_optimization.generic_tasks_tools.allocation import (
    AllocationCpSolver,
    AllocationSolution,
    UnaryResource,
)
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.multimode import MultimodeCpSolver
from discrete_optimization.generic_tasks_tools.scheduling import (
    SchedulingCpSolver,
    Task,
)
from discrete_optimization.generic_tools.cp_tools import SignEnum
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver


[docs] class SchedulingCpSatSolver(OrtoolsCpSatSolver, SchedulingCpSolver[Task]): """Base class for most ortools/cpsat solvers handling scheduling problems. Allows to have common code. """ _makespan: Optional[IntVar] = None """Internal variable use to define the global makespan.""" _subtasks_makespan: Optional[IntVar] = None """Internal variable use to define the partial makespan.""" constraints_on_makespan: Optional[list[Any]] = None """Constraints on partial makespan so that it can be considered as the objective."""
[docs] def init_model(self, **kwargs: Any) -> None: """Init cp model and reset stored variables if any.""" super().init_model(**kwargs) self._makespan = None self._subtasks_makespan = None self.constraints_on_makespan = None
[docs] @abstractmethod def get_task_start_or_end_variable( self, task: Task, start_or_end: StartOrEnd ) -> LinearExprT: """Retrieve the variable storing the start or end time of given task. Args: task: start_or_end: Returns: """ ...
[docs] def add_constraint_on_task( self, task: Task, start_or_end: StartOrEnd, sign: SignEnum, time: int ) -> list[Any]: var = self.get_task_start_or_end_variable(task=task, start_or_end=start_or_end) return self.add_bound_constraint(var=var, sign=sign, value=time)
[docs] def add_constraint_chaining_tasks(self, task1: Task, task2: Task) -> list[Any]: var1 = self.get_task_start_or_end_variable( task=task1, start_or_end=StartOrEnd.END ) var2 = self.get_task_start_or_end_variable( task=task2, start_or_end=StartOrEnd.START ) return [self.cp_model.add(var1 == var2)]
[docs] def get_makespan_var(self) -> IntVar: """Get the makespan variable used to track global makespan.""" if self._makespan is None: self._makespan = self.cp_model.NewIntVar( lb=self.get_makespan_lower_bound(), ub=self.get_makespan_upper_bound(), name="makespan", ) return self._makespan
[docs] def get_subtasks_makespan_var(self) -> IntVar: """Get the makespan variable used to track subtasks makespan.""" if self._subtasks_makespan is None: self._subtasks_makespan = self.cp_model.NewIntVar( lb=0, # lower bound for any tasks subset ub=self.get_makespan_upper_bound(), name="subtasks_makespan", ) return self._subtasks_makespan
[docs] def remove_constraints_on_objective(self) -> None: if self.constraints_on_makespan is not None: self.remove_constraints(self.constraints_on_makespan)
[docs] def get_global_makespan_variable(self) -> Any: # remove previous constraints on makespan variable from cp model self.remove_constraints_on_objective() # get makespan variable makespan = self.get_makespan_var() # update those constraints self.constraints_on_makespan = [ self.cp_model.AddMaxEquality( makespan, [ self.get_task_start_or_end_variable(task, StartOrEnd.END) for task in self.problem.get_last_tasks() ], ) ] return makespan
[docs] def get_subtasks_makespan_variable(self, subtasks: Iterable[Task]) -> Any: # remove previous constraints on makespan variable from cp model self.remove_constraints_on_objective() # get makespan variable makespan = self.get_subtasks_makespan_var() # update those constraints self.constraints_on_makespan = [ self.cp_model.AddMaxEquality( makespan, [ self.get_task_start_or_end_variable(task, StartOrEnd.END) for task in subtasks ], ) ] return makespan
[docs] def get_subtasks_sum_end_time_variable(self, subtasks: Iterable[Task]) -> Any: self.remove_constraints_on_objective() return sum( self.get_task_start_or_end_variable(task, StartOrEnd.END) for task in subtasks )
[docs] def get_subtasks_sum_start_time_variable(self, subtasks: Iterable[Task]) -> Any: self.remove_constraints_on_objective() return sum( self.get_task_start_or_end_variable(task, StartOrEnd.START) for task in subtasks )
[docs] class MultimodeCpSatSolver(OrtoolsCpSatSolver, MultimodeCpSolver[Task]):
[docs] @abstractmethod def get_task_mode_is_present_variable(self, task: Task, mode: int) -> LinearExprT: """Retrieve the 0-1 variable/expression telling if the mode is used for the task. Args: task: mode: Returns: """ ...
[docs] def add_constraint_on_task_mode(self, task: Task, mode: int) -> list[Any]: possible_modes = self.problem.get_task_modes(task) if mode not in possible_modes: raise ValueError(f"Task {task} cannot be done with mode {mode}.") if len(possible_modes) == 1: return [] constraints = [] for other_mode in possible_modes: var = self.get_task_mode_is_present_variable(task=task, mode=other_mode) if other_mode == mode: constraints.append(self.cp_model.add(var == True)) else: constraints.append(self.cp_model.add(var == False)) return constraints
[docs] class AllocationCpSatSolver( OrtoolsCpSatSolver, AllocationCpSolver[Task, UnaryResource], ): """Base class for allocation cp-sat solvers using a binary modelling. I.e. using 0-1 variables to model allocation status of each couple (task, unary_resource) This is a more general modelisation thant the integer one as it allows allocation of multiple resources. """ allocation_changes_variables_created = False """Flag telling whether 'allocation changes variables' have been created""" allocation_changes_variables: dict[tuple[Task, UnaryResource], IntVar] """Variables tracking allocation changes from a given reference.""" used_variables_created = False """Flag telling whether 'used variables' have been created""" used_variables: dict[UnaryResource, IntVar] """Variables tracking whether a unary resource has been used at least once.""" done_variables_created = False """Flag telling whether 'done variables' have been created""" done_variables: dict[Task, IntVar] """Variables tracking whether a task has at least one unary resource allocated.""" at_most_one_unary_resource_per_task = False """Flag telling if the problem accept at most one unary_resource per task. Default to False, ie several resources allowed per task. """
[docs] def init_model(self, **kwargs: Any) -> None: """Init cp model and reset stored variables if any.""" super().init_model(**kwargs) self.allocation_changes_variables_created = False self.allocation_changes_variables = {} self.used_variables_created = False self.used_variables = {} self.done_variables_created = False self.done_variables_created = {}
[docs] @abstractmethod def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: UnaryResource ) -> LinearExprT: """Return a 0-1 variable/expression telling if the unary_resource is used for the task. NB: sometimes the given resource is never to be used by a task and the variable has not been created. The convention is to return 0 in that case. """ ...
[docs] def add_constraint_on_task_unary_resource_allocation( self, task: Task, unary_resource: UnaryResource, used: bool ) -> list[Any]: var = self.get_task_unary_resource_is_present_variable( task=task, unary_resource=unary_resource ) return [self.cp_model.add(var == used)]
[docs] def add_constraint_on_nb_allocation_changes( self, ref: AllocationSolution[Task, UnaryResource], nb_changes: int ) -> list[Any]: tasks, unary_resources = self.get_default_tasks_n_unary_resources() self.create_allocation_changes_variables() # constraints so that change variables reflect diff to ref constraints = [] for task in tasks: for unary_resource in unary_resources: is_present = self.get_task_unary_resource_is_present_variable( task=task, unary_resource=unary_resource ) allocation_change = self.allocation_changes_variables[ (task, unary_resource) ] is_allocated_ref = ref.is_allocated( task=task, unary_resource=unary_resource ) if is_a_trivial_zero(is_present): # can never be allocated: change <=> ref has allocated constraints.append( self.cp_model.add(allocation_change == is_allocated_ref) ) else: constraints += [ self.cp_model.add( is_present != is_allocated_ref ).only_enforce_if(allocation_change), self.cp_model.add( is_present == is_allocated_ref ).only_enforce_if(~allocation_change), ] # nb of changes variable var = sum( self.allocation_changes_variables[(task, unary_resource)] for task in tasks for unary_resource in unary_resources ) constraints += [self.cp_model.add(var <= nb_changes)] return constraints
[docs] def create_allocation_changes_variables(self): """Create variables necessary for constraint on nb of changes.""" if not self.allocation_changes_variables_created: tasks, unary_resources = self.get_default_tasks_n_unary_resources() self.allocation_changes_variables = { (task, unary_resource): self.cp_model.new_bool_var( f"change_{task}_{unary_resource}" ) for task in tasks for unary_resource in unary_resources } self.allocation_changes_variables_created = True
[docs] def add_constraint_nb_unary_resource_usages( self, sign: SignEnum, target: int, tasks: Optional[Iterable[Task]] = None, unary_resources: Optional[Iterable[UnaryResource]] = None, ) -> list[Any]: tasks, unary_resources = self.get_default_tasks_n_unary_resources( tasks=tasks, unary_resources=unary_resources ) var = sum( is_present for task in tasks for unary_resource in unary_resources # filter out trivial 0's corresponding to incompatible (task, resource) if not ( is_a_trivial_zero( is_present := self.get_task_unary_resource_is_present_variable( task, unary_resource ) ) ) ) return self.add_bound_constraint(var=var, sign=sign, value=target)
[docs] def add_constraint_on_total_nb_usages( self, sign: SignEnum, target: int ) -> list[Any]: return self.add_constraint_nb_unary_resource_usages(sign=sign, target=target)
[docs] def add_constraint_on_unary_resource_nb_usages( self, unary_resource: UnaryResource, sign: SignEnum, target: int ) -> list[Any]: return self.add_constraint_nb_unary_resource_usages( sign=sign, target=target, unary_resources=(unary_resource,) )
[docs] def create_used_variables(self): if not self.used_variables_created: self.used_variables = {} for unary_resource in self.problem.unary_resources_list: used = self.cp_model.new_bool_var(f"used_{unary_resource}") self.used_variables[unary_resource] = used list_is_present_variables = [ is_present for task in self.problem.tasks_list # filter out trivial 0's corresponding to incompatible (task, resource) if not ( is_a_trivial_zero( is_present := self.get_task_unary_resource_is_present_variable( task, unary_resource ) ) ) ] if len(list_is_present_variables) > 0: self.cp_model.add_max_equality(used, list_is_present_variables) else: self.cp_model.add(used == 0) self.used_variables_created = True
[docs] def create_done_variables(self): if not self.done_variables_created: self.done_variables = {} for task in self.problem.tasks_list: done = self.cp_model.new_bool_var(f"{task}_done") self.done_variables[task] = done list_is_present_variables = [ is_present for unary_resource in self.problem.unary_resources_list # filter out trivial 0's corresponding to incompatible (task, resource) if not ( is_a_trivial_zero( is_present := self.get_task_unary_resource_is_present_variable( task, unary_resource ) ) ) ] if len(list_is_present_variables) > 0: if self.at_most_one_unary_resource_per_task: self.cp_model.add_at_most_one(list_is_present_variables) nb_teams_allocated_to_task = sum(list_is_present_variables) self.cp_model.add( nb_teams_allocated_to_task == 1 ).only_enforce_if(done) self.cp_model.add( nb_teams_allocated_to_task == 0 ).only_enforce_if(~done) else: self.cp_model.add_max_equality(done, list_is_present_variables) else: self.cp_model.add(done == 0) self.done_variables_created = True
[docs] def get_nb_tasks_done_variable(self) -> Any: self.create_done_variables() return sum(self.done_variables.values())
[docs] def get_nb_unary_resources_used_variable(self) -> Any: self.create_used_variables() return sum(self.used_variables.values())
[docs] class AllocationIntegerModellingCpSatSolver( AllocationCpSatSolver[Task, UnaryResource], ): """Base class for allocation cp-sat solvers using an integer modelling. I.e. using integer variables to model allocation of a task. This assumes that at most one unary_resource can be allocated to a task. """ is_present_variables_created = False is_present_variables: dict[tuple[Task, UnaryResource], IntVar]
[docs] @abstractmethod def get_task_allocation_variable( self, task: Task, ) -> LinearExprT: """Return an integer variable/expression storing the index of the allocated unary_resource. Assumes that exactly one unary resource is allocated to a task. """ ...
[docs] def create_is_present_variables(self) -> None: if not self.is_present_variables_created: tasks, unary_resources = self.get_default_tasks_n_unary_resources() self.is_present_variables = {} for task in tasks: for unary_resource in unary_resources: if self.problem.is_compatible_task_unary_resource( task=task, unary_resource=unary_resource ): boolvar = self.cp_model.new_bool_var( f"is_present_{task}_{unary_resource}" ) self.is_present_variables[(task, unary_resource)] = boolvar var = self.get_task_allocation_variable(task=task) value = self.problem.get_index_from_unary_resource( unary_resource ) self.cp_model.add(var == value).only_enforce_if(boolvar) self.cp_model.add(var != value).only_enforce_if(~boolvar) self.is_present_variables_created = True
[docs] def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: UnaryResource ) -> LinearExprT: """Return a 0-1 variable/expression telling if the unary_resource is used for the task. NB: sometimes the given resource is never to be used by a task and the variable has not been created. The convention is to return 0 in that case. """ self.create_is_present_variables() try: return self.is_present_variables[(task, unary_resource)] except KeyError: return 0
[docs] def add_constraint_on_task_unary_resource_allocation( self, task: Task, unary_resource: UnaryResource, used: bool ) -> list[Any]: var = self.get_task_allocation_variable(task=task) if used: return [self.cp_model.add(var == unary_resource)] else: return [self.cp_model.add(var != unary_resource)]
[docs] def add_constraint_on_nb_allocation_changes( self, ref: AllocationSolution[Task, UnaryResource], nb_changes: int ) -> list[Any]: tasks, unary_resources = self.get_default_tasks_n_unary_resources() self.create_allocation_changes_variables() # constraints so that change variables reflect diff to ref constraints = [] for task in tasks: for unary_resource in unary_resources: task_allocation = self.get_task_allocation_variable(task=task) i_unary_resource = self.problem.get_index_from_unary_resource( unary_resource=unary_resource ) allocation_change = self.allocation_changes_variables[ (task, unary_resource) ] if ref.is_allocated(task=task, unary_resource=unary_resource): subconstraints = [ self.cp_model.add( task_allocation != i_unary_resource ).only_enforce_if(allocation_change), self.cp_model.add( task_allocation == i_unary_resource ).only_enforce_if(~allocation_change), ] else: subconstraints = [ self.cp_model.add( task_allocation == i_unary_resource ).only_enforce_if(allocation_change), self.cp_model.add( task_allocation != i_unary_resource ).only_enforce_if(~allocation_change), ] constraints += subconstraints # nb of changes variable var = sum( self.allocation_changes_variables[(task, unary_resource)] for task in tasks for unary_resource in unary_resources ) constraints += [self.cp_model.add(var <= nb_changes)] return constraints
[docs] class AllocationModelling(Enum): BINARY = "binary" INTEGER = "integer"
[docs] class AllocationBinaryOrIntegerModellingCpSatSolver( AllocationIntegerModellingCpSatSolver[Task, UnaryResource], ): """Base class for allocation cp-sat solvers using a binary or integer modelling.""" allocation_modelling: AllocationModelling
[docs] @abstractmethod def get_binary_allocation_variable( self, task: Task, unary_resource: UnaryResource ) -> LinearExprT: """ "Return a 0-1 variable/expression telling if the unary_resource is used for the task. Only to be called when allocation_modelling == AllocationModelling.BINARY. NB: sometimes the given resource is never to be used by a task and the variable has not been created. The convention is to return 0 in that case. """ ...
[docs] @abstractmethod def get_integer_allocation_variable(self, task: Task) -> LinearExprT: """Return an integer variable/expression storing the index of the allocated unary_resource. Assumes that exactly one unary resource is allocated to a task. Only to be called when allocation_modelling == AllocationModelling.INTEGER. Args: task: Returns: """ ...
[docs] def get_task_allocation_variable(self, task: Task) -> LinearExprT: if self.allocation_modelling == AllocationModelling.INTEGER: return self.get_integer_allocation_variable(task=task) else: raise RuntimeError( "get_task_allocation_variable() cannot be called with binary allocation modelling." )
[docs] def get_task_unary_resource_is_present_variable( self, task: Task, unary_resource: UnaryResource ) -> LinearExprT: if self.allocation_modelling == AllocationModelling.INTEGER: return super().get_task_unary_resource_is_present_variable( task=task, unary_resource=unary_resource ) else: return self.get_binary_allocation_variable( task=task, unary_resource=unary_resource )
[docs] def add_constraint_on_task_unary_resource_allocation( self, task: Task, unary_resource: UnaryResource, used: bool ) -> list[Any]: if self.allocation_modelling == AllocationModelling.INTEGER: return super().add_constraint_on_task_unary_resource_allocation( task=task, unary_resource=unary_resource, used=used ) else: return ( AllocationCpSatSolver.add_constraint_on_task_unary_resource_allocation( self, task=task, unary_resource=unary_resource, used=used ) )
[docs] def add_constraint_on_nb_allocation_changes( self, ref: AllocationSolution[Task, UnaryResource], nb_changes: int ) -> list[Any]: if self.allocation_modelling == AllocationModelling.INTEGER: return super().add_constraint_on_nb_allocation_changes( ref=ref, nb_changes=nb_changes ) else: return AllocationCpSatSolver.add_constraint_on_nb_allocation_changes( self, ref=ref, nb_changes=nb_changes )
[docs] def is_a_trivial_zero(var: LinearExprT) -> bool: """Return whether the variable is actually a plain 0 integer. For instance, tells if is_present variables are real variables or not to avoid including them in sum, max, ... """ return isinstance(var, int) and var == 0