Source code for discrete_optimization.shop.transformations.to_rcpsp_multimode
# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Transformation from CommonShopProblem (JSP/FJSP/OSP) to multimode RCPSP."""
from typing import Optional
from discrete_optimization.generic_tools.transformation.problem_transformation import (
ProblemTransformation,
)
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.solution import RcpspSolution
from discrete_optimization.shop.base import AnyShopSolution, CommonShopProblem
[docs]
class ShopToRcpspMultimodeTransformation(
ProblemTransformation[
CommonShopProblem, AnyShopSolution, RcpspProblem, RcpspSolution
]
):
"""Transform CommonShopProblem to multimode RCPSP.
This transformation works for JSP, FJSP, and OSP problems:
- JSP: Single mode per task (one recipe per subjob)
- FJSP: Multiple modes per task (multiple recipe options per subjob)
- OSP: Single mode per task, no precedence constraints
Mapping:
- (job_j, subjob_k) → task_{j}_{k}
- Recipe options → modes for each task
- Machines → renewable resources (capacity 1)
- Processing time on machine → duration for that mode
- Job precedence + no-overlap constraints → task successors and constraints
"""
def __init__(self):
"""Initialize transformation."""
self.task_to_tuple: dict[str, tuple[int, int]] = {}
self.tuple_to_task: dict[tuple[int, int], str] = {}
def _make_task_name(self, job: int, subjob: int) -> str:
"""Create task name from (job, subjob) indices."""
return f"task_{job}_{subjob}"
[docs]
def transform_problem(self, source_problem: CommonShopProblem) -> RcpspProblem:
"""Transform CommonShopProblem to multimode RCPSP.
Args:
source_problem: CommonShopProblem instance (JSP/FJSP/OSP)
Returns:
Equivalent multimode RCPSP problem
"""
# Build task name mappings
self.task_to_tuple = {}
self.tuple_to_task = {}
for job_idx, job in enumerate(source_problem.list_jobs):
for subjob_idx in range(source_problem.nb_subjob_per_job[job_idx]):
task_name = self._make_task_name(job_idx, subjob_idx)
self.task_to_tuple[task_name] = (job_idx, subjob_idx)
self.tuple_to_task[(job_idx, subjob_idx)] = task_name
# Create resources: one per machine (unary resources)
resources = {f"M{machine}": 1 for machine in range(source_problem.n_machines)}
# Get no-overlap sets (tasks within same job must not overlap)
no_overlap_sets = source_problem.get_no_overlap()
# Create a renewable resource for each no-overlap set
# Each resource has capacity 1, forcing tasks in the set to not overlap
no_overlap_resources = {}
for idx, task_set in enumerate(no_overlap_sets):
resource_name = f"NoOverlap_{idx}"
resources[resource_name] = 1
no_overlap_resources[resource_name] = task_set
# Build mode_details: task -> (mode -> resource requirements + duration)
mode_details = {}
# Add source and sink dummy tasks
source_task = "source"
sink_task = "sink"
mode_details[source_task] = {1: {"duration": 0}}
mode_details[sink_task] = {1: {"duration": 0}}
# Initialize resource requirements to 0 for all resources
for resource in resources:
mode_details[source_task][1][resource] = 0
mode_details[sink_task][1][resource] = 0
# Build mode_details for each task
for job_idx, job in enumerate(source_problem.list_jobs):
for subjob_idx, subjob in enumerate(job.subjobs):
task_name = self._make_task_name(job_idx, subjob_idx)
task_tuple = (job_idx, subjob_idx)
mode_details[task_name] = {}
# Each recipe option becomes a mode
for mode_idx, recipe in enumerate(subjob.recipes, start=1):
mode = {
"duration": recipe.processing_time,
}
# Initialize all machines to 0
for machine in range(source_problem.n_machines):
mode[f"M{machine}"] = 0
# Set the required machine to 1
mode[f"M{recipe.machine_index}"] = 1
# Add no-overlap resource consumption
# If this task belongs to a no-overlap set, it consumes the corresponding resource
for resource_name, task_set in no_overlap_resources.items():
if task_tuple in task_set:
mode[resource_name] = 1
else:
mode[resource_name] = 0
mode_details[task_name][mode_idx] = mode
# Build successors (precedence constraints)
successors = {source_task: [], sink_task: []}
# Get precedence constraints from the problem
precedence = source_problem.get_precedence_constraints()
# First, collect all first tasks of each job
first_tasks_of_jobs = set()
for job_idx, job in enumerate(source_problem.list_jobs):
if len(job.subjobs) > 0:
first_task = self._make_task_name(job_idx, 0)
first_tasks_of_jobs.add(first_task)
successors[source_task].append(first_task)
# Build precedence from the problem's get_precedence_constraints
for task, task_successors in precedence.items():
task_name = self.tuple_to_task[task]
if task_name not in successors:
successors[task_name] = []
for succ_task in task_successors:
succ_task_name = self.tuple_to_task[succ_task]
successors[task_name].append(succ_task_name)
# Handle tasks with no successors -> they lead to sink
for job_idx, job in enumerate(source_problem.list_jobs):
for subjob_idx in range(len(job.subjobs)):
task_name = self._make_task_name(job_idx, subjob_idx)
if task_name not in successors:
successors[task_name] = []
# Check if this is a last task (no successors in precedence)
task_tuple = (job_idx, subjob_idx)
has_successor = False
if task_tuple in precedence:
if len(precedence[task_tuple]) > 0:
has_successor = True
if not has_successor:
# This is a terminal task, it should lead to sink
successors[task_name].append(sink_task)
return RcpspProblem(
resources=resources,
non_renewable_resources=[],
mode_details=mode_details,
successors=successors,
horizon=source_problem.horizon,
source_task=source_task,
sink_task=sink_task,
)
[docs]
def back_transform_solution(
self, solution: RcpspSolution, source_problem: CommonShopProblem
) -> AnyShopSolution:
"""Transform RCPSP solution back to CommonShopProblem solution.
Args:
solution: RCPSP solution
source_problem: Original CommonShopProblem
Returns:
Equivalent CommonShopProblem solution
"""
# Build shop schedule from RCPSP schedule
shop_schedule = [[None] * len(job.subjobs) for job in source_problem.list_jobs]
machine_index = [[None] * len(job.subjobs) for job in source_problem.list_jobs]
for task_name, task_details in solution.rcpsp_schedule.items():
if task_name in ["source", "sink"]:
continue
job_idx, subjob_idx = self.task_to_tuple[task_name]
start = task_details["start_time"]
end = task_details["end_time"]
# Get mode (which recipe option was chosen)
task_idx_non_dummy = solution.problem.index_task_non_dummy[task_name]
mode = solution.rcpsp_modes[task_idx_non_dummy]
option = mode - 1 # RCPSP modes are 1-indexed, shop options are 0-indexed
# Get machine from the chosen option
machine_id = (
source_problem.list_jobs[job_idx]
.subjobs[subjob_idx]
.recipes[option]
.machine_index
)
shop_schedule[job_idx][subjob_idx] = (start, end)
machine_index[job_idx][subjob_idx] = machine_id
return AnyShopSolution(
problem=source_problem,
schedule=shop_schedule,
machine_index=machine_index,
)
[docs]
def forward_transform_solution(
self, solution: AnyShopSolution, target_problem: RcpspProblem
) -> Optional[RcpspSolution]:
"""Transform CommonShopProblem solution to RCPSP solution (for warm-start).
Args:
solution: CommonShopProblem solution
target_problem: Target RCPSP problem
Returns:
Equivalent RCPSP solution for warm-start
"""
# Build RCPSP schedule from shop schedule
rcpsp_schedule = {
"source": {"start_time": 0, "end_time": 0},
"sink": {
"start_time": solution.get_max_end_time(),
"end_time": solution.get_max_end_time(),
},
}
rcpsp_modes = [1, 1] # source and sink use mode 1
# Convert shop schedule to RCPSP schedule
for job_idx, job_schedule in enumerate(solution.schedule):
for subjob_idx, (start, end) in enumerate(job_schedule):
task_name = self._make_task_name(job_idx, subjob_idx)
option = solution.get_mode((job_idx, subjob_idx))
rcpsp_schedule[task_name] = {
"start_time": start,
"end_time": end,
}
# Mode in RCPSP is 1-indexed (option + 1)
mode = option + 1
rcpsp_modes.append(mode)
return RcpspSolution(
problem=target_problem,
rcpsp_schedule=rcpsp_schedule,
rcpsp_modes=rcpsp_modes,
)