# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import math
import random
from typing import Any, Optional
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.do_solver import ResultStorage, SolverDO
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
FloatHyperparameter,
IntegerHyperparameter,
)
from discrete_optimization.ovensched.problem import (
OvenSchedulingProblem,
OvenSchedulingSolution,
ScheduleInfo,
)
logger = logging.getLogger(__name__)
[docs]
class HeuristicOvenSchedulingSolver(SolverDO):
"""
This solver can be evolved to use different scheduling strategies:
- Greedy batch construction
- Priority-based task ordering
- Attribute grouping strategies
- Time window optimization
- Setup cost minimization
"""
problem: OvenSchedulingProblem
hyperparameters = [
IntegerHyperparameter(
name="num_local_search_iterations", default=25, low=0, high=1000
),
FloatHyperparameter(name="cooling_rate", default=0.97, low=0, high=1),
]
def __init__(
self,
problem: OvenSchedulingProblem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(
problem, params_objective_function=params_objective_function, **kwargs
)
[docs]
def solve(
self, callbacks: Optional[list[Callback]] = None, **kwargs: Any
) -> ResultStorage:
"""
Solve the OSP using a Large Neighborhood Search with a Simulated Annealing acceptance criterion.
Strategy:
1. Initial constructive phase: Build a complete initial schedule using a greedy heuristic.
2. Local Search / Refinement Phase: Iteratively attempt to improve the schedule.
a. Ruin: Select a "bad" part of the current schedule to destroy using one of several
diversified strategies (e.g., remove a tardy batch, a high-cost batch, or a random batch).
The selected batch and all subsequent batches on that machine are removed.
b. Recreate: The tasks from the ruined batches are put back into the unscheduled pool
and re-scheduled using the same powerful greedy constructor.
c. Acceptance (Simulated Annealing): A new schedule is always accepted if it's better
than the previous one. Crucially, it may also be accepted if it is *worse*, based
on a probability that depends on a decreasing "temperature". This allows the search
to escape local optima.
"""
kwargs = self.complete_with_default_hyperparameters(kwargs)
cb = CallbackList(callbacks)
cb.on_solve_start(self)
result_storage = self.create_result_storage([])
# Objective weights (can be overridden by params_objective_function)
default_weights = [1, 10000000, 1] # processing_time, nb_late_jobs, setup_cost
weights = (
self.params_objective_function.weights
if self.params_objective_function and self.params_objective_function.weights
else default_weights
)
# Base weights for the greedy heuristic's scoring function
tardiness_weight = weights[1]
initial_lookahead_penalty_weight = 10000
setup_cost_weight = weights[2]
adaptive_lookahead_factor = 5
# Pre-sort all tasks by criticality for efficient batch building
all_tasks_sorted_by_criticality = sorted(
range(self.problem.n_jobs),
key=lambda t: (
self.problem.tasks_data[t].latest_end,
self.problem.tasks_data[t].earliest_start,
),
)
def build_batch(
pool_of_tasks_to_draw_from: set[int],
initial_tasks_in_batch_set: set[int],
current_machine_data: Any,
):
current_batch_tasks = set(initial_tasks_in_batch_set)
current_size = sum(
self.problem.tasks_data[t].size for t in current_batch_tasks
)
current_max_min_d = max(
(self.problem.tasks_data[t].min_duration for t in current_batch_tasks),
default=0,
)
current_min_max_d = min(
(self.problem.tasks_data[t].max_duration for t in current_batch_tasks),
default=float("inf"),
)
tasks_to_consider_sorted_for_batching = [
t
for t in all_tasks_sorted_by_criticality
if t in pool_of_tasks_to_draw_from and t not in current_batch_tasks
]
for task_id in tasks_to_consider_sorted_for_batching:
task_data = self.problem.tasks_data[task_id]
if current_size + task_data.size <= current_machine_data.capacity:
new_max_min_d = max(current_max_min_d, task_data.min_duration)
new_min_max_d = min(current_min_max_d, task_data.max_duration)
if new_max_min_d <= new_min_max_d:
current_batch_tasks.add(task_id)
current_size += task_data.size
current_max_min_d = new_max_min_d
current_min_max_d = new_min_max_d
return current_batch_tasks
def _run_greedy_scheduling_phase(
initial_schedule_per_machine_state: dict[int, list[ScheduleInfo]],
tasks_to_schedule_in_this_pass: set[int],
):
schedule_per_machine = {
m: list(s) for m, s in initial_schedule_per_machine_state.items()
}
unscheduled_tasks = set(tasks_to_schedule_in_this_pass)
machine_end_times = [0] * self.problem.n_machines
last_attribute_on_machine = [
m.initial_attribute for m in self.problem.machines_data
]
for m_id in range(self.problem.n_machines):
if schedule_per_machine[m_id]:
last_batch = schedule_per_machine[m_id][-1]
machine_end_times[m_id] = last_batch.end_time
last_attribute_on_machine[m_id] = last_batch.task_attribute
while unscheduled_tasks:
num_total_tasks = self.problem.n_jobs
num_unscheduled = len(unscheduled_tasks)
progress_ratio = (
(num_total_tasks - num_unscheduled) / num_total_tasks
if num_total_tasks > 0
else 0
)
current_lookahead_penalty_weight = initial_lookahead_penalty_weight * (
1 + progress_ratio * adaptive_lookahead_factor
)
unscheduled_tasks_latest_ends = {
tid: self.problem.tasks_data[tid].latest_end
for tid in unscheduled_tasks
}
best_candidate = None
best_score = float("inf")
for machine_id in range(self.problem.n_machines):
machine_data = self.problem.machines_data[machine_id]
prev_attribute = last_attribute_on_machine[machine_id]
possible_attributes = {
self.problem.tasks_data[t].attribute
for t in unscheduled_tasks
if machine_id in self.problem.tasks_data[t].eligible_machines
}
for attribute in possible_attributes:
setup_time = self.problem.setup_times[prev_attribute][attribute]
setup_cost = self.problem.setup_costs[prev_attribute][attribute]
machine_ready_time_for_this_batch = (
machine_end_times[machine_id] + setup_time
)
eligible_tasks_for_attribute_machine = sorted(
[
t
for t in unscheduled_tasks
if self.problem.tasks_data[t].attribute == attribute
and machine_id
in self.problem.tasks_data[t].eligible_machines
],
key=lambda t: (
self.problem.tasks_data[t].latest_end,
self.problem.tasks_data[t].earliest_start,
),
)
if not eligible_tasks_for_attribute_machine:
continue
num_seed_tasks_to_consider = 3
unique_candidates = set()
for seed_task_index in range(
min(
num_seed_tasks_to_consider,
len(eligible_tasks_for_attribute_machine),
)
):
seed_task_id = eligible_tasks_for_attribute_machine[
seed_task_index
]
seed_task_data = self.problem.tasks_data[seed_task_id]
candidate_sets_for_this_seed = []
mf_tasks = build_batch(
set(eligible_tasks_for_attribute_machine),
{seed_task_id},
machine_data,
)
if mf_tasks:
candidate_sets_for_this_seed.append(mf_tasks)
candidate_sets_for_this_seed.append({seed_task_id})
q_tasks_filtered_pool = {seed_task_id}
urgency_delta = max(1, seed_task_data.max_duration) * 2
temporal_upper_bound = (
seed_task_data.latest_end + urgency_delta
)
for task_id in eligible_tasks_for_attribute_machine:
if (
task_id != seed_task_id
and self.problem.tasks_data[task_id].latest_end
<= temporal_upper_bound
):
q_tasks_filtered_pool.add(task_id)
q_tasks = build_batch(
q_tasks_filtered_pool, {seed_task_id}, machine_data
)
if q_tasks:
candidate_sets_for_this_seed.append(q_tasks)
unique_candidates.update(
{
frozenset(s)
for s in candidate_sets_for_this_seed
if s
}
)
for batch_tasks in unique_candidates:
batch_props = {
"max_min_d": max(
(
self.problem.tasks_data[t].min_duration
for t in batch_tasks
),
default=0,
),
"min_max_d": min(
(
self.problem.tasks_data[t].max_duration
for t in batch_tasks
),
default=float("inf"),
),
"max_es": max(
(
self.problem.tasks_data[t].earliest_start
for t in batch_tasks
),
default=0,
),
"min_le": min(
(
self.problem.tasks_data[t].latest_end
for t in batch_tasks
),
default=float("inf"),
),
}
b_min_duration = max(1, batch_props["max_min_d"])
b_max_duration = batch_props["min_max_d"]
if b_min_duration > b_max_duration:
continue
earliest_start_base = max(
machine_ready_time_for_this_batch, batch_props["max_es"]
)
found_slot_for_any_duration = False
for start_avail, end_avail in machine_data.availability:
potential_start = max(earliest_start_base, start_avail)
current_durations_to_check = {b_min_duration}
if (
b_max_duration != float("inf")
and b_max_duration > b_min_duration
):
current_durations_to_check.add(b_max_duration)
duration_to_hit_deadline = (
batch_props["min_le"] - potential_start
)
if (
b_min_duration
<= duration_to_hit_deadline
<= b_max_duration
):
current_durations_to_check.add(
duration_to_hit_deadline
)
if (
b_max_duration != float("inf")
and (b_max_duration - b_min_duration) > 4
):
step = (b_max_duration - b_min_duration) // 4
for i in range(1, 4):
intermediate_duration = (
b_min_duration + i * step
)
if (
b_min_duration
< intermediate_duration
< b_max_duration
):
current_durations_to_check.add(
intermediate_duration
)
duration_filling_slot = end_avail - potential_start
if (
duration_filling_slot > 0
and b_min_duration
<= duration_filling_slot
<= b_max_duration
):
current_durations_to_check.add(
duration_filling_slot
)
for duration in sorted(
list(current_durations_to_check)
):
if (
duration <= 0
or duration < b_min_duration
or (
b_max_duration != float("inf")
and duration > b_max_duration
)
):
continue
end_time = potential_start + duration
if end_time <= end_avail:
found_slot_for_any_duration = True
tardiness = max(
0, end_time - batch_props["min_le"]
)
lookahead_penalty_value = sum(
max(0, end_time - le)
for tid, le in unscheduled_tasks_latest_ends.items()
if tid not in batch_tasks
)
score = (
weights[0] * end_time
+ tardiness_weight * tardiness
+ current_lookahead_penalty_weight
* lookahead_penalty_value
+ setup_cost_weight * setup_cost
)
if score < best_score:
best_score = score
best_candidate = {
"tasks": batch_tasks,
"attribute": attribute,
"start_time": potential_start,
"end_time": end_time,
"machine_id": machine_id,
}
if found_slot_for_any_duration:
break
if best_candidate:
machine_id = best_candidate["machine_id"]
new_batch = ScheduleInfo(
tasks=set(best_candidate["tasks"]),
task_attribute=best_candidate["attribute"],
start_time=best_candidate["start_time"],
end_time=best_candidate["end_time"],
machine_batch_index=(
machine_id,
len(schedule_per_machine[machine_id]),
),
)
schedule_per_machine[machine_id].append(new_batch)
schedule_per_machine[machine_id].sort(key=lambda b: b.start_time)
unscheduled_tasks -= best_candidate["tasks"]
machine_end_times[machine_id] = new_batch.end_time
last_attribute_on_machine[machine_id] = new_batch.task_attribute
else:
if unscheduled_tasks:
most_critical_task_id = sorted(
list(unscheduled_tasks),
key=lambda t: (
self.problem.tasks_data[t].latest_end,
self.problem.tasks_data[t].earliest_start,
),
)[0]
logger.warning(
f"Could not schedule task {most_critical_task_id}. Removing it to proceed."
)
unscheduled_tasks.remove(most_critical_task_id)
else:
break
return schedule_per_machine, unscheduled_tasks
def _identify_worst_batch_and_affected_tasks(
solution: OvenSchedulingSolution, strategy: str
):
worst_batch_location = None
if strategy == "tardiness":
worst_tardiness_score, worst_end_time = -1, -1
for m_id, schedule in solution.schedule_per_machine.items():
for b_idx, b_info in enumerate(schedule):
batch_tardiness = sum(
max(
0,
b_info.end_time
- self.problem.tasks_data[t_id].latest_end,
)
for t_id in b_info.tasks
)
if batch_tardiness > worst_tardiness_score or (
batch_tardiness == worst_tardiness_score
and b_info.end_time > worst_end_time
):
(
worst_tardiness_score,
worst_end_time,
worst_batch_location,
) = batch_tardiness, b_info.end_time, (m_id, b_idx)
elif strategy == "latest_finish":
latest_end_time = -1
for m_id, schedule in solution.schedule_per_machine.items():
if schedule and schedule[-1].end_time > latest_end_time:
latest_end_time, worst_batch_location = (
schedule[-1].end_time,
(m_id, len(schedule) - 1),
)
elif strategy == "setup_cost":
max_setup_cost = -1
for m_id, schedule in solution.schedule_per_machine.items():
last_attr = self.problem.machines_data[m_id].initial_attribute
for b_idx, b_info in enumerate(schedule):
current_setup_cost = self.problem.setup_costs[last_attr][
b_info.task_attribute
]
if current_setup_cost > max_setup_cost:
max_setup_cost, worst_batch_location = (
current_setup_cost,
(m_id, b_idx),
)
last_attr = b_info.task_attribute
elif strategy == "random":
all_batches = [
(m_id, b_idx)
for m_id, schedule in solution.schedule_per_machine.items()
for b_idx in range(len(schedule))
]
if all_batches:
worst_batch_location = random.choice(all_batches)
if worst_batch_location is None:
return None
machine_id, batch_idx = worst_batch_location
tasks_to_reschedule_from_removal = set().union(
*(
b_info.tasks
for b_info in solution.schedule_per_machine[machine_id][batch_idx:]
)
)
return machine_id, batch_idx, tasks_to_reschedule_from_removal
# --- Main solve logic with Simulated Annealing ---
initial_schedule_per_machine = {m: [] for m in range(self.problem.n_machines)}
initial_schedule, initial_unscheduled = _run_greedy_scheduling_phase(
initial_schedule_per_machine, set(range(self.problem.n_jobs))
)
best_solution = OvenSchedulingSolution(
problem=self.problem, schedule_per_machine=initial_schedule
)
best_fit = self.aggreg_from_sol(best_solution)
result_storage.append((best_solution, best_fit))
cb.on_step_end(1, result_storage, self)
current_solution = best_solution
current_fit = best_fit
current_unscheduled = initial_unscheduled
num_local_search_iterations = kwargs["num_local_search_iterations"]
ruin_strategies = ["tardiness", "latest_finish", "setup_cost", "random"]
initial_temperature = 0.01 * abs(best_fit) if best_fit != 0 else 1000.0
cooling_rate = kwargs["cooling_rate"]
temperature = initial_temperature
for iter_ls in range(num_local_search_iterations):
current_ruin_strategy = random.choice(ruin_strategies)
worst_batch_info = _identify_worst_batch_and_affected_tasks(
current_solution, strategy=current_ruin_strategy
)
if worst_batch_info is None:
break
machine_id_to_clear, batch_index_to_clear, tasks_to_reschedule = (
worst_batch_info
)
temp_schedule = {
m: list(s) for m, s in current_solution.schedule_per_machine.items()
}
temp_schedule[machine_id_to_clear] = temp_schedule[machine_id_to_clear][
:batch_index_to_clear
]
tasks_for_next_pass = current_unscheduled | tasks_to_reschedule
new_schedule, new_unscheduled = _run_greedy_scheduling_phase(
temp_schedule, tasks_for_next_pass
)
new_solution = OvenSchedulingSolution(
problem=self.problem, schedule_per_machine=new_schedule
)
new_fit = self.aggreg_from_sol(new_solution)
if new_fit < best_fit:
best_solution = new_solution
best_fit = new_fit
result_storage.append((best_solution, best_fit))
# SA Acceptance Criterion
if new_fit < current_fit or (
temperature > 1e-6
and random.random() < math.exp(-(new_fit - current_fit) / temperature)
):
current_solution = new_solution
current_fit = new_fit
current_unscheduled = new_unscheduled
temperature *= cooling_rate
stop = cb.on_step_end(iter_ls + 1, result_storage, self)
if stop:
break
cb.on_solve_end(result_storage, self)
return result_storage