Source code for discrete_optimization.rcpsp_multiskill.solvers.cpsat

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import math
from typing import Any, Iterable, List

import numpy as np
from ortools.sat.python.cp_model import (
    Constraint,
    CpModel,
    CpSolverSolutionCallback,
    Domain,
    LinearExpr,
)

from discrete_optimization.generic_tools.do_problem import Problem, Solution
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.rcpsp_multiskill.problem import (
    MultiskillRcpspProblem,
    MultiskillRcpspSolution,
    compute_discretize_calendar_skills,
    create_fake_tasks_multiskills,
    discretize_calendar_,
)

logger = logging.getLogger(__name__)


[docs] class CpSatMultiskillRcpspSolver(OrtoolsCpSatSolver): hyperparameters = [ CategoricalHyperparameter( name="redundant_skill_cumulative", choices=[True, False], default=True ), CategoricalHyperparameter( name="redundant_worker_cumulative", choices=[True, False], default=True ), ] problem: MultiskillRcpspProblem def __init__(self, problem: Problem, **kwargs: Any): super().__init__(problem, **kwargs) self.variables = {}
[docs] def set_lexico_objective(self, obj: str) -> None: self.cp_model.Minimize(self.variables[obj])
[docs] def add_lexico_constraint(self, obj: str, value: float) -> Iterable[Constraint]: return [self.cp_model.Add(self.variables[obj] <= int(value))]
[docs] @staticmethod def implements_lexico_api() -> bool: return True
[docs] def get_lexico_objectives_available(self) -> List[str]: return ["makespan"]
# return ["makespan", "cost"]
[docs] def get_lexico_objective_value(self, obj: str, res: ResultStorage) -> float: return min(s._internal_obj[obj] for s, _ in res.list_solution_fits)
[docs] def init_model(self, **args: Any) -> None: args = self.complete_with_default_hyperparameters(args) one_worker_per_task = args.get("one_worker_per_task", False) one_skill_per_task = args.get("one_skill_per_task", False) redundant_skill_cumulative = args["redundant_skill_cumulative"] redundant_worker_cumulative = args["redundant_worker_cumulative"] self.cp_model = CpModel() self.variables = {} self.create_base_variable() self.create_opt_variable_modes() self.create_employee_intervals( one_worker_per_task=one_worker_per_task, one_skill_per_task=one_skill_per_task, ) self.create_skills_variables() self.fake_tasks, self.fake_tasks_unit = create_fake_tasks_multiskills( self.problem ) self.create_constraint_resource() if redundant_skill_cumulative: self.constraint_redundant_cumulative_skills() if redundant_worker_cumulative: self.constraint_redundant_cumulative_worker() self.create_disjunctive_worker() self.create_skills_constraint_to_mode() self.create_skills_constraint_worker(**args) self.create_skills_constraints_v2(**args) self.constraint_precedence() self.variables["makespan"] = self.variables["base_variable"]["ends"][ self.problem.sink_task ] self.create_workload_variables() self.cp_model.Minimize(self.variables["makespan"])
[docs] def create_base_variable(self): start_var = {} end_var = {} duration_var = {} interval_var = {} for task in self.problem.tasks_list: possible_duration = [ self.problem.mode_details[task][m]["duration"] for m in self.problem.mode_details[task] ] start_var[task] = self.cp_model.NewIntVar( lb=0, ub=self.problem.horizon, name=f"start_{task}" ) end_var[task] = self.cp_model.NewIntVar( lb=0, ub=self.problem.horizon, name=f"end_{task}" ) duration_var[task] = self.cp_model.NewIntVarFromDomain( domain=Domain.FromValues(possible_duration), name=f"duration_{task}" ) interval_var[task] = self.cp_model.NewIntervalVar( start=start_var[task], size=duration_var[task], end=end_var[task], name=f"interval_{task}", ) self.variables["base_variable"] = { "starts": start_var, "ends": end_var, "durations": duration_var, "intervals": interval_var, }
[docs] def create_opt_variable_modes(self): if not self.problem.is_multimode: self.variables["mode_variable"] = {"is_present": {}, "opt_intervals": {}} return opt_interval_var = {} is_present_var = {} for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task]) if len(modes) == 1: continue is_present_var[task] = {} opt_interval_var[task] = {} for mode in modes: is_present_var[task][mode] = self.cp_model.NewBoolVar( name=f"{task}_{mode}" ) opt_interval_var[task][mode] = self.cp_model.NewOptionalIntervalVar( start=self.variables["base_variable"]["starts"][task], size=self.problem.mode_details[task][mode]["duration"], end=self.variables["base_variable"]["ends"][task], is_present=is_present_var[task][mode], name=f"opt_{task}_{mode}", ) self.variables["mode_variable"] = { "is_present": is_present_var, "opt_intervals": opt_interval_var, }
[docs] def create_employee_intervals( self, one_worker_per_task: bool, one_skill_per_task: bool ): opt_interval_var = {} is_present_var = {} skills_used_var = {} employees_per_skill = {} for s in self.problem.skills_set: employees_per_skill[s] = { e for e in self.problem.employees if s in self.problem.employees[e].get_non_zero_skills() } for task in self.problem.tasks_list: skills_of_task = set() for mode in self.problem.mode_details[task]: for skill in self.problem.skills_set: if self.problem.mode_details[task][mode].get(skill, 0) > 0: skills_of_task.add(skill) if len(skills_of_task) == 0: # no need of employees continue is_present_var[task] = {} opt_interval_var[task] = {} skills_used_var[task] = {} for worker in self.problem.employees: if ( one_worker_per_task and any( all( self.problem.employees[worker].get_skill_level(s) >= self.problem.mode_details[task][mode].get(s, 0) for s in skills_of_task ) for mode in self.problem.mode_details[task] ) ) or any(worker in employees_per_skill[s] for s in skills_of_task): skills_used_var[task][worker] = {} is_present_var[task][worker] = self.cp_model.NewBoolVar( name=f"used_{task}_{worker}" ) opt_interval_var[task][ worker ] = self.cp_model.NewOptionalIntervalVar( start=self.variables["base_variable"]["starts"][task], size=self.variables["base_variable"]["durations"][task], end=self.variables["base_variable"]["ends"][task], is_present=is_present_var[task][worker], name=f"opt_{task}_{worker}", ) skills_of_worker = self.problem.employees[ worker ].get_non_zero_skills() for s in skills_of_task: if s not in skills_of_worker: # skills_used_var[task][worker][s] = 0 continue else: if not one_skill_per_task or len(skills_of_worker) == 1: skills_used_var[task][worker][s] = is_present_var[task][ worker ] else: skills_used_var[task][worker][ s ] = self.cp_model.NewBoolVar( name=f"skill_{task}_{worker}_{s}" ) for s in skills_used_var[task][worker]: self.cp_model.Add( skills_used_var[task][worker][s] <= is_present_var[task][worker] ) self.cp_model.AddBoolOr( [ skills_used_var[task][worker][s] for s in skills_used_var[task][worker] ] ).OnlyEnforceIf(is_present_var[task][worker]) if one_skill_per_task: if len(skills_used_var[task][worker]) >= 1: self.cp_model.AddAtMostOne( [ skills_used_var[task][worker][s] for s in skills_used_var[task][worker] ] ) if one_worker_per_task: self.cp_model.AddAtMostOne( [is_present_var[task][worker] for worker in is_present_var[task]] ) self.variables["worker_variable"] = { "is_present": is_present_var, "opt_intervals": opt_interval_var, "skills_used": skills_used_var, }
[docs] def create_skills_variables(self): skills_var = {} for task in self.problem.tasks_list: skills_of_task = set() for mode in self.problem.mode_details[task]: for skill in self.problem.skills_set: if self.problem.mode_details[task][mode].get(skill, 0) > 0: skills_of_task.add(skill) skills_var[task] = {} for s in skills_of_task: skills_var[task][s] = self.cp_model.NewIntVar( lb=min( [ self.problem.mode_details[task][m].get(s, 0) for m in self.problem.mode_details[task] ] ), ub=max( [ self.problem.mode_details[task][m].get(s, 0) for m in self.problem.mode_details[task] ] ), name=f"skills_{task}_{s}", ) self.variables["skills_req"] = skills_var
[docs] def create_workload_variables(self): workload = {} for emp in self.problem.employees: tasks = { task for task in self.variables["worker_variable"]["is_present"] if emp in self.variables["worker_variable"]["is_present"][task] } workload[emp] = sum( [ self.problem.mode_details[t][1]["duration"] * self.variables["worker_variable"]["is_present"][t][emp] for t in tasks ] ) max_workload = self.cp_model.NewIntVar( lb=0, ub=self.problem.horizon, name=f"max_workload" ) min_workload = self.cp_model.NewIntVar( lb=0, ub=self.problem.horizon, name=f"min_workload" ) self.cp_model.AddMaxEquality(max_workload, [workload[emp] for emp in workload]) self.cp_model.AddMinEquality(min_workload, [workload[emp] for emp in workload]) self.variables["max_workload"] = max_workload self.variables["min_workload"] = min_workload
[docs] def create_skills_constraint_to_mode(self): for task in self.variables["skills_req"]: if task in self.variables["mode_variable"]["is_present"]: for mode in self.variables["mode_variable"]["is_present"][task]: for s in self.variables["skills_req"][task]: val = self.problem.mode_details[task][mode].get(s, 0) self.cp_model.Add( self.variables["skills_req"][task][s] == val ).OnlyEnforceIf( self.variables["mode_variable"]["is_present"][task][mode] )
[docs] def create_skills_constraint_worker(self, **args): exact_skill = args.get("exact_skill", False) slack_skill = args.get("slack_skill", False) if slack_skill: slack_skill_dict = {} for task in self.variables["skills_req"]: if slack_skill: slack_skill_dict[task] = {} for s in self.variables["skills_req"][task]: if slack_skill: slack_skill_dict[task][s] = self.cp_model.NewIntVar( lb=0, ub=5, name=f"slack_{task}_{s}" ) terms = [] weights = [] for worker in self.variables["worker_variable"]["is_present"][task]: skill_value = self.problem.employees[worker].get_skill_level(s) if skill_value != 0: terms.append( self.variables["worker_variable"]["is_present"][task][ worker ] ) weights.append(skill_value) if exact_skill: if not slack_skill: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) == self.variables["skills_req"][task][s] ) else: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) == self.variables["skills_req"][task][s] + slack_skill_dict[task][s] ) else: if not slack_skill: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) >= self.variables["skills_req"][task][s] ) else: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) >= self.variables["skills_req"][task][s] + slack_skill_dict[task][s] ) if slack_skill: self.variables["slack_skill_var"] = slack_skill_dict
[docs] def create_skills_constraints_v2(self, **args): """ using skills_used variable """ exact_skill = args.get("exact_skill", False) slack_skill = args.get("slack_skill", False) if slack_skill: slack_skill_dict = {} for task in self.variables["skills_req"]: if slack_skill: slack_skill_dict[task] = {} for s in self.variables["skills_req"][task]: if slack_skill: slack_skill_dict[task][s] = self.cp_model.NewIntVar( lb=0, ub=5, name=f"slack_{task}_{s}" ) terms = [] weights = [] for worker in self.variables["worker_variable"]["is_present"][task]: if ( s in self.variables["worker_variable"]["skills_used"][task][ worker ] ): terms.append( self.variables["worker_variable"]["skills_used"][task][ worker ][s] ) weights.append( self.problem.employees[worker].get_skill_level(s) ) if exact_skill: if not slack_skill: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) == self.variables["skills_req"][task][s] ) else: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) == self.variables["skills_req"][task][s] + slack_skill_dict[task][s] ) else: if not slack_skill: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) >= self.variables["skills_req"][task][s] ) else: self.cp_model.Add( LinearExpr.weighted_sum(terms, weights) >= self.variables["skills_req"][task][s] + slack_skill_dict[task][s] ) if slack_skill: self.variables["slack_skill_var"] = slack_skill_dict
[docs] def constraint_precedence(self): for task in self.problem.successors: for succ in self.problem.successors[task]: self.cp_model.Add( self.variables["base_variable"]["starts"][succ] >= self.variables["base_variable"]["ends"][task] )
[docs] def create_constraint_resource(self): for r in self.problem.resources_list: if r in self.problem.non_renewable_resources: self.create_non_renewable_res_constraint(r) else: self.create_cumulative_resource_constraint(r)
[docs] def create_non_renewable_res_constraint(self, res: str): vars_consume = [] for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task].keys()) if len(modes) == 1: if self.problem.mode_details[task][modes[0]].get(res, 0) > 0: vars_consume.append( (1, self.problem.mode_details[task][modes[0]][res]) ) else: for mode in modes: if self.problem.mode_details[task][mode].get(res, 0) > 0: vars_consume.append( ( self.variables["mode_variable"]["is_present"][task][ mode ], self.problem.mode_details[task][mode][res], ) ) self.cp_model.Add( LinearExpr.weighted_sum( [x[0] for x in vars_consume], [x[1] for x in vars_consume] ) <= self.problem.get_max_resource_capacity(res) )
[docs] def create_cumulative_resource_constraint(self, res: str): intervals_consume = [] for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task].keys()) if len(modes) == 1: if self.problem.mode_details[task][modes[0]].get(res, 0) > 0: intervals_consume.append( ( self.variables["base_variable"]["intervals"][task], self.problem.mode_details[task][modes[0]][res], ) ) else: for mode in modes: if self.problem.mode_details[task][mode].get(res, 0) > 0: intervals_consume.append( ( self.variables["mode_variable"]["opt_intervals"][task][ mode ], self.problem.mode_details[task][mode][res], ) ) calendar_tasks = [ ( self.cp_model.NewFixedSizeIntervalVar( start=f["start"], size=f["duration"], name="calendar_res" ), f.get(res, 0), ) for f in self.fake_tasks if f.get(res, 0) > 0 ] self.cp_model.AddCumulative( [x[0] for x in intervals_consume] + [x[0] for x in calendar_tasks], [x[1] for x in intervals_consume] + [x[1] for x in calendar_tasks], capacity=self.problem.get_max_resource_capacity(res), )
[docs] def create_disjunctive_worker(self): for worker in self.problem.employees: intervals_consume = [] for task in self.variables["worker_variable"]["opt_intervals"]: if worker in self.variables["worker_variable"]["opt_intervals"][task]: intervals_consume.append( ( self.variables["worker_variable"]["opt_intervals"][task][ worker ], 1, ) ) calendar_tasks = [ ( self.cp_model.NewFixedSizeIntervalVar( start=f["start"], size=f["duration"], name="calendar_res" ), f.get(worker, 0), ) for f in self.fake_tasks_unit if f.get(worker, 0) > 0 ] self.cp_model.AddCumulative( [x[0] for x in intervals_consume] + [x[0] for x in calendar_tasks], [x[1] for x in intervals_consume] + [x[1] for x in calendar_tasks], capacity=1, )
[docs] def constraint_redundant_cumulative_skills(self): discr_calendar, dict_calendar_skills = compute_discretize_calendar_skills( problem=self.problem ) for skill in self.problem.skills_set: intervals_consume = [] for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task].keys()) if len(modes) == 1: if self.problem.mode_details[task][modes[0]].get(skill, 0) > 0: intervals_consume.append( ( self.variables["base_variable"]["intervals"][task], self.problem.mode_details[task][modes[0]][skill], ) ) else: for mode in modes: if self.problem.mode_details[task][mode].get(skill, 0) > 0: intervals_consume.append( ( self.variables["mode_variable"]["opt_intervals"][ task ][mode], self.problem.mode_details[task][mode][skill], ) ) calendar_tasks = [ ( self.cp_model.NewFixedSizeIntervalVar( start=f["start"], size=f["duration"], name="calendar_res" ), f.get("value", 0), ) for f in discr_calendar[skill] if f.get("value", 0) > 0 ] self.cp_model.AddCumulative( [x[0] for x in intervals_consume] + [x[0] for x in calendar_tasks], [x[1] for x in intervals_consume] + [x[1] for x in calendar_tasks], capacity=int(np.max(dict_calendar_skills[skill])), )
[docs] def constraint_redundant_cumulative_worker(self): some_employee = next(emp for emp in self.problem.employees) len_calendar = len(self.problem.employees[some_employee].calendar_employee) merged_calendar = np.zeros(len_calendar) for emp in self.problem.employees: merged_calendar += np.array(self.problem.employees[emp].calendar_employee) discr_calendar = discretize_calendar_(merged_calendar) intervals_consume = [] max_skill_over_worker = {s: 0 for s in self.problem.skills_set} for emp in self.problem.employees: for s in self.problem.skills_set: max_skill_over_worker[s] = max( max_skill_over_worker[s], self.problem.employees[emp].get_skill_level(s), ) for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task].keys()) if len(modes) == 1: skills_needed = { s: self.problem.mode_details[task][modes[0]].get(s, 0) for s in self.problem.skills_set if self.problem.mode_details[task][modes[0]].get(s, 0) > 0 } if len(skills_needed) > 0: lb_nb_worker_needed = max( [ int(math.ceil(skills_needed[s] / max_skill_over_worker[s])) for s in skills_needed ] ) intervals_consume.append( ( self.variables["base_variable"]["intervals"][task], lb_nb_worker_needed, ) ) else: for mode in modes: skills_needed = { s: self.problem.mode_details[task][modes[0]].get(s, 0) for s in self.problem.skills_set if self.problem.mode_details[task][modes[0]].get(s, 0) > 0 } if len(skills_needed) > 0: lb_nb_worker_needed = max( [ int( math.ceil( skills_needed[s] / max_skill_over_worker[s] ) ) for s in skills_needed ] ) intervals_consume.append( ( self.variables["mode_variable"]["opt_intervals"][task][ mode ], lb_nb_worker_needed, ) ) calendar_tasks = [ ( self.cp_model.NewFixedSizeIntervalVar( start=f["start"], size=f["duration"], name="calendar_res" ), f.get("value", 0), ) for f in discr_calendar if f.get("value", 0) > 0 ] self.cp_model.AddCumulative( [x[0] for x in intervals_consume] + [x[0] for x in calendar_tasks], [x[1] for x in intervals_consume] + [x[1] for x in calendar_tasks], capacity=self.problem.nb_employees, )
[docs] def constraint_mode(self): for task in self.variables["mode_variable"]["is_present"]: self.cp_model.AddExactlyOne( [ self.variables["mode_variable"]["is_present"][task][m] for m in self.variables["mode_variable"]["is_present"][task] ] )
[docs] def create_cost_objective_function(self): max_salary = max( self.problem.employees[x].salary for x in self.problem.employees ) cost_per_tasks = { task: self.cp_model.NewIntVar( lb=0, ub=int( 10 * max_salary * max( self.problem.mode_details[task][m]["duration"] for m in self.problem.mode_details[task] ) ), name=f"cost_{task}", ) for task in self.problem.tasks_list } for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task].keys()) if len(modes) == 1: dur = self.problem.mode_details[task][modes[0]]["duration"] if task not in self.variables["worker_variable"]["is_present"]: self.cp_model.Add(cost_per_tasks[task] == 0) else: workers = [ w for w in self.variables["worker_variable"]["is_present"][task] ] self.cp_model.Add( LinearExpr.weighted_sum( [ self.variables["worker_variable"]["is_present"][task][w] for w in self.variables["worker_variable"][ "is_present" ][task] ], [ dur * int(10 * self.problem.employees[w].salary) for w in workers ], ) == cost_per_tasks[task] ) else: workers = [ w for w in self.variables["worker_variable"]["is_present"][task] ] self.cp_model.AddMultiplicationEquality( cost_per_tasks[task], [ self.variables["base_variable"]["durations"][task], LinearExpr.weighted_sum( [ self.variables["worker_variable"]["is_present"][task][w] for w in self.variables["worker_variable"][ "is_present" ][task] ], [ int(10 * self.problem.employees[w].salary) for w in workers ], ), ], ) self.variables["cost"] = sum([cost_per_tasks[t] for t in cost_per_tasks])
[docs] def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution: logger.info( f"Current obj {cpsolvercb.ObjectiveValue()}, bound={cpsolvercb.BestObjectiveBound()}" ) modes_dict = {} schedule = {} employee_usage = {} for task in self.variables["base_variable"]["starts"]: schedule[task] = { "start_time": cpsolvercb.Value( self.variables["base_variable"]["starts"][task] ), "end_time": cpsolvercb.Value( self.variables["base_variable"]["ends"][task] ), } for task in self.problem.tasks_list: modes = list(self.problem.mode_details[task].keys()) if len(modes) == 1: modes_dict[task] = modes[0] else: for mode in self.variables["mode_variable"]["is_present"][task]: if cpsolvercb.Value( self.variables["mode_variable"]["is_present"][task][mode] ): modes_dict[task] = mode break for task in self.problem.tasks_list: skills_needed = set( [ s for s in self.problem.skills_set if self.problem.mode_details[task][modes_dict[task]].get(s, 0) > 0 ] ) employee_usage[task] = {} if task in self.variables["worker_variable"]["is_present"]: for worker in self.variables["worker_variable"]["is_present"][task]: if cpsolvercb.Value( self.variables["worker_variable"]["is_present"][task][worker] ): sk_nz = self.problem.employees[worker].get_non_zero_skills() if "skills_used" in self.variables["worker_variable"]: contrib = set() for s in self.variables["worker_variable"]["skills_used"][ task ][worker]: if cpsolvercb.Value( self.variables["worker_variable"]["skills_used"][ task ][worker][s] ): contrib.add(s) else: contrib = set(sk_nz).intersection(skills_needed) if len(contrib) > 0: employee_usage[task][worker] = contrib sol = MultiskillRcpspSolution( problem=self.problem, schedule=schedule, modes=modes_dict, employee_usage=employee_usage, ) sol._internal_obj = {} for k in self.get_lexico_objectives_available(): sol._internal_obj[k] = cpsolvercb.Value(self.variables[k]) return sol