Source code for discrete_optimization.workforce.scheduling.transformations.to_multiskill

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

"""Transformation from Workforce Scheduling to Multiskill RCPSP.

This provides an alternative to the standard RCPSP transformation, mapping
teams directly to employees (workers) in multiskill RCPSP.
"""

from typing import Optional

import numpy as np

from discrete_optimization.generic_tools.transformation.problem_transformation import (
    ProblemTransformation,
)
from discrete_optimization.generic_tools.transformation.transformation_metadata import (
    InformationLoss,
    LossImpact,
    LossType,
    TransformationMetadata,
    lossy_transformation,
)
from discrete_optimization.rcpsp_multiskill.problem import (
    Employee,
    MultiskillRcpspProblem,
    MultiskillRcpspSolution,
    SkillDetail,
    SpecialConstraintsDescription,
)
from discrete_optimization.workforce.scheduling.problem import (
    AllocSchedulingProblem,
    AllocSchedulingSolution,
)


[docs] class WorkforceSchedulingToMultiskillTransformation( ProblemTransformation[ AllocSchedulingProblem, AllocSchedulingSolution, MultiskillRcpspProblem, MultiskillRcpspSolution, ] ): """Transform Workforce Scheduling to Multiskill RCPSP. Mapping: - Tasks → Tasks - Teams → Employees/Workers (dict indexed by team name) - Team availability → Employee calendars - Task duration → Task duration - Precedence → Task successors - Available teams for activity → Skills (ONE skill per unique eligibility pattern) Skill Mapping Strategy: Each unique set of eligible teams gets ONE skill. All teams in that set possess that skill. When a task requires that skill, ANY employee (team) with the skill can perform it. This correctly models "task needs one team from eligible set". Example: - Task A can be done by Team1 OR Team2 → Skill_A - Task B can be done by Team1 OR Team2 → Skill_A (same eligibility) - Task C can be done by Team3 → Skill_C - Team1 and Team2 have Skill_A; Team3 has Skill_C This transformation is LOSSY: - Same_allocation constraints are approximated - Cumulative resource consumption may be lost """
[docs] def get_forward_metadata(self) -> TransformationMetadata: """Metadata for forward problem transformation (WorkforceScheduling → MultiskillRCPSP). This direction is LOSSY but provides access to multiskill RCPSP solvers. """ losses = [ InformationLoss( name="same_allocation_constraints", loss_type=LossType.CONSTRAINT, description="Tasks that must be assigned to the same team", reason="Multiskill RCPSP has no built-in same-employee constraint", impact=LossImpact.MODERATE, workaround="Can be approximated with additional constraints or post-processing", ), InformationLoss( name="cumulative_resources", loss_type=LossType.CONSTRAINT, description="Cumulative resource consumption beyond team assignment", reason="Direct mapping focuses on employee (team) assignment", impact=LossImpact.MINOR, workaround="Use standard RCPSP transformation for full resource modeling", ), ] return lossy_transformation( losses=losses, assumptions=[ "Teams map directly to employees/workers", "One skill per task (based on team eligibility)", "Precedence constraints preserved", "Time windows preserved", ], use_cases=[ "Workforce problems with team-based allocation", "Access to multiskill RCPSP solvers", "Employee scheduling with skill requirements", ], warnings=[ "Same_allocation constraints not enforced", "Verify solution feasibility in original problem", ], )
[docs] def transform_problem( self, source_problem: AllocSchedulingProblem ) -> MultiskillRcpspProblem: """Transform Workforce Scheduling to Multiskill RCPSP. Args: source_problem: AllocSchedulingProblem instance Returns: Equivalent MultiskillRcpspProblem """ # Create employees from teams employees = {} for i, team in enumerate(source_problem.team_names): # Get calendar for this team employee = Employee( dict_skill={}, # Will add skills based on tasks calendar_employee=source_problem.get_resource_calendar( team, source_problem.horizon ), ) employees[team] = employee # Use team name as employee ID # Create tasks with skill requirements # Strategy: Create ONE skill per unique set of eligible teams # All employees in that set have that skill # Task requires only that ONE skill (meaning any employee with it can do the task) tasks = {} mode_details = {} skills_set = set() resources = source_problem.resources_list # Map from frozenset of team indices to skill name eligibility_to_skill = {} for task_idx, task in enumerate(source_problem.tasks_list): task_desc = source_problem.tasks_data[task] duration = task_desc.duration_task # Get eligible teams for this task eligible_teams = source_problem.available_team_for_activity.get(task, set()) # If no eligible teams, make all teams eligible if not eligible_teams: eligible_teams = set(source_problem.team_names) # Create a skill for this eligibility pattern (use team names, not indices) eligibility_key = frozenset(eligible_teams) if eligibility_key not in eligibility_to_skill: # Create new skill for this eligibility pattern skill_name = f"skill_{len(eligibility_to_skill)}" eligibility_to_skill[eligibility_key] = skill_name skills_set.add(skill_name) # Add this skill to all employees (teams) in the eligibility set for team in eligibility_key: if ( team in employees and skill_name not in employees[team].dict_skill ): employees[team].dict_skill[skill_name] = SkillDetail( skill_value=1, efficiency_ratio=1.0, experience=0.0 ) # Get the skill for this task skill_name = eligibility_to_skill[eligibility_key] # Task requires only this ONE skill # Any employee with this skill can perform the task required_skills = {skill_name: 1} # Get successors from precedence constraints successors = source_problem.precedence_constraints.get(task, set()) tasks[task] = { "duration": duration, "skills": required_skills, "successors": list(successors), } # Create mode (single mode per task) mode_details[task] = {1: {"duration": duration}} mode_details[task][1].update(required_skills) for r in task_desc.resource_consumption: if task_desc.resource_consumption[r] > 0: mode_details[task][1][r] = task_desc.resource_consumption[r] # Build skills dictionary skills = {skill: None for skill in skills_set} # Create multiskill RCPSP problem normal_tasks = list(mode_details.keys()) source_task = "source_ms" sink_task = "sink_ms" mode_details[source_task] = {1: {"duration": 0}} mode_details[sink_task] = {1: {"duration": 0}} tasks[source_task] = {"successors": normal_tasks} tasks[sink_task] = {"successors": []} for t in normal_tasks: tasks[t]["successors"].append(sink_task) start_times_window = {} end_times_window = {} for t in source_problem.start_window: start_times_window[t] = source_problem.start_window[t] end_times_window[t] = source_problem.end_window[t] special_constraints = SpecialConstraintsDescription( start_times_window=start_times_window, end_times_window=end_times_window ) return MultiskillRcpspProblem( skills_set=set(skills.keys()), resources_set=set( source_problem.resources_list ), # No non-renewable resources non_renewable_resources=set(), tasks_list=[source_task] + normal_tasks + [sink_task], resources_availability={ r: source_problem.get_resource_calendar(r, source_problem.horizon) for r in source_problem.resources_list }, special_constraints=special_constraints, employees=employees, mode_details=mode_details, successors={task: tasks[task]["successors"] for task in tasks}, horizon=source_problem.horizon, sink_task=sink_task, source_task=source_task, )
[docs] def back_transform_solution( self, solution: MultiskillRcpspSolution, source_problem: AllocSchedulingProblem, ) -> AllocSchedulingSolution: """Transform Multiskill RCPSP solution back to Workforce Scheduling. Args: solution: MultiskillRcpspSolution source_problem: Original AllocSchedulingProblem Returns: Equivalent AllocSchedulingSolution """ n_tasks = len(source_problem.tasks_list) # Create schedule array schedule = np.zeros((n_tasks, 2), dtype=int) # Create allocation array allocation = np.zeros(n_tasks, dtype=int) for task_idx, task in enumerate(source_problem.tasks_list): # Get start time start = solution.schedule.get(task, {}).get("start_time", 0) end = solution.schedule.get(task, {}).get("end_time", 0) schedule[task_idx, 0] = start schedule[task_idx, 1] = end # Get employee assignment (employee ID is team name) employee_id_keys = solution.employee_usage.get(task, {}) if len(employee_id_keys) > 0: emp_id = list(employee_id_keys)[0] allocation[task_idx] = source_problem.teams_to_index[emp_id] else: allocation[task_idx] = 0 # Default to first team return AllocSchedulingSolution( problem=source_problem, schedule=schedule, allocation=allocation, )
[docs] def forward_transform_solution( self, solution: AllocSchedulingSolution, target_problem: MultiskillRcpspProblem, ) -> Optional[MultiskillRcpspSolution]: """Transform Workforce Scheduling solution to Multiskill RCPSP (for warmstart). Args: solution: AllocSchedulingSolution target_problem: Target MultiskillRcpspProblem Returns: Equivalent MultiskillRcpspSolution """ schedule = {} employee_usage = {} for task_idx, task in enumerate(solution.problem.tasks_list): start = int(solution.schedule[task_idx, 0]) end = int(solution.schedule[task_idx, 1]) team_idx = int(solution.allocation[task_idx]) # Convert team index to team name team_name = solution.problem.index_to_team.get( team_idx, solution.problem.team_names[0] ) schedule[task] = { "start_time": start, "end_time": end, } # Employee usage (mode 1) - employee ID is team name employee_usage[task] = {1: team_name} return MultiskillRcpspSolution( problem=target_problem, schedule=schedule, employee_usage=employee_usage, modes={task: 1 for task in schedule.keys()}, # All mode 1 )