Source code for discrete_optimization.rcpsp_multiskill.solvers.multimode_transposition

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from typing import Union

import numpy as np

from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.problem_preemptive import (
    PreemptiveRcpspProblem,
    PreemptiveRcpspSolution,
)
from discrete_optimization.rcpsp.solution import RcpspSolution
from discrete_optimization.rcpsp.solvers.cpsat import CpSatRcpspSolver
from discrete_optimization.rcpsp_multiskill.multiskill_to_rcpsp import MultiSkillToRcpsp
from discrete_optimization.rcpsp_multiskill.problem import (
    MultiskillRcpspProblem,
    MultiskillRcpspSolution,
    PreemptiveMultiskillRcpspSolution,
)
from discrete_optimization.rcpsp_multiskill.solvers.cp_mzn import (
    CpMultiskillRcpspSolver,
    CpPreemptiveMultiskillRcpspSolver,
    CpSolverName,
    stick_to_solution,
    stick_to_solution_preemptive,
)

logger = logging.getLogger(__name__)


[docs] class MultimodeTranspositionMultiskillRcpspSolver(SolverDO): problem: MultiskillRcpspProblem def __init__( self, problem: MultiskillRcpspProblem, multimode_problem: Union[RcpspProblem, PreemptiveRcpspProblem] = None, worker_type_to_worker: dict[str, set[Union[str, int]]] = None, params_objective_function: ParamsObjectiveFunction = None, solver_multimode_rcpsp: SolverDO = None, limit_number_of_mode_per_task: bool = True, max_number_of_mode: int = 3, check_resource_compliance: bool = True, reconstruction_cp_time_limit: int = 3600, **kwargs, ): """Initialize the multimode transposition solver. Args: problem: The multi-skill RCPSP problem to solve multimode_problem: Pre-computed multimode RCPSP problem (optional) worker_type_to_worker: Pre-computed worker type to worker mapping (optional) params_objective_function: Parameters for objective function solver_multimode_rcpsp: Solver for the multimode RCPSP problem limit_number_of_mode_per_task: Whether to limit modes per task in transformation max_number_of_mode: Maximum number of modes per task during transformation check_resource_compliance: Whether to check resource compliance during transformation reconstruction_cp_time_limit: Time limit (seconds) for CP-based reconstruction **kwargs: Additional arguments for parent class """ super().__init__( problem=problem, params_objective_function=params_objective_function ) self.multimode_problem = multimode_problem self.worker_type_to_worker = worker_type_to_worker self.solver_multimode_rcpsp = solver_multimode_rcpsp self.limit_number_of_mode_per_task = limit_number_of_mode_per_task self.max_number_of_mode = max_number_of_mode self.check_resource_compliance = check_resource_compliance self.reconstruction_cp_time_limit = reconstruction_cp_time_limit
[docs] def solve(self, **kwargs) -> ResultStorage: """Solve the multi-skill RCPSP using two-stage decomposition. The idea is to transform MS-RCPSP to multi-mode RCPSP by abstracting employees as worker types. Returns: ResultStorage containing the reconstructed multi-skill RCPSP solutions """ # Construct the multimode problem if not already given if self.multimode_problem is None or self.worker_type_to_worker is None: logger.info( "Construct RCPSP problem by abstracting employees as worker types" ) algo = MultiSkillToRcpsp(self.problem) rcpsp_problem = algo.construct_rcpsp_by_worker_type( limit_number_of_mode_per_task=self.limit_number_of_mode_per_task, max_number_of_mode=self.max_number_of_mode, check_resource_compliance=self.check_resource_compliance, ) logger.info(f"RCPSP Problem Info: {rcpsp_problem}") self.multimode_problem = rcpsp_problem self.worker_type_to_worker = algo.worker_type_to_worker if self.solver_multimode_rcpsp is None: # Create an underlying solver for the multimode RCPSP problem if not provided self.solver_multimode_rcpsp = CpSatRcpspSolver( problem=self.multimode_problem ) logger.info("Create an underlying solver for the multimode RCPSP problem") else: # Set the problem on the underlying solver if not already set if self.solver_multimode_rcpsp.problem is None: self.solver_multimode_rcpsp.problem = self.multimode_problem logger.info("Set the problem on the given underlying solver") # Solve the multimode problem logger.info("Solving the multi-mode RCPSP problem...") result_store = self.solver_multimode_rcpsp.solve(**kwargs) solution, fit = result_store.get_best_solution_fit() # Check if a solution was found if solution is None: logger.warning( "No solution found by the multi-mode RCPSP solver. Returning empty result store." ) return ResultStorage( list_solution_fits=[], mode_optim=self.problem.get_objective_register().objective_sense, ) logger.info(f"Multi-mode RCPSP solution obtained with objective/fit: {fit}") # Rebuild the solution for the multiskill problem using the solution of the rcpsp problem logger.info("Rebuild solution for the multiskill problem") res = rebuild_multiskill_solution_cp_based( multiskill_rcpsp_problem=self.problem, multimode_rcpsp_problem=self.multimode_problem, worker_type_to_worker=self.worker_type_to_worker, solution_rcpsp=solution, time_limit=self.reconstruction_cp_time_limit, ) return res
[docs] def rebuild_multiskill_solution( multiskill_rcpsp_problem: MultiskillRcpspProblem, multimode_rcpsp_problem: Union[RcpspProblem, PreemptiveRcpspProblem], worker_type_to_worker: dict[str, set[Union[str, int]]], solution_rcpsp: Union[RcpspSolution, PreemptiveRcpspSolution], ) -> Union[MultiskillRcpspSolution, PreemptiveMultiskillRcpspSolution]: """ This function takes the schedule from the RCPSP solution and rebuilds the solution for the multiskill problem. NOTE: need review, this function is currently not used. """ new_horizon = multimode_rcpsp_problem.horizon resource_avail_in_time = {} for res in multimode_rcpsp_problem.resources_list: if multimode_rcpsp_problem.is_varying_resource(): resource_avail_in_time[res] = multimode_rcpsp_problem.resources[res][ : new_horizon + 1 ] else: resource_avail_in_time[res] = np.full( new_horizon, multimode_rcpsp_problem.resources[res], dtype=np.int_ ).tolist() worker_avail_in_time = {} for i in multiskill_rcpsp_problem.employees: worker_avail_in_time[i] = np.array( multiskill_rcpsp_problem.employees[i].calendar_employee[: new_horizon + 1], dtype=np.bool_, ) rcpsp_schedule = solution_rcpsp.rcpsp_schedule employee_usage = {} modes_dict = multimode_rcpsp_problem.build_mode_dict(solution_rcpsp.rcpsp_modes) sorted_tasks = sorted(rcpsp_schedule, key=lambda x: solution_rcpsp.get_end_time(x)) for task in sorted_tasks: employee_usage[task] = {} ressource_requirements = multimode_rcpsp_problem.mode_details[task][ modes_dict[task] ] non_zeros_ressource_requirements = set( [ k for k in ressource_requirements if k in worker_type_to_worker and ressource_requirements[k] > 0 ] ) if len(non_zeros_ressource_requirements) >= 1: active_times = solution_rcpsp.get_active_time(task) for k in non_zeros_ressource_requirements: number_worker = ressource_requirements[k] workers_available = [ w for w in worker_type_to_worker[k] if all(worker_avail_in_time[w][i] for i in active_times) ] if len(workers_available) >= number_worker: wavail = workers_available[:number_worker] skills_needed_by_task = [ s for s in multiskill_rcpsp_problem.mode_details[task][1] if s in multiskill_rcpsp_problem.skills_set and multiskill_rcpsp_problem.mode_details[task][1][s] > 0 ] non_zeros = multiskill_rcpsp_problem.employees[ wavail[0] ].get_non_zero_skills() skills_interest = [ s for s in non_zeros if s in skills_needed_by_task ] employee_usage[task].update( {emp: set(skills_interest) for emp in wavail} ) for i in active_times: for w in wavail: worker_avail_in_time[w][i] = False else: if isinstance(solution_rcpsp, PreemptiveRcpspSolution): for s, e in zip( solution_rcpsp.rcpsp_schedule[task]["starts"], solution_rcpsp.rcpsp_schedule[task]["ends"], ): at = range(s, e) workers_available = [ w for w in worker_type_to_worker[k] if all(worker_avail_in_time[w][i] for i in at) ] if len(workers_available) >= number_worker: wavail = workers_available[:number_worker] skills_needed_by_task = [ s for s in multiskill_rcpsp_problem.mode_details[ task ][1] if s in multiskill_rcpsp_problem.skills_set and multiskill_rcpsp_problem.mode_details[task][1][ s ] > 0 ] non_zeros = multiskill_rcpsp_problem.employees[ wavail[0] ].get_non_zero_skills() skills_interest = [ s for s in non_zeros if s in skills_needed_by_task ] for emp in wavail: if emp not in employee_usage[task]: employee_usage[task][emp] = { "skills": [], "times": [], } employee_usage[task][emp]["skills"] += [ set(skills_interest) ] employee_usage[task][emp]["times"] += [(s, e)] for i in at: for w in wavail: worker_avail_in_time[w][i] = False else: logger.warning("Problem finding a worker") if isinstance(solution_rcpsp, PreemptiveRcpspSolution): return PreemptiveMultiskillRcpspSolution( problem=multiskill_rcpsp_problem, modes={task: 1 for task in multiskill_rcpsp_problem.tasks_list}, employee_usage=employee_usage, schedule=rcpsp_schedule, ) else: return MultiskillRcpspSolution( problem=multiskill_rcpsp_problem, modes={task: 1 for task in multiskill_rcpsp_problem.tasks_list}, employee_usage=employee_usage, schedule=rcpsp_schedule, )
[docs] def rebuild_multiskill_solution_cp_based( multiskill_rcpsp_problem: MultiskillRcpspProblem, multimode_rcpsp_problem: Union[ RcpspProblem, PreemptiveRcpspProblem ], # TODO: need review, currently unused worker_type_to_worker: dict[ str, set[Union[str, int]] ], # TODO: need review, currently unused solution_rcpsp: Union[RcpspSolution, PreemptiveRcpspSolution], time_limit: int = 3600, ) -> ResultStorage: """ Reconstructs a valid multi-skill RCPSP solution from the schedule from an RCPSP solution by assigning specific employees to tasks. Args: multiskill_rcpsp_problem: The multi-skill RCPSP problem multimode_rcpsp_problem: The multimode RCPSP problem (for reference) worker_type_to_worker: Mapping from worker types to employee sets solution_rcpsp: The RCPSP solution to constrain to time_limit: Time limit in seconds for CP solver Returns: ResultStorage: containing the reconstructed multi-skill RCPSP solutions """ if isinstance(solution_rcpsp, RcpspSolution): model = CpMultiskillRcpspSolver( problem=multiskill_rcpsp_problem, cp_solver_name=CpSolverName.CHUFFED ) model.init_model( add_calendar_constraint_unit=False, fake_tasks=True, one_ressource_per_task=False, exact_skills_need=False, output_type=True, ) # Constraint to stick to the solution of the RCPSP problem strings = stick_to_solution(solution_rcpsp, model) for s in strings: model.instance.add_string(s) else: model = CpPreemptiveMultiskillRcpspSolver( problem=multiskill_rcpsp_problem, cp_solver_name=CpSolverName.CHUFFED ) model.init_model( add_calendar_constraint_unit=False, fake_tasks=True, exact_skills_need=False, one_ressource_per_task=False, output_type=True, nb_preemptive=10, unit_usage_preemptive=True, max_preempted=100, ) # Constraint to stick to the solution of the RCPSP problem strings = stick_to_solution_preemptive(solution_rcpsp, model) for s in strings: model.instance.add_string(s) # Solve the CP model result_store = model.solve(time_limit=time_limit) return result_store