# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Vector-based solution encoding for the Oven Scheduling Problem.
This module provides a permutation-based encoding suitable for metaheuristic
algorithms like simulated annealing, hill climbing, and genetic algorithms.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
import numpy.typing as npt
from discrete_optimization.generic_tools.do_problem import Solution
if TYPE_CHECKING:
from discrete_optimization.ovensched.problem import (
Machine,
OvenSchedulingProblem,
ScheduleInfo,
)
# Module-level logger; the permutation decoder uses it to warn about tasks
# that have no eligible machine and are therefore skipped.
logger = logging.getLogger(__name__)
[docs]
def decode_permutation_to_schedule(
    problem: "OvenSchedulingProblem",
    permutation: npt.NDArray[np.int_],
    max_open_batches_per_machine: int = 3,
) -> dict["Machine", list["ScheduleInfo"]]:
    """
    Decode a task permutation into a schedule using a greedy strategy.

    The decoder keeps several batches "open" per machine at the same time, so
    tasks sharing an attribute can end up in the same batch even when they are
    far apart in the permutation.

    Strategy:
        1. Process tasks in permutation order.
        2. For each task, scan its eligible machines (in the order given by
           ``eligible_machines``) and their open batches; the first compatible
           batch found receives the task.
        3. Compatible = same attribute, total size within the machine capacity,
           and a non-empty intersection of the duration ranges.
        4. If no compatible batch exists, a new batch is opened on the first
           eligible machine, closing that machine's oldest open batch first
           when the limit of open batches is reached.
        5. Open batches live in a per-machine min-heap keyed by creation order,
           so the oldest batch is always the one that gets closed.

    Tasks without any eligible machine are skipped with a warning and do not
    appear in the returned schedule.

    Args:
        problem: The OvenSchedulingProblem instance
        permutation: Array of task indices in scheduling order
        max_open_batches_per_machine: Maximum number of open batches per machine
            (default: 3). Values below 1 behave like 1, since a new batch is
            always opened after trying to close the oldest one.

    Returns:
        Dictionary mapping each machine to its list of scheduled batches
    """
    import heapq

    machines = range(problem.n_machines)
    schedule_per_machine: dict[Machine, list[ScheduleInfo]] = {m: [] for m in machines}
    # {machine: [(priority, batch_id, batch_dict), ...]} maintained as a min-heap.
    # Priority is the creation counter, which is unique, so tuple comparison
    # never falls through to comparing the (unorderable) batch dicts.
    open_batches: dict[Machine, list] = {m: [] for m in machines}
    batch_id_counter = 0

    for raw_task_id in permutation:
        # Normalise NumPy scalars so the task sets stored in the schedule hold
        # plain Python ints rather than np.int_ objects.
        task_id = int(raw_task_id)
        task_data = problem.tasks_data[task_id]
        eligible_machines = list(task_data.eligible_machines)
        if not eligible_machines:
            logger.warning("Task %s has no eligible machines, skipping", task_id)
            continue

        batch = _find_compatible_open_batch(
            problem, task_data, eligible_machines, open_batches
        )
        if batch is not None:
            # Merge the task into the existing batch; the duration range
            # shrinks to the intersection of both ranges.
            batch["tasks"].add(task_id)
            batch["size"] += task_data.size
            batch["min_dur"] = max(batch["min_dur"], task_data.min_duration)
            batch["max_dur"] = min(batch["max_dur"], task_data.max_duration)
            batch["last_access"] = task_id  # Id of the task that last touched the batch
            continue

        # No compatible batch: open a new one on the first eligible machine
        # (simple choice, could be improved).
        target_machine = eligible_machines[0]
        if len(open_batches[target_machine]) >= max_open_batches_per_machine:
            _close_oldest_batch(
                problem, target_machine, open_batches, schedule_per_machine
            )
        new_batch = {
            "tasks": {task_id},
            "attribute": task_data.attribute,
            "size": task_data.size,
            "min_dur": task_data.min_duration,
            "max_dur": task_data.max_duration,
            "last_access": task_id,
        }
        # Priority = creation order, so the lowest value is the oldest batch.
        heapq.heappush(
            open_batches[target_machine],
            (batch_id_counter, batch_id_counter, new_batch),
        )
        batch_id_counter += 1

    # Flush every batch that is still open, oldest first on each machine.
    for machine in machines:
        while open_batches[machine]:
            _close_oldest_batch(problem, machine, open_batches, schedule_per_machine)
    return schedule_per_machine


def _find_compatible_open_batch(
    problem: "OvenSchedulingProblem",
    task_data,
    eligible_machines: list,
    open_batches: dict,
) -> dict | None:
    """Return the first open batch able to absorb ``task_data``, or None.

    Machines are scanned in the order of ``eligible_machines`` and, on each
    machine, batches are scanned in the order of the heap's underlying list.
    """
    for machine in eligible_machines:
        for _, _, batch in open_batches[machine]:
            if batch["attribute"] != task_data.attribute:
                continue
            if (
                batch["size"] + task_data.size
                > problem.machines_data[machine].capacity
            ):
                continue
            # The merged batch needs at least one admissible duration.
            merged_min = max(batch["min_dur"], task_data.min_duration)
            merged_max = min(batch["max_dur"], task_data.max_duration)
            if merged_min <= merged_max:
                return batch
    return None
def _close_oldest_batch(
    problem: "OvenSchedulingProblem",
    machine: "Machine",
    open_batches: dict["Machine", list],
    schedule_per_machine: dict["Machine", list["ScheduleInfo"]],
) -> None:
    """
    Pop the oldest open batch of ``machine`` and append it to the schedule.

    The batch is given its minimum duration. Its start time is the earliest
    time that respects, in this order:
        - the end of the previously scheduled batch on the machine (or time 0
          for the first batch) plus the setup time from the previous attribute
          (or the machine's initial attribute) to the batch attribute,
        - the largest ``earliest_start`` among the tasks of the batch,
        - the first availability window (in the order they are listed) in
          which the batch fits.
    When no window can host the batch it is still scheduled at the earliest
    candidate time, so decoding always completes even if the result is then
    infeasible.

    Does nothing when the machine has no open batch.

    Args:
        problem: The problem instance
        machine: Machine index
        open_batches: Heap of open batches per machine (modified in place)
        schedule_per_machine: Schedule being built (modified in place)
    """
    import heapq

    from discrete_optimization.ovensched.problem import ScheduleInfo

    machine_heap = open_batches[machine]
    if not machine_heap:
        return

    # Heap entries are (priority, batch_id, batch_dict); only the dict matters here.
    _, _, closing = heapq.heappop(machine_heap)
    attribute = closing["attribute"]
    # Minimum admissible duration, to keep as much slack as possible.
    duration = closing["min_dur"]

    scheduled = schedule_per_machine[machine]
    if scheduled:
        # Follow-up batch: chain after the previous one.
        previous = scheduled[-1]
        base_time, from_attribute = previous.end_time, previous.task_attribute
    else:
        # First batch: the machine starts at time 0 in its initial attribute.
        base_time = 0
        from_attribute = problem.machines_data[machine].initial_attribute
    ready_time = base_time + problem.setup_times[from_attribute][attribute]

    # A batch cannot start before all of its tasks are released.
    release_time = max(problem.tasks_data[t].earliest_start for t in closing["tasks"])
    ready_time = max(ready_time, release_time)

    for window_start, window_end in problem.machines_data[machine].availability:
        if window_end - window_start < duration:
            # Window too short for this batch whatever the start time.
            continue
        # Earliest start inside this window that is not before ready_time.
        start_time = max(ready_time, window_start)
        if start_time + duration <= window_end:
            end_time = start_time + duration
            break
    else:
        # No window fits: schedule anyway (infeasible, but the decoder completes).
        start_time = ready_time
        end_time = ready_time + duration

    scheduled.append(
        ScheduleInfo(
            tasks=closing["tasks"],
            task_attribute=attribute,
            start_time=start_time,
            end_time=end_time,
            machine_batch_index=(machine, len(scheduled)),
        )
    )
[docs]
def generate_random_permutation(
    problem: "OvenSchedulingProblem",
    rng: np.random.Generator | None = None,
) -> npt.NDArray[np.int_]:
    """
    Generate a random task permutation.

    Args:
        problem: The problem instance; only ``problem.n_jobs`` is read.
        rng: Optional NumPy random generator. Pass a seeded generator to get a
            reproducible permutation. When None, NumPy's global random state
            (``np.random.permutation``) is used, as before.

    Returns:
        Random permutation of the task indices ``0 .. n_jobs - 1``
    """
    if rng is None:
        shuffled = np.random.permutation(problem.n_jobs)
    else:
        shuffled = rng.permutation(problem.n_jobs)
    return shuffled.astype(np.int_)
[docs]
class VectorOvenSchedulingSolution(Solution):
    """
    Vector-based solution for the Oven Scheduling Problem.

    This solution is encoded as a permutation of tasks, which is decoded
    into a schedule using a greedy decoding scheme. This encoding is suitable
    for metaheuristic algorithms like simulated annealing and genetic algorithms.

    The permutation represents the priority order in which tasks are scheduled.
    A greedy decoder processes tasks in this order and assigns them to machines.
    The decoder can maintain multiple open batches per machine, allowing better
    consolidation of tasks with the same attribute.

    The decoded schedule is cached. Assigning a new array to ``permutation``
    invalidates the cache; modifying the existing array in place (for example
    ``sol.permutation[i] = j``) does not, because it bypasses ``__setattr__``.

    Attributes:
        problem: The OvenSchedulingProblem instance
        permutation: Task permutation (numpy array of task indices)
        _schedule_cache: Cached decoded schedule
        _cache_max_open: Max open batches used for cached schedule
    """

    problem: "OvenSchedulingProblem"

    # Class-level default for decoder configuration
    DEFAULT_MAX_OPEN_BATCHES = 5

    def __init__(
        self,
        problem: "OvenSchedulingProblem",
        permutation: npt.NDArray[np.int_] | None = None,
        schedule_per_machine: dict["Machine", list["ScheduleInfo"]] | None = None,
    ):
        """
        Initialize a vector-based solution.

        Args:
            problem: The problem instance
            permutation: Task permutation. If None, will be extracted from
                schedule_per_machine. When both are given, the permutation
                takes precedence and schedule_per_machine is ignored.
            schedule_per_machine: Decoded schedule. Used to extract the
                permutation when no permutation is given.

        Raises:
            ValueError: If neither permutation nor schedule_per_machine is given.
        """
        super().__init__(problem=problem)
        if permutation is None and schedule_per_machine is None:
            raise ValueError(
                "Either permutation or schedule_per_machine must be provided"
            )
        # Store permutation (np.array copies, so the caller's array is not shared)
        if permutation is not None:
            self.permutation = np.array(permutation, dtype=np.int_)
        else:
            self.permutation = extract_permutation_from_schedule(
                problem, schedule_per_machine
            )
        # Lazy evaluation cache
        self._schedule_cache = None
        self._cache_max_open = None

    def __setattr__(self, key, value):
        # Ensure the decoded schedule is recomputed after the permutation is
        # replaced: drop the cached schedule whenever `permutation` is assigned.
        super().__setattr__(key, value)
        if key == "permutation":
            self._schedule_cache = None

    def get_schedule(
        self, max_open_batches: int | None = None
    ) -> "OvenSchedulingSolution":
        """
        Get the decoded schedule (cached).

        The cache is reused only when it was built with the same
        ``max_open_batches`` value; otherwise the permutation is decoded again.

        Args:
            max_open_batches: Maximum open batches per machine during decoding.
                If None, uses DEFAULT_MAX_OPEN_BATCHES.

        Returns:
            OvenSchedulingSolution instance
        """
        from discrete_optimization.ovensched.problem import OvenSchedulingSolution

        if max_open_batches is None:
            max_open_batches = self.DEFAULT_MAX_OPEN_BATCHES
        # Check if cache is valid for this configuration
        if self._schedule_cache is None or self._cache_max_open != max_open_batches:
            schedule_dict = decode_permutation_to_schedule(
                self.problem,
                self.permutation,
                max_open_batches_per_machine=max_open_batches,
            )
            self._schedule_cache = OvenSchedulingSolution(
                problem=self.problem, schedule_per_machine=schedule_dict
            )
            self._cache_max_open = max_open_batches
        return self._schedule_cache

    @property
    def schedule_per_machine(self):
        """
        Get the schedule_per_machine dictionary (delegates to decoded schedule).

        The schedule is decoded with DEFAULT_MAX_OPEN_BATCHES. This property
        makes VectorOvenSchedulingSolution compatible with the problem's
        evaluate() and satisfy() methods.
        """
        return self.get_schedule().schedule_per_machine

    def evaluate(self, max_open_batches: int | None = None) -> dict[str, float]:
        """
        Evaluate the solution.

        Args:
            max_open_batches: Maximum open batches for decoder
                (uses DEFAULT_MAX_OPEN_BATCHES if None)

        Returns:
            Dictionary of objective values
        """
        schedule = self.get_schedule(max_open_batches=max_open_batches)
        return self.problem.evaluate(schedule)

    def is_feasible(self, max_open_batches: int | None = None) -> bool:
        """
        Check if the solution is feasible.

        Args:
            max_open_batches: Maximum open batches for decoder
                (uses DEFAULT_MAX_OPEN_BATCHES if None)

        Returns:
            True if feasible, False otherwise
        """
        schedule = self.get_schedule(max_open_batches=max_open_batches)
        return self.problem.satisfy(schedule)

    def copy(self) -> "VectorOvenSchedulingSolution":
        """Create a deep copy of the solution (the schedule cache is not copied)."""
        return VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=self.permutation.copy(),
        )

    def change_problem(
        self, new_problem: "OvenSchedulingProblem"
    ) -> "VectorOvenSchedulingSolution":
        """
        Change the problem instance.

        Args:
            new_problem: New problem instance

        Returns:
            New solution with the new problem and a copy of the permutation
        """
        return VectorOvenSchedulingSolution(
            problem=new_problem,
            permutation=self.permutation.copy(),
        )

    def to_oven_scheduling_solution(
        self, max_open_batches: int | None = None
    ) -> "OvenSchedulingSolution":
        """
        Convert to standard OvenSchedulingSolution.

        This is useful for compatibility with solvers that expect
        the standard solution format (e.g., CP-SAT warm-start).
        The returned object is the cached decoded schedule, not a copy.

        Args:
            max_open_batches: Max open batches for decoder
                (uses DEFAULT_MAX_OPEN_BATCHES if None)

        Returns:
            OvenSchedulingSolution with decoded schedule
        """
        return self.get_schedule(max_open_batches=max_open_batches)

    def get_summary_string(self, max_open_batches: int | None = None) -> str:
        """
        Generate a human-readable summary of the solution.

        Args:
            max_open_batches: Maximum open batches for decoder
                (uses DEFAULT_MAX_OPEN_BATCHES if None)

        Returns:
            Formatted string describing the solution
        """
        # Resolve the default explicitly: a plain `or` would report the default
        # for an explicit 0 even though decoding is done with the value given.
        if max_open_batches is None:
            effective_max_open = self.DEFAULT_MAX_OPEN_BATCHES
        else:
            effective_max_open = max_open_batches
        evaluation = self.evaluate(max_open_batches=effective_max_open)
        feasible = self.is_feasible(max_open_batches=effective_max_open)
        separator = "=" * 80
        lines = [
            separator,
            "VECTOR OVEN SCHEDULING SOLUTION",
            separator,
            f"\nDecoder config: max_open_batches = {effective_max_open}",
            f"Permutation: {self.permutation[:20]}... (showing first 20)",
            "\nObjective Values:",
            f" Processing time: {evaluation['processing_time']}",
            f" Late jobs: {evaluation['nb_late_jobs']}",
            f" Setup cost: {evaluation['setup_cost']}",
            # Plain sum of every value returned by evaluate().
            f" Total cost: {sum(evaluation.values())}",
            f"\nFeasible: {feasible}",
            "\n" + separator,
        ]
        return "\n".join(lines)

    def print_summary(self, max_open_batches: int | None = None):
        """Print a human-readable summary of the solution.

        Args:
            max_open_batches: Maximum open batches for decoder
                (uses DEFAULT_MAX_OPEN_BATCHES if None)
        """
        print(self.get_summary_string(max_open_batches=max_open_batches))

    def __repr__(self) -> str:
        return f"VectorOvenSchedulingSolution(n_jobs={len(self.permutation)})"