# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""DIDPpy-based solver for the Oven Scheduling Problem."""
import re
from typing import Any
import didppy as dp
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver
from discrete_optimization.ovensched.problem import (
OvenSchedulingProblem,
OvenSchedulingSolution,
ScheduleInfo,
)
[docs]
class DpOvenSchedulingSolver(DpSolver):
"""
Dynamic Programming solver for the Oven Scheduling Problem using DIDPpy.
This solver properly models batching by:
- Tracking current open batch on each machine
- Managing batch duration (fixed or dynamic based on allow_batch_duration_update)
- Checking duration compatibility when adding subsequent jobs to a batch
- Two transition types: add to batch vs. close batch and start new one
- Respecting machine availability windows
- Using weighted multi-objective cost function (tardiness + processing + setup)
- Optional dual bounds to guide search (use_dual_bounds=True)
- Optional transition dominance rules (use_dominance=True)
Batch duration models:
- Fixed (allow_batch_duration_update=False, default): Duration is set to first job's
min_duration and never changes. Subsequent jobs must fit within this duration.
- Dynamic (allow_batch_duration_update=True): Duration can increase when adding jobs
with larger min_duration, up to the minimum max_duration of all jobs in batch.
More flexible but may slightly misevaluate tardiness.
The objective function correctly includes:
- Tardiness costs (paid when each job is added to a batch)
- Setup costs (paid when closing a batch and starting a new one)
- Processing costs (paid when closing a batch + at base case for final batches)
Transition dominance rules (3 rules when use_dominance=True):
1. Add to batch > Close and start new (when job fits: avoids setup costs)
2. Non-late job > Late job (when same attribute: prioritizes on-time delivery)
3. Tighter deadline > Looser deadline (when both on time: reduces future conflicts)
"""
problem: OvenSchedulingProblem
def _get_objective_weights(self) -> tuple[int, int, int]:
"""Extract objective weights from params_objective_function."""
weight_tardiness = 1
weight_processing = 1
weight_setup = 1
if self.params_objective_function is not None:
for i, obj_name in enumerate(self.params_objective_function.objectives):
if obj_name == "nb_late_jobs":
weight_tardiness = self.params_objective_function.weights[i]
elif obj_name == "processing_time":
weight_processing = self.params_objective_function.weights[i]
elif obj_name == "setup_cost":
weight_setup = self.params_objective_function.weights[i]
return int(weight_tardiness), int(weight_processing), int(weight_setup)
[docs]
def init_model(self, **kwargs: Any) -> None:
"""Initialize the DIDPpy model with proper batching support.
Args:
use_dual_bounds: If True, add dual bounds to guide search (default: False)
use_dominance: If True, add transition dominance rules (default: True)
allow_batch_duration_update: If True, allow batch duration to increase when adding
jobs with larger min_duration (default: False). This provides more flexibility
but may slightly misevaluate tardiness.
"""
use_dual_bounds = kwargs.get("use_dual_bounds", False)
use_dominance = kwargs.get("use_dominance", True)
allow_batch_duration_update = kwargs.get("allow_batch_duration_update", False)
# Store for use in retrieve_solution
self._allow_batch_duration_update = allow_batch_duration_update
model = dp.Model()
n_jobs = self.problem.n_jobs
n_machines = self.problem.n_machines
# Get objective weights
weight_tardiness, weight_processing, weight_setup = (
self._get_objective_weights()
)
# Object types
job = model.add_object_type(number=n_jobs)
n_attributes = len(self.problem.setup_costs)
attribute = model.add_object_type(number=n_attributes)
# State variables
unscheduled = model.add_set_var(object_type=job, target=set(range(n_jobs)))
# For each machine, track:
current_time = [model.add_int_var(target=0) for _ in range(n_machines)]
# Current open batch
current_batch = [
model.add_set_var(object_type=job, target=set()) for _ in range(n_machines)
]
# Batch start time (fixed when batch is created with first task)
batch_start_time = [model.add_int_var(target=0) for _ in range(n_machines)]
# Batch duration (can be updated if allow_batch_duration_update=True)
batch_duration = [model.add_int_var(target=0) for _ in range(n_machines)]
# Batch max duration potential: minimum of max_durations of all jobs in batch
# This constrains how much we can increase the batch duration
# Initialized to large value (10000) when batch is empty
batch_max_duration_potential = [
model.add_int_var(target=10000) for _ in range(n_machines)
]
# Batch cumulative size
batch_size = [model.add_int_var(target=0) for _ in range(n_machines)]
# Batch attribute (when batch is empty, use last closed batch attribute)
batch_attribute = [
model.add_element_var(
object_type=attribute,
target=self.problem.machines_data[m].initial_attribute,
)
for m in range(n_machines)
]
# Store state variables as instance attributes for debugging/warmstart validation
self.unscheduled = unscheduled
self.current_batch = current_batch
self.batch_start_time = batch_start_time
self.batch_duration = batch_duration
self.batch_max_duration_potential = batch_max_duration_potential
self.batch_size = batch_size
self.batch_attribute = batch_attribute
self.current_time = current_time
# Lookup tables
job_attributes = [self.problem.tasks_data[j].attribute for j in range(n_jobs)]
job_min_durations = [
self.problem.tasks_data[j].min_duration for j in range(n_jobs)
]
job_max_durations = [
self.problem.tasks_data[j].max_duration for j in range(n_jobs)
]
job_earliest_starts = [
self.problem.tasks_data[j].earliest_start for j in range(n_jobs)
]
job_latest_ends = [self.problem.tasks_data[j].latest_end for j in range(n_jobs)]
job_sizes = [self.problem.tasks_data[j].size for j in range(n_jobs)]
min_duration_table = model.add_int_table(job_min_durations)
max_duration_table = model.add_int_table(job_max_durations)
earliest_start_table = model.add_int_table(job_earliest_starts)
latest_end_table = model.add_int_table(job_latest_ends)
size_table = model.add_int_table(job_sizes)
machine_capacities = [
self.problem.machines_data[m].capacity for m in range(n_machines)
]
capacity_table = model.add_int_table(machine_capacities)
setup_cost_table = model.add_int_table(self.problem.setup_costs)
setup_time_table = model.add_int_table(self.problem.setup_times)
self.transitions = {}
self.transition_ids = {} # Store transition IDs for dominance rules
# Sort jobs to bias transition order:
# 1. Earlier earliest_start first (jobs that need to start soon)
# 2. Larger min_duration first (to put them at the beginning of batches)
# jobs_sorted = sorted(
# range(n_jobs),
# key=lambda j: (job_earliest_starts[j], -job_min_durations[j])
# )
jobs_sorted = range(n_jobs)
# For each job and machine, create two types of transitions
# Process jobs in sorted order to bias search toward better decisions
for j in jobs_sorted:
eligible_machines = self.problem.tasks_data[j].eligible_machines
job_attr = job_attributes[j]
job_min_dur = job_min_durations[j]
job_max_dur = job_max_durations[j]
job_size = job_sizes[j]
for m in eligible_machines:
machine_capacity = machine_capacities[m]
# TRANSITION TYPE 1: Add job j to current open batch on machine m (same attribute)
# We create this transition only for jobs that could be added to a batch with matching attribute
# Since batch_attribute is a state variable, we create transitions for each possible attribute value
if allow_batch_duration_update:
# Dynamic batch duration model:
# - Batch duration can increase to accommodate job's min_duration
# - End time uses the potentially updated duration
new_batch_duration = dp.max(batch_duration[m], job_min_dur)
new_max_potential = dp.min(
batch_max_duration_potential[m], job_max_dur
)
end_time = batch_start_time[m] + new_batch_duration
# Compute tardiness cost
is_late = (end_time - latest_end_table[j] > 0).if_then_else(1, 0)
tardiness_cost = is_late * weight_tardiness
add_to_batch = dp.Transition(
name=f"add_j{j}_m{m}_attr{job_attr}",
cost=tardiness_cost + dp.IntExpr.state_cost(),
effects=[
(unscheduled, unscheduled.remove(j)),
(current_batch[m], current_batch[m].add(j)),
(batch_size[m], batch_size[m] + job_size),
# Update batch duration to max of current and new job's min_duration
(batch_duration[m], new_batch_duration),
# Update max duration potential to min of current and new job's max_duration
(batch_max_duration_potential[m], new_max_potential),
],
preconditions=[
unscheduled.contains(j),
~current_batch[m].is_empty(), # Batch must not be empty
batch_attribute[m]
== job_attr, # Check batch has this attribute
job_min_dur <= batch_max_duration_potential[m],
job_max_dur >= batch_duration[m],
# Duration compatibility: new batch duration must not exceed the new potential max
# This ensures: max(current_duration, job_min_dur) <= min(current_potential, job_max_dur)
# new_batch_duration <= new_max_potential,
# Time compatibility: job can start at the fixed batch start time
earliest_start_table[j] <= batch_start_time[m],
# Capacity: current batch size + new job size must fit
batch_size[m] + job_size <= machine_capacity,
],
)
else:
# Fixed batch duration model (original behavior):
# - Batch duration is fixed when batch is created
# - Jobs must fit within this fixed duration
end_time = batch_start_time[m] + batch_duration[m]
# Compute tardiness cost
is_late = (end_time - latest_end_table[j] > 0).if_then_else(1, 0)
tardiness_cost = is_late * weight_tardiness
add_to_batch = dp.Transition(
name=f"add_j{j}_m{m}_attr{job_attr}",
cost=tardiness_cost + dp.IntExpr.state_cost(),
effects=[
(unscheduled, unscheduled.remove(j)),
(current_batch[m], current_batch[m].add(j)),
(batch_size[m], batch_size[m] + job_size),
],
preconditions=[
unscheduled.contains(j),
~current_batch[m].is_empty(), # Batch must not be empty
batch_attribute[m]
== job_attr, # Check batch has this attribute
# Duration compatibility: job's duration range must contain the fixed batch duration
# i.e., job.min_duration <= batch_duration <= job.max_duration
job_min_dur <= batch_duration[m],
batch_duration[m] <= job_max_dur,
# Time compatibility: job can start at the fixed batch start time
earliest_start_table[j] <= batch_start_time[m],
# Capacity: current batch size + new job size must fit
batch_size[m] + job_size <= machine_capacity,
],
)
transition_id = model.add_transition(add_to_batch)
self.transitions[("add", j, m, job_attr)] = add_to_batch
self.transition_ids[("add", j, m, job_attr)] = transition_id
# TRANSITION TYPE 2: Close current batch and start new batch with job j
# This pays the setup cost and updates time
# For each availability slot
machine_avail = self.problem.machines_data[m].availability
for slot_idx, (avail_start, avail_end) in enumerate(machine_avail):
# Check static constraints before creating transition
if avail_end - avail_start < job_min_dur:
continue
if job_size > machine_capacity:
continue
setup_cost_expr = setup_cost_table[batch_attribute[m], job_attr]
setup_time_expr = setup_time_table[batch_attribute[m], job_attr]
# When closing a batch, we need to:
# 1. Pay processing cost for the closed batch (duration = batch_duration)
# 2. Pay setup cost for switching attributes
# 3. Start new batch with job j
# 4. Fix new batch duration = j.min_duration
# 5. Fix new batch start time in this availability window
# Batch duration when closing current batch
closed_batch_duration = batch_duration[m]
# Earliest we can start new batch after closing current one
# = batch_start_time + closed_batch_duration + setup_time
time_after_close = (
batch_start_time[m] + closed_batch_duration + setup_time_expr
)
# Earliest possible start for new batch in this availability window
earliest_possible = dp.max(
dp.max(time_after_close, earliest_start_table[j]), avail_start
)
# New batch duration = min_duration of first job (j)
new_batch_duration = job_min_dur
end_time = earliest_possible + new_batch_duration
# Compute tardiness cost for job j (first job of new batch)
is_late = (end_time - latest_end_table[j] > 0).if_then_else(1, 0)
tardiness_cost = is_late * weight_tardiness
# Cost = processing time of closed batch + setup cost + tardiness of new job
transition_cost = (
weight_processing * closed_batch_duration
+ weight_setup * setup_cost_expr
+ tardiness_cost
+ dp.IntExpr.state_cost()
)
close_and_start = dp.Transition(
name=f"close_start_j{j}_m{m}_slot{slot_idx}",
cost=transition_cost,
effects=[
(unscheduled, unscheduled.remove(j)),
(
current_batch[m],
model.create_set_const(object_type=job, value={j}),
),
(batch_duration[m], new_batch_duration),
(batch_start_time[m], earliest_possible),
(batch_size[m], job_size),
(batch_attribute[m], job_attr),
(current_time[m], earliest_possible), # Track current time
# Initialize max duration potential for new batch
(batch_max_duration_potential[m], job_max_dur),
],
preconditions=[
unscheduled.contains(j),
# Must fit in availability window
end_time <= avail_end,
earliest_possible >= avail_start,
],
)
transition_id = model.add_transition(close_and_start)
self.transitions[("close_start", j, m, slot_idx)] = close_and_start
self.transition_ids[("close_start", j, m, slot_idx)] = transition_id
# Transition dominance rules
if use_dominance:
# Rule 1: Adding to current batch dominates closing and starting new batch
# If we can fit job j in the current batch on machine m, prefer that over
# closing the batch and starting a new one (avoids setup costs)
dominance_count = 0
for j in jobs_sorted: # Use sorted jobs list
job_attr = job_attributes[j]
job_min_dur = job_min_durations[j]
job_max_dur = job_max_durations[j]
job_size = job_sizes[j]
eligible_machines = self.problem.tasks_data[j].eligible_machines
for m in eligible_machines:
machine_capacity = machine_capacities[m]
# Check if add_to_batch transition exists for this job/machine/attribute
add_key = ("add", j, m, job_attr)
if add_key not in self.transition_ids:
continue
# For all close_start transitions of the same job on the same machine
for slot_idx in range(
len(self.problem.machines_data[m].availability)
):
close_key = ("close_start", j, m, slot_idx)
if close_key not in self.transition_ids:
continue
# Conditions when adding to batch is better than closing:
# 1. Current batch is not empty
# 2. Batch has the same attribute as job j
# 3. Job fits in batch (duration compatibility)
# 4. Job fits in capacity
conditions = [
~current_batch[m].is_empty(),
batch_attribute[m] == job_attr,
job_min_dur <= batch_duration[m],
batch_duration[m] <= job_max_dur,
earliest_start_table[j] <= batch_start_time[m],
batch_size[m] + job_size <= machine_capacity,
]
model.add_transition_dominance(
self.transition_ids[add_key], # Dominant: add to batch
self.transition_ids[
close_key
], # Dominated: close and start
conditions,
)
dominance_count += 1
# Rule 2: Non-late job dominates late job for similar tasks
# If two jobs have the same attribute and can both fit in the current batch,
# prefer the one that won't be late
dominance_count_2 = 0
for j1 in jobs_sorted:
for j2 in jobs_sorted:
if j1 == j2:
continue
# Only consider jobs with same attribute
if job_attributes[j1] != job_attributes[j2]:
continue
attr = job_attributes[j1]
# Check if both have overlapping duration ranges
# (can potentially fit in the same batch)
min_dur_overlap = max(job_min_durations[j1], job_min_durations[j2])
max_dur_overlap = min(job_max_durations[j1], job_max_durations[j2])
if min_dur_overlap > max_dur_overlap:
continue # No overlap in duration ranges
# Find common eligible machines
common_machines = set(
self.problem.tasks_data[j1].eligible_machines
) & set(self.problem.tasks_data[j2].eligible_machines)
for m in common_machines:
# Check if both add transitions exist
add_key_j1 = ("add", j1, m, attr)
add_key_j2 = ("add", j2, m, attr)
if (
add_key_j1 not in self.transition_ids
or add_key_j2 not in self.transition_ids
):
continue
# Condition: j1 would not be late, but j2 would be late
# End time of batch: batch_start_time + batch_duration
end_time = batch_start_time[m] + batch_duration[m]
j1_not_late = end_time <= latest_end_table[j1]
j2_is_late = end_time > latest_end_table[j2]
# j1 dominates j2 if j1 is not late and j2 is late
model.add_transition_dominance(
self.transition_ids[add_key_j1], # Dominant: non-late job
self.transition_ids[add_key_j2], # Dominated: late job
[j1_not_late, j2_is_late],
)
dominance_count_2 += 1
# Rule 3: Tighter deadline dominates when both would be on time
# If we can schedule either j1 or j2, and both would be on time,
# prefer the one with tighter deadline to avoid future conflicts
dominance_count_3 = 0
for j1 in jobs_sorted:
for j2 in jobs_sorted:
if j1 == j2:
continue
if job_attributes[j1] != job_attributes[j2]:
continue
# j1 has tighter deadline (earlier latest_end)
if job_latest_ends[j1] >= job_latest_ends[j2]:
continue
# j2's deadline is loose enough that the difference is meaningful (at least 10 time units)
if job_latest_ends[j2] - job_latest_ends[j1] < 10:
continue
# Similar duration requirements
min_dur_overlap = max(job_min_durations[j1], job_min_durations[j2])
max_dur_overlap = min(job_max_durations[j1], job_max_durations[j2])
if min_dur_overlap > max_dur_overlap:
continue
attr = job_attributes[j1]
common_machines = set(
self.problem.tasks_data[j1].eligible_machines
) & set(self.problem.tasks_data[j2].eligible_machines)
for m in common_machines:
add_key_j1 = ("add", j1, m, attr)
add_key_j2 = ("add", j2, m, attr)
if (
add_key_j1 not in self.transition_ids
or add_key_j2 not in self.transition_ids
):
continue
# CRITICAL: Only dominate when BOTH would be on time
# This ensures we're not forcing a late job just because it has tight deadline
end_time = batch_start_time[m] + batch_duration[m]
both_on_time = (end_time <= latest_end_table[j1]) & (
end_time <= latest_end_table[j2]
)
model.add_transition_dominance(
self.transition_ids[
add_key_j1
], # Dominant: tighter deadline
self.transition_ids[
add_key_j2
], # Dominated: looser deadline
[both_on_time],
)
dominance_count_3 += 1
# Dual bounds (lower bound on cost-to-go)
if use_dual_bounds:
# A dual bound is a lower bound on the remaining cost
# We use the number of unscheduled jobs to estimate minimum remaining work
# Count remaining jobs
remaining_jobs_count = unscheduled.len()
# Lower bound on processing time: use global minimum across ALL jobs
# (can't use table.min(unscheduled) as it may panic in certain states)
remaining_size = batch_size[m]
global_min_duration = (remaining_jobs_count == 0).if_then_else(
0, min_duration_table.min(unscheduled)
)
# Lower bound: remaining_jobs * min_duration * weight
# This assumes we can schedule all jobs optimally (which we can't always do)
processing_lower_bound = (
remaining_jobs_count * global_min_duration * weight_processing
)
# For tardiness: very hard to estimate without knowing which jobs remain
# Use 0 as conservative estimate (assumes we might schedule everything on time)
tardiness_lower_bound = 0
# Setup costs: also hard to predict, use 0
setup_lower_bound = 0
dual_bound = (
processing_lower_bound + tardiness_lower_bound + setup_lower_bound
)
model.add_dual_bound(dual_bound)
# Base case: all jobs scheduled
# When we reach the base case, we need to pay processing cost for all remaining open batches
final_processing_cost = sum(
weight_processing * batch_duration[m] for m in range(n_machines)
)
model.add_base_case([unscheduled.is_empty()], cost=final_processing_cost)
self.model = model
[docs]
def retrieve_solution(self, sol: dp.Solution) -> Solution:
"""Retrieve solution from DIDPpy by replaying transitions."""
def extract_info(name: str) -> tuple:
"""Extract transition info from name."""
if name.startswith("add_"):
# add_j{j}_m{m}_attr{attr}
numbers = [int(x) for x in re.findall(r"\d+", name)]
if len(numbers) >= 3:
return ("add", numbers[0], numbers[1], numbers[2]) # j, m, attr
elif name.startswith("close_start_"):
# close_start_j{j}_m{m}_slot{slot}
numbers = [int(x) for x in re.findall(r"\d+", name)]
if len(numbers) >= 3:
return (
"close_start",
numbers[0],
numbers[1],
numbers[2],
) # j, m, slot
return None
# Simulate the transitions to reconstruct batches
# Track current state
current_batches = {
m: [] for m in range(self.problem.n_machines)
} # Current open batch jobs
closed_batches = {
m: [] for m in range(self.problem.n_machines)
} # List of closed batches
batch_start_times = [0] * self.problem.n_machines # Fixed when batch is created
batch_durations = [0] * self.problem.n_machines # Fixed when batch is created
batch_attributes = [
self.problem.machines_data[m].initial_attribute
for m in range(self.problem.n_machines)
]
for transition in sol.transitions:
info = extract_info(transition.name)
if not info:
continue
if info[0] == "add":
_, j, m, _ = info
# Add job to current open batch
current_batches[m].append(j)
# For dynamic duration model, update the batch duration
# It becomes the maximum of min_durations of all jobs in the batch
if (
hasattr(self, "_allow_batch_duration_update")
and self._allow_batch_duration_update
):
job_min_dur = self.problem.tasks_data[j].min_duration
batch_durations[m] = max(batch_durations[m], job_min_dur)
elif info[0] == "close_start":
_, j, m, slot_idx = info
# Close current batch if not empty
if current_batches[m]:
jobs_in_batch = current_batches[m]
attr = batch_attributes[m]
# For dynamic model, recompute final duration based on all jobs
if (
hasattr(self, "_allow_batch_duration_update")
and self._allow_batch_duration_update
):
# Final duration = max of min_durations of all jobs in batch
duration = max(
self.problem.tasks_data[j_id].min_duration
for j_id in jobs_in_batch
)
# Verify it doesn't exceed min of max_durations (should already be satisfied by model)
max_duration_limit = min(
self.problem.tasks_data[j_id].max_duration
for j_id in jobs_in_batch
)
if duration > max_duration_limit:
print(
f"Warning: batch duration {duration} exceeds max limit {max_duration_limit}"
)
duration = max_duration_limit
else:
duration = batch_durations[m]
start_time = batch_start_times[m]
end_time = start_time + duration
closed_batches[m].append(
{
"jobs": set(jobs_in_batch),
"attribute": attr,
"start": start_time,
"end": end_time,
}
)
# Start new batch with job j
job_data = self.problem.tasks_data[j]
# Calculate setup time
setup_time = self.problem.setup_times[batch_attributes[m]][
job_data.attribute
]
# Get availability window
avail_windows = self.problem.machines_data[m].availability
avail_start, avail_end = avail_windows[slot_idx]
# Calculate start time for new batch (this is fixed for this batch)
prev_end = (
batch_start_times[m] + batch_durations[m]
if current_batches[m]
else 0
)
start = max(prev_end + setup_time, job_data.earliest_start, avail_start)
# Fix new batch parameters
current_batches[m] = [j]
batch_attributes[m] = job_data.attribute
batch_start_times[m] = start
batch_durations[m] = (
job_data.min_duration
) # Fixed to first job's min_duration
# Close any remaining open batches
for m in range(self.problem.n_machines):
if current_batches[m]:
jobs_in_batch = current_batches[m]
attr = batch_attributes[m]
# For dynamic model, recompute final duration based on all jobs
if (
hasattr(self, "_allow_batch_duration_update")
and self._allow_batch_duration_update
):
duration = max(
self.problem.tasks_data[j_id].min_duration
for j_id in jobs_in_batch
)
max_duration_limit = min(
self.problem.tasks_data[j_id].max_duration
for j_id in jobs_in_batch
)
if duration > max_duration_limit:
print(
f"Warning: final batch duration {duration} exceeds max limit {max_duration_limit}"
)
duration = max_duration_limit
else:
duration = batch_durations[m]
start_time = batch_start_times[m]
end_time = start_time + duration
closed_batches[m].append(
{
"jobs": set(jobs_in_batch),
"attribute": attr,
"start": start_time,
"end": end_time,
}
)
# Convert to OvenSchedulingSolution format
schedule_per_machine = {}
for m in range(self.problem.n_machines):
batches = []
for batch_info in closed_batches[m]:
batches.append(
ScheduleInfo(
tasks=batch_info["jobs"],
task_attribute=batch_info["attribute"],
start_time=batch_info["start"],
end_time=batch_info["end"],
machine_batch_index=(m, len(batches)),
)
)
schedule_per_machine[m] = batches
sol = OvenSchedulingSolution(
problem=self.problem, schedule_per_machine=schedule_per_machine
)
print(
f"\nRelative performance: {self.aggreg_from_sol(sol) / self.problem.additional_data['ub']:.4f}"
)
return sol
[docs]
def set_warm_start(self, solution: OvenSchedulingSolution) -> None:
"""Convert an OvenSchedulingSolution into a sequence of transitions for warm start.
Args:
solution: An existing solution to use as warm start
The conversion depends on the batch duration model:
- Fixed duration model: Jobs in each batch are sorted by min_duration (descending)
to ensure the first job sets an appropriate batch duration
- Dynamic duration model: Jobs can be added in any feasible order
"""
if not hasattr(self, "model") or self.model is None:
raise ValueError("Model must be initialized before setting warm start")
transitions = []
# Process each machine's schedule
for machine_id in range(self.problem.n_machines):
if machine_id not in solution.schedule_per_machine:
continue
batches = solution.schedule_per_machine[machine_id]
for batch_idx, batch_info in enumerate(batches):
jobs_in_batch = list(batch_info.tasks)
if not jobs_in_batch:
continue
# Sort jobs for optimal transition sequence
# CRITICAL: The first job sets batch_start_time in the DP model.
# All subsequent jobs must satisfy: earliest_start <= batch_start_time
# Therefore, we must start with the job that has the LATEST earliest_start
# to ensure all jobs in the batch can satisfy this constraint.
# Secondary sort by min_duration (descending) for fixed duration model.
jobs_sorted = sorted(
jobs_in_batch,
key=lambda j: (
-self.problem.tasks_data[
j
].earliest_start, # Latest earliest_start first
-self.problem.tasks_data[
j
].min_duration, # Then largest min_duration
),
)
# First job: close previous batch and start new one
first_job = jobs_sorted[0]
job_attr = self.problem.tasks_data[first_job].attribute
# Find appropriate availability slot for this batch
slot_idx = self._find_slot_for_batch(machine_id, batch_info.start_time)
# Create close_and_start transition
transition_key = ("close_start", first_job, machine_id, slot_idx)
if transition_key in self.transitions:
transitions.append(self.transitions[transition_key])
# Remaining jobs: add to current batch
for job_id in jobs_sorted[1:]:
job_attr = self.problem.tasks_data[job_id].attribute
# Create add_to_batch transition
transition_key = ("add", job_id, machine_id, job_attr)
if transition_key in self.transitions:
transitions.append(self.transitions[transition_key])
# Set the initial solution in DIDPpy
if transitions:
self.initial_solution = transitions
print(f"Warm start: converted solution to {len(transitions)} transitions")
else:
print(
"Warning: could not generate any transitions from warm start solution"
)
def _find_slot_for_batch(self, machine_id: int, start_time: int) -> int:
"""Find the availability slot index that contains the given start time."""
availability = self.problem.machines_data[machine_id].availability
for slot_idx, (slot_start, slot_end) in enumerate(availability):
if slot_start <= start_time < slot_end:
return slot_idx
# Fallback: return first slot
return 0