Source code for discrete_optimization.rcpsp_multiskill.solvers.cpsat_auto

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
from typing import Any

from ortools.sat.python.cp_model import (
    CpSolverSolutionCallback,
    Domain,
)

from discrete_optimization.generic_tasks_tools.solvers.cpsat.auto import (
    GenericSchedulingAutoCpSatSolver,
    TemporarySolution,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)
from discrete_optimization.rcpsp_multiskill.problem import (
    NB_EMPLOYEES_LB,
    CumulativeResource,
    MultiskillRcpspProblem,
    MultiskillRcpspSolution,
    NonRenewableResource,
    Task,
    UnaryResource,
)

logger = logging.getLogger(__name__)


[docs] class CpSatAutoMultiskillRcpspSolver( GenericSchedulingAutoCpSatSolver[ Task, UnaryResource, CumulativeResource, NonRenewableResource ], ): hyperparameters = [ CategoricalHyperparameter( name="redundant_skill_cumulative", choices=[True, False], default=True ), CategoricalHyperparameter( name="redundant_worker_cumulative", choices=[True, False], default=True ), ] problem: MultiskillRcpspProblem
[docs] def convert_task_variables_to_solution( self, temp_sol: TemporarySolution[Task, UnaryResource] ) -> Solution: """Convert solution from autosolver format into do format. To be used in `self.retrieve_solution()`. Args: temp_sol: Returns: """ modes_dict = {} schedule = {} employee_usage = {} for task, task_variable in temp_sol.task_variables.items(): schedule[task] = { "start_time": task_variable.start, "end_time": task_variable.end, } modes_dict[task] = task_variable.mode employee_usage[task] = { worker: contrib for worker, contrib in task_variable.info["contrib"].items() if len(contrib) > 0 } sol = MultiskillRcpspSolution( problem=self.problem, schedule=schedule, modes=modes_dict, employee_usage=employee_usage, ) sol._internal_obj = temp_sol.metadata return sol
[docs] def retrieve_tasks_variables( self, cpsolvercb: CpSolverSolutionCallback ) -> TemporarySolution[Task, UnaryResource]: """Construct each task variable from the cpsat solver internal solution. It will be called each time the cpsat solver find a new solution. At that point, value of internal variables are accessible via `cpsolvercb.value(VARIABLE_NAME)`. We override the method from generic auto solver to add the worker skills contribution and internal objective value. Args: cpsolvercb: the ortools callback called when the cpsat solver finds a new solution. Returns: the task variables for the intermediate solution """ temp_sol = super().retrieve_tasks_variables(cpsolvercb) # worker skills contribution per task for task, task_variable in temp_sol.task_variables.items(): task_variable.info["contrib"] = {} for worker in task_variable.allocated: try: skills_used_vars = self.skills_used_per_worker[task][worker] except KeyError: contrib = set() else: contrib = { s for s, used_var in skills_used_vars.items() if cpsolvercb.value(used_var) } task_variable.info["contrib"][worker] = contrib # internal objective temp_sol.metadata["makespan"] = cpsolvercb.value( self.get_global_makespan_variable() ) return temp_sol
[docs] def include_constraint_on_cumulative_resource( self, resource: CumulativeResource ) -> bool: """Whether the cp model should take into account the constraint on the given cumulative resource. The constraints on skills as cumulative resources and on number of employees are to be added according to `redundant_skill_cumulative` and `redundant_worker_cumulative` hyperparameters. Args: resource: Returns: """ if resource in self.problem.skills_set: return self._redundant_skill_cumulative elif resource == NB_EMPLOYEES_LB: return self._redundant_worker_cumulative else: return super().include_constraint_on_cumulative_resource(resource)
[docs] def is_compatible_task_unary_resource( self, task: Task, unary_resource: UnaryResource ) -> bool: if len(self.problem.skills_of_task[task]) == 0: # never allocate an employee to a task not needing a skill return False elif self._one_worker_per_task: return any( all( self.problem.employees[unary_resource].get_skill_level(s) >= self.problem.mode_details[task][mode].get(s, 0) for s in self.problem.skills_of_task[task] ) for mode in self.problem.mode_details[task] ) else: return super().is_compatible_task_unary_resource(task, unary_resource)
[docs] def init_model(self, **kwargs: Any) -> None: if self.problem.is_preemptive(): raise NotImplementedError() kwargs = self.complete_with_default_hyperparameters(kwargs) # redundant cumulative resources to consider self._redundant_skill_cumulative = kwargs["redundant_skill_cumulative"] self._redundant_worker_cumulative = kwargs["redundant_worker_cumulative"] # additional constraints (subproblem) self._one_worker_per_task = kwargs.get("one_worker_per_task", False) self._one_skill_per_task = kwargs.get("one_skill_per_task", False) self.at_most_one_unary_resource_per_task = self._one_worker_per_task self._exact_skill = kwargs.get("exact_skill", False) self._slack_skill = kwargs.get("slack_skill", False) super().init_model(**kwargs) self.create_skills_used_per_worker() self.create_skills_per_task_variables() self.create_skills_constraint_worker() self.create_skills_constraints_v2()
# self.create_workload_variables() # not used for now
[docs] def create_skills_used_per_worker(self): one_skill_per_task = self._one_skill_per_task skills_used_var = {} for task in self.problem.tasks_list: skills_of_task = self.problem.skills_of_task[task] if len(skills_of_task) == 0: # no need of employees continue skills_used_var[task] = {} for worker in self.problem.employees: if self.is_compatible_task_unary_resource( task=task, unary_resource=worker ): skills_used_var[task][worker] = {} skills_of_worker = self.problem.employees[ worker ].get_non_zero_skills() for s in skills_of_task: if s in skills_of_worker: if not one_skill_per_task or len(skills_of_worker) == 1: skills_used_var[task][worker][s] = ( self.allocation_is_present[task][worker] ) else: skills_used_var[task][worker][s] = ( self.cp_model.new_bool_var( name=f"skill_{task}_{worker}_{s}" ) ) for s in skills_used_var[task][worker]: self.cp_model.add( skills_used_var[task][worker][s] <= self.allocation_is_present[task][worker] ) self.cp_model.add_bool_or( [ skills_used_var[task][worker][s] for s in skills_used_var[task][worker] ] ).only_enforce_if(self.allocation_is_present[task][worker]) if one_skill_per_task: if len(skills_used_var[task][worker]) >= 1: self.cp_model.add_at_most_one( [ skills_used_var[task][worker][s] for s in skills_used_var[task][worker] ] ) self.skills_used_per_worker = skills_used_var
[docs] def create_skills_per_task_variables(self): skills_var = {} for task in self.problem.tasks_list: skills_var[task] = {} for s in self.problem.skills_of_task[task]: possible_values = [ self.problem.mode_details[task][mode].get(s, 0) for mode in self.problem.mode_details[task] ] skills_var[task][s] = self.cp_model.new_int_var_from_domain( domain=Domain.from_values(possible_values), name=f"skills_{task}_{s}", ) for mode, is_present_mode in self.modes_is_present[task].items(): val = self.problem.mode_details[task][mode].get(s, 0) self.cp_model.add(skills_var[task][s] == val).OnlyEnforceIf( is_present_mode ) self.skills_per_task = skills_var
[docs] def create_workload_variables(self): workload = {} for emp in self.problem.employees: workload[emp] = sum( [ # NB: we use duration of mode 1 instead of actual duration variable to avoid quadratic constraint # (impossible in cpsat) # the constraint is correct in single mode self.problem.get_task_mode_duration(task=task, mode=1) * is_present_task_worker[emp] for task, is_present_task_worker in self.allocation_is_present.items() if emp in is_present_task_worker ] ) max_workload = self.cp_model.new_int_var( lb=0, ub=self.problem.horizon, name=f"max_workload" ) min_workload = self.cp_model.new_int_var( lb=0, ub=self.problem.horizon, name=f"min_workload" ) self.cp_model.add_max_equality( max_workload, [workload[emp] for emp in workload] ) self.cp_model.add_min_equality( min_workload, [workload[emp] for emp in workload] ) self.max_workload_var = max_workload self.min_workload_var = min_workload
[docs] def create_skills_constraint_worker(self): exact_skill = self._exact_skill slack_skill = self._slack_skill if slack_skill: slack_skill_dict = {} for task in self.skills_per_task: if slack_skill: slack_skill_dict[task] = {} for s in self.skills_per_task[task]: if slack_skill: slack_skill_dict[task][s] = self.cp_model.new_int_var( lb=0, ub=5, name=f"slack_{task}_{s}" ) skill_value_put_on_task = sum( skill_value * is_present_worker for worker, is_present_worker in self.allocation_is_present[ task ].items() if ( skill_value := self.problem.employees[worker].get_skill_level(s) ) > 0 ) if exact_skill: if not slack_skill: self.cp_model.add( skill_value_put_on_task == self.skills_per_task[task][s] ) else: self.cp_model.add( skill_value_put_on_task == self.skills_per_task[task][s] + slack_skill_dict[task][s] ) else: if not slack_skill: self.cp_model.add( skill_value_put_on_task >= self.skills_per_task[task][s] ) else: self.cp_model.add( skill_value_put_on_task >= self.skills_per_task[task][s] + slack_skill_dict[task][s] ) if slack_skill: self.slack_skill_v1_variables = slack_skill_dict
[docs] def create_skills_constraints_v2(self): """ using skills_used variable """ exact_skill = self._exact_skill slack_skill = self._slack_skill if slack_skill: slack_skill_dict = {} for task in self.skills_per_task: if slack_skill: slack_skill_dict[task] = {} for s in self.skills_per_task[task]: if slack_skill: slack_skill_dict[task][s] = self.cp_model.new_int_var( lb=0, ub=5, name=f"slack_{task}_{s}" ) skill_value_put_on_task = sum( self.problem.employees[worker].get_skill_level(s) * skills_used_var[s] for worker, skills_used_var in self.skills_used_per_worker[ task ].items() if s in skills_used_var ) if exact_skill: if not slack_skill: self.cp_model.add( skill_value_put_on_task == self.skills_per_task[task][s] ) else: self.cp_model.add( skill_value_put_on_task == self.skills_per_task[task][s] + slack_skill_dict[task][s] ) else: if not slack_skill: self.cp_model.add( skill_value_put_on_task >= self.skills_per_task[task][s] ) else: self.cp_model.add( skill_value_put_on_task >= self.skills_per_task[task][s] + slack_skill_dict[task][s] ) if slack_skill: self.slack_skill_v2_variables = slack_skill_dict
[docs] def create_total_cost_variable(self): self.total_cost_var = sum( int(10 * self.problem.employees[worker].salary) * is_allocated * self.duration_variables[task] for task, is_allocated_vars in self.allocation_is_present.items() for worker, is_allocated in is_allocated_vars.items() )