Source code for discrete_optimization.ovensched.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
"""OptalCP solver for the Oven Scheduling Problem."""

from __future__ import annotations

import logging
from collections import defaultdict
from enum import Enum
from typing import Any

from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hub_solver.optal.optalcp_tools import (
    OptalCpSolver,
)
from discrete_optimization.ovensched.problem import (
    OvenSchedulingProblem,
    OvenSchedulingSolution,
    ScheduleInfo,
)

try:
    import optalcp as cp

    optalcp_available = True
except ImportError:
    cp = None
    optalcp_available = False

logger = logging.getLogger(__name__)


[docs] class OvenObjectivesEnum(Enum): """Enumeration of available objectives for the oven scheduling problem.""" NB_BATCH = "nb_batch" PROCESSING_TIME = "processing_time" SETUP_COST = "setup_cost" TARDINESS = "tardiness" MAKESPAN = "makespan"
[docs] class OvenSchedulingOptalSolver(OptalCpSolver, WarmstartMixin): """OptalCP solver for the Oven Scheduling Problem. This solver uses the OptalCP constraint programming solver to model the batching problem with machines, setup times/costs, and capacity constraints. """ problem: OvenSchedulingProblem def __init__( self, problem: OvenSchedulingProblem, params_objective_function: ParamsObjectiveFunction | None = None, max_nb_batch_per_machine: int | None = None, setup_modeling: str = "baseline", use_batch_count_vars: bool = False, use_explicit_batch_attrs: bool = False, use_incompatibility_constraints: bool = False, **kwargs: Any, ): """Initialize the OptalCP solver. Args: problem: The oven scheduling problem instance params_objective_function: Parameters for objective function max_nb_batch_per_machine: Maximum number of batches per machine. If None, estimated as 3 * n_jobs / n_machines setup_modeling: Setup cost modeling approach: - "baseline": Nested implies constraints (current approach) - "explicit_attrs": Explicit attribute variables per batch with element constraint - "direct_sequence": Compute from sequence variable (experimental) use_batch_count_vars: If True, add explicit integer variable for number of batches per machine use_explicit_batch_attrs: If True, add explicit attribute variables for better branching use_incompatibility_constraints: If True, add constraints forbidding incompatible jobs from being in the same batch. Jobs are incompatible if they have different attributes or non-overlapping duration ranges. Default False as these constraints may hurt performance by over-restricting the search space. **kwargs: Additional arguments passed to parent """ if not optalcp_available: raise RuntimeError( "OptalCP is not available. Install it from: https://www.optalcp.com/" ) super().__init__(problem, params_objective_function, **kwargs) self.max_nb_batch = max_nb_batch_per_machine if self.max_nb_batch is None: self.max_nb_batch = max( 1, int(4 * self.problem.n_jobs / self.problem.n_machines) ) # Modeling options self.setup_modeling = setup_modeling self.use_batch_count_vars = use_batch_count_vars self.use_explicit_batch_attrs = use_explicit_batch_attrs self.use_incompatibility_constraints = use_incompatibility_constraints # Index tasks by eligible machines self.tasks_per_machine = { m: [ t for t in range(self.problem.n_jobs) if m in self.problem.tasks_data[t].eligible_machines ] for m in range(self.problem.n_machines) } self.variables = {} self.current_solution = None
[docs] def init_model(self, **kwargs: Any) -> None: """Initialize the OptalCP model.""" self.cp_model = cp.Model() self.variables = {} # Create decision variables and constraints self._define_machine_calendar() self._define_machines_intervals() self._constraint_convention_machines() self._define_jobs_intervals() # Optionally add incompatibility constraints if self.use_incompatibility_constraints: self._add_job_incompatibility_constraints() # Setup cost modeling (choose based on option) if self.setup_modeling == "baseline": self._constraint_setup_time_baseline() elif self.setup_modeling == "explicit_attrs": self._constraint_setup_time_explicit_attrs() elif self.setup_modeling == "direct_sequence": self._constraint_setup_time_direct_sequence() else: raise ValueError(f"Unknown setup_modeling: {self.setup_modeling}") # Optional: explicit batch count variables if self.use_batch_count_vars: self._add_batch_count_variables() # Set objectives based on params_objective_function self._define_objectives_from_params()
def _add_job_incompatibility_constraints(self): """Add constraints to forbid incompatible jobs from being in the same batch. Two jobs are incompatible if: 1. They have different attributes 2. Their duration ranges don't overlap (no common valid duration) Note: While these constraints are logically valid, they may hurt performance by over-restricting the search space and removing useful stepping stones for the LNS search. Use use_incompatibility_constraints=True to enable. """ print("Adding job incompatibility constraints...") # Precompute incompatible pairs incompatible_pairs = [] for i in range(self.problem.n_jobs): for j in range(i + 1, self.problem.n_jobs): task_i = self.problem.tasks_data[i] task_j = self.problem.tasks_data[j] # Different attributes => incompatible (can't be in same batch) if task_i.attribute != task_j.attribute: incompatible_pairs.append((i, j)) continue # Check duration compatibility # Jobs are incompatible if their duration ranges don't overlap min_duration = max(task_i.min_duration, task_j.min_duration) max_duration = min(task_i.max_duration, task_j.max_duration) if min_duration > max_duration: # No valid duration satisfies both jobs incompatible_pairs.append((i, j)) print(f"Found {len(incompatible_pairs)} incompatible job pairs") # Add constraints for incompatible pairs # If job i and job j are incompatible, they cannot be in the same batch on any machine for i, j in incompatible_pairs: task_i = self.problem.tasks_data[i] task_j = self.problem.tasks_data[j] # Get common eligible machines common_machines = set(task_i.eligible_machines) & set( task_j.eligible_machines ) for m in common_machines: if ( i in self.variables["index_batch_machine_for_job"] and m in self.variables["index_batch_machine_for_job"][i] and j in self.variables["index_batch_machine_for_job"] and m in self.variables["index_batch_machine_for_job"][j] ): # Get batch index variables for jobs i and j on machine m batch_index_i = self.variables["index_batch_machine_for_job"][i][m] batch_index_j = self.variables["index_batch_machine_for_job"][j][m] # If both jobs are on machine m, they must be in different batches # This is expressed as: batch_index_i != batch_index_j # when both jobs are present on machine m interval_i_m = self.variables["interval_job_per_machine"][i][m] interval_j_m = self.variables["interval_job_per_machine"][j][m] # Constraint: if both are present on machine m, they must have different batch indices self.cp_model.enforce( self.cp_model.implies( self.cp_model.presence(interval_i_m) & self.cp_model.presence(interval_j_m), batch_index_i != batch_index_j, ) ) def _define_machine_calendar(self): """Define calendar constraints for machine availability.""" self.calendar_step_function = {} for m in range(self.problem.n_machines): availability = self.problem.machines_data[m].availability list_values = [] for start, end in availability: if len(list_values) > 0: if start == list_values[-1][0]: list_values[-1] = (start, 1) list_values.append((end + 1, 0)) continue list_values.append((start, 1)) list_values.append((end, 0)) self.calendar_step_function[m] = self.cp_model.step_function(list_values) def _define_machines_intervals(self): """Create interval variables for batches on each machine.""" horizon = max( interval[1] for machine_data in self.problem.machines_data for interval in machine_data.availability ) intervals_per_machines = {m: [] for m in range(self.problem.n_machines)} intervals_per_machines_per_attr = { m: {} for m in range(self.problem.n_machines) } for m in range(self.problem.n_machines): tasks = self.tasks_per_machine[m] attributes = set(self.problem.tasks_data[t].attribute for t in tasks) # Get possible durations for this machine possible_durations = [] for t in tasks: possible_durations.extend( [ self.problem.tasks_data[t].min_duration, self.problem.tasks_data[t].max_duration, ] ) possible_durations = sorted(set(possible_durations)) # Create batch intervals for this machine for b in range(self.max_nb_batch): interval = self.cp_model.interval_var( start=(0, horizon), length=(min(possible_durations), max(possible_durations)), end=(0, horizon), optional=True, name=f"interval_machine_{m}_{b}", ) intervals_per_machines[m].append(interval) # Forbid extent outside calendar self.cp_model.forbid_extent(interval, self.calendar_step_function[m]) # Create attribute-specific intervals for attr in attributes: attr_durations = [] for t in tasks: if self.problem.tasks_data[t].attribute == attr: attr_durations.extend( [ self.problem.tasks_data[t].min_duration, self.problem.tasks_data[t].max_duration, ] ) attr_durations = sorted(set(attr_durations)) intervals_per_machines_per_attr[m][attr] = [] for b in range(self.max_nb_batch): interval = self.cp_model.interval_var( start=(0, horizon), length=(min(attr_durations), max(attr_durations)), end=(0, horizon), optional=True, name=f"interval_machine_{m}_{b}_attr{attr}", ) intervals_per_machines_per_attr[m][attr].append(interval) # Link main intervals with attribute intervals (alternative constraint) for b in range(self.max_nb_batch): self.cp_model.alternative( intervals_per_machines[m][b], [ intervals_per_machines_per_attr[m][attr][b] for attr in attributes ], ) self.variables["intervals_per_machines"] = intervals_per_machines self.variables["intervals_per_machines_per_attr"] = ( intervals_per_machines_per_attr ) def _constraint_convention_machines(self): """Add ordering constraints on batches within each machine.""" intervals_per_machines = self.variables["intervals_per_machines"] intervals_per_machines_per_attr = self.variables[ "intervals_per_machines_per_attr" ] for m in intervals_per_machines: # Batches must be used in order (no gaps) for i in range(self.max_nb_batch - 1): self.cp_model.enforce( self.cp_model.presence(intervals_per_machines[m][i]) >= self.cp_model.presence(intervals_per_machines[m][i + 1]) ) # Batch i+1 must start after batch i ends self.cp_model.end_before_start( intervals_per_machines[m][i], intervals_per_machines[m][i + 1] ) # Hidden constraint for presence chain self.cp_model._itv_presence_chain(intervals_per_machines[m]) # Each batch has exactly one attribute (aids propagation even if redundant) for i in range(self.max_nb_batch): attributes = list(intervals_per_machines_per_attr[m].keys()) self.cp_model.alternative( intervals_per_machines[m][i], [ intervals_per_machines_per_attr[m][attr][i] for attr in attributes ], ) def _define_jobs_intervals(self): """Create interval variables for jobs and link them to batches.""" horizon = max( interval[1] for machine_data in self.problem.machines_data for interval in machine_data.availability ) interval_job = {} interval_job_per_machine = {} index_batch_machine_for_job = {t: {} for t in range(self.problem.n_jobs)} interval_batch_machine_for_job = {t: {} for t in range(self.problem.n_jobs)} # Create job intervals for t in self.problem.tasks_list: task_data = self.problem.tasks_data[t] # Main job interval interval_job[t] = self.cp_model.interval_var( start=(task_data.earliest_start, horizon), end=( task_data.earliest_start + task_data.min_duration, horizon, ), length=(task_data.min_duration, task_data.max_duration), optional=False, name=f"interval_job_{t}", ) # Job intervals per machine interval_job_per_machine[t] = {} for m in task_data.eligible_machines: interval_job_per_machine[t][m] = self.cp_model.interval_var( start=(task_data.earliest_start, horizon), end=( task_data.earliest_start + task_data.min_duration, horizon, ), length=(task_data.min_duration, task_data.max_duration), optional=True, name=f"interval_job_{t}_{m}", ) # Alternative: job must be on exactly one machine self.cp_model.alternative( interval_job[t], [interval_job_per_machine[t][m] for m in interval_job_per_machine[t]], ) for m in range(self.problem.n_machines): tasks = self.tasks_per_machine[m] intervals_on_machine = [ ( self.cp_model.interval_var( start=(0, self.max_nb_batch - 1), end=(1, self.max_nb_batch), length=1, optional=True, ), self.problem.tasks_data[t].size, ) for t in tasks ] index = [self.cp_model.start(x[0]) for x in intervals_on_machine] self.cp_model._itv_mapping( [interval_job_per_machine[t][m] for t in tasks], [ self.variables["intervals_per_machines"][m][b] for b in range(self.max_nb_batch) ], index, ) # Capacity constraints using pulse (no _pack needed) self.cp_model.enforce( self.cp_model.sum( [self.cp_model.pulse(x[0], x[1]) for x in intervals_on_machine] ) <= self.problem.machines_data[m].capacity ) self.cp_model.enforce( self.cp_model.sum( [ self.cp_model.pulse( interval_job_per_machine[t][m], self.problem.tasks_data[t].size, ) for t in tasks ] ) <= self.problem.machines_data[m].capacity ) for i in range(len(tasks)): interval_batch_machine_for_job[tasks[i]][m] = intervals_on_machine[i][0] index_batch_machine_for_job[tasks[i]][m] = index[ i ] # Store batch index variable attr = self.problem.tasks_data[tasks[i]].attribute self.cp_model.enforce( self.cp_model.presence(intervals_on_machine[i][0]) == self.cp_model.presence(interval_job_per_machine[tasks[i]][m]) ) for val in range(self.max_nb_batch): # Attribute presence constraint (simpler than implies) self.cp_model.enforce( self.cp_model.presence( self.variables["intervals_per_machines_per_attr"][m][attr][ val ] ) >= (val == index[i]) ) # Duration constraints self.cp_model.enforce( self.cp_model.implies( val == index[i], self.cp_model.length( self.variables["intervals_per_machines"][m][val] ) >= self.problem.tasks_data[tasks[i]].min_duration, ) ) self.cp_model.enforce( self.cp_model.implies( val == index[i], self.cp_model.length( self.variables["intervals_per_machines"][m][val] ) <= self.problem.tasks_data[tasks[i]].max_duration, ) ) self.variables["interval_job"] = interval_job self.variables["interval_job_per_machine"] = interval_job_per_machine self.variables["index_batch_machine_for_job"] = index_batch_machine_for_job self.variables["interval_batch_machine_for_job"] = ( interval_batch_machine_for_job ) def _constraint_setup_time_baseline(self): """Add setup time and cost constraints (baseline approach with nested implies).""" possible_setup_times = sorted( set([0] + [x for row in self.problem.setup_times for x in row]) ) possible_setup_costs = sorted( set([x for row in self.problem.setup_costs for x in row]) ) setup_cost = { m: [ self.cp_model.int_var( min=min(possible_setup_costs), max=max(possible_setup_costs), optional=True, name=f"setup_cost_m{m}_b{b}", ) for b in range(self.max_nb_batch) ] for m in range(self.problem.n_machines) } # Store dummy initial intervals for warm start self.variables["dummy_initial_intervals"] = {} for m in range(self.problem.n_machines): machine_data = self.problem.machines_data[m] intervals_per_attr = self.variables["intervals_per_machines_per_attr"][m] # Create sequence with initial state - store dummy interval dummy_initial = self.cp_model.interval_var(-1, 0, 1) self.variables["dummy_initial_intervals"][m] = dummy_initial all_intervals = [(dummy_initial, machine_data.initial_attribute)] all_intervals += [ (intervals_per_attr[attr][i], attr) for attr in intervals_per_attr for i in range(self.max_nb_batch) ] seq = self.cp_model.sequence_var( [x[0] for x in all_intervals], types=[x[1] for x in all_intervals] ) # No overlap with setup times seq.no_overlap(self.problem.setup_times) # Setup cost constraints # First batch setup from initial state for attr in intervals_per_attr: iv = intervals_per_attr[attr][0] self.cp_model.enforce( self.cp_model.implies( self.cp_model.presence(iv), setup_cost[m][0] == self.problem.setup_costs[machine_data.initial_attribute][ attr ], ) ) # Subsequent batches for i in range(1, self.max_nb_batch): for prev_attr in intervals_per_attr: iv_prev = intervals_per_attr[prev_attr][i - 1] for curr_attr in intervals_per_attr: iv_curr = intervals_per_attr[curr_attr][i] self.cp_model.enforce( self.cp_model.implies( self.cp_model.presence(iv_prev) & self.cp_model.presence(iv_curr), setup_cost[m][i] == self.problem.setup_costs[prev_attr][curr_attr], ) ) # Link setup cost presence to batch presence for i in range(self.max_nb_batch): self.cp_model.enforce( self.cp_model.presence( self.variables["intervals_per_machines"][m][i] ) == self.cp_model.presence(setup_cost[m][i]) ) self.variables["setup_cost_per_machine"] = setup_cost def _constraint_setup_time_explicit_attrs(self): """Add setup constraints with explicit attribute variables per batch. This approach creates integer variables for the attribute assigned to each batch, allowing the solver to branch on high-level decisions before fixing detailed timings. """ possible_setup_costs = sorted( set([x for row in self.problem.setup_costs for x in row]) ) setup_cost = { m: [ self.cp_model.int_var( min=min(possible_setup_costs), max=max(possible_setup_costs), optional=True, name=f"setup_cost_m{m}_b{b}", ) for b in range(self.max_nb_batch) ] for m in range(self.problem.n_machines) } # Explicit attribute variables per batch (for branching) batch_attribute = {} # Store dummy initial intervals for warm start self.variables["dummy_initial_intervals"] = {} for m in range(self.problem.n_machines): machine_data = self.problem.machines_data[m] intervals_per_attr = self.variables["intervals_per_machines_per_attr"][m] all_attributes = sorted(intervals_per_attr.keys()) batch_attribute[m] = [] for b in range(self.max_nb_batch): attr_var = self.cp_model.int_var( min=min(all_attributes), max=max(all_attributes), optional=True, name=f"batch_attr_m{m}_b{b}", ) batch_attribute[m].append(attr_var) # Link attribute variable to attribute interval presence for attr in all_attributes: self.cp_model.enforce( self.cp_model.presence(intervals_per_attr[attr][b]) == (attr_var == attr) ) # Create sequence with initial state - store dummy interval dummy_initial = self.cp_model.interval_var(-1, 0, 1) self.variables["dummy_initial_intervals"][m] = dummy_initial all_intervals = [(dummy_initial, machine_data.initial_attribute)] all_intervals += [ (intervals_per_attr[attr][i], attr) for attr in intervals_per_attr for i in range(self.max_nb_batch) ] seq = self.cp_model.sequence_var( [x[0] for x in all_intervals], types=[x[1] for x in all_intervals] ) # No overlap with setup times seq.no_overlap(self.problem.setup_times) # Setup costs using element constraint # First batch: use element to get cost from initial_attribute to batch_attribute[0] if self.max_nb_batch > 0: costs_from_initial = [ self.problem.setup_costs[machine_data.initial_attribute][attr] for attr in all_attributes ] # Use table constraint for first batch for attr_idx, attr in enumerate(all_attributes): self.cp_model.enforce( self.cp_model.implies( batch_attribute[m][0] == attr, setup_cost[m][0] == costs_from_initial[attr_idx], ) ) # Subsequent batches: use element on prev_attr and curr_attr for i in range(1, self.max_nb_batch): # For each possible transition, add constraint for prev_attr in all_attributes: for curr_attr in all_attributes: self.cp_model.enforce( self.cp_model.implies( (batch_attribute[m][i - 1] == prev_attr) & (batch_attribute[m][i] == curr_attr), setup_cost[m][i] == self.problem.setup_costs[prev_attr][curr_attr], ) ) # Link setup cost presence to batch presence for i in range(self.max_nb_batch): self.cp_model.enforce( self.cp_model.presence( self.variables["intervals_per_machines"][m][i] ) == self.cp_model.presence(setup_cost[m][i]) ) self.cp_model.enforce( self.cp_model.presence( self.variables["intervals_per_machines"][m][i] ) == self.cp_model.presence(batch_attribute[m][i]) ) self.variables["setup_cost_per_machine"] = setup_cost self.variables["batch_attribute"] = batch_attribute def _constraint_setup_time_direct_sequence(self): """Add setup constraints using cost-space sequence. Revolutionary approach: Create a parallel sequence in "cost space"! - Time dimension = cumulative setup cost (not real time) - Each batch is an interval at position = cumulative cost so far - Transitions use setup_costs as "setup times" in this cost space - The sequence.no_overlap with cost matrix automatically computes costs! - _same_sequence ensures time and cost sequences have identical ordering - Total setup cost = makespan of cost sequence This eliminates the O(n_attr² × n_batches) constraint explosion! """ setup_cost = {m: [] for m in range(self.problem.n_machines)} # Store dummy initial intervals for warm start self.variables["dummy_initial_intervals"] = {} for m in range(self.problem.n_machines): machine_data = self.problem.machines_data[m] intervals_per_attr = self.variables["intervals_per_machines_per_attr"][m] all_attributes = sorted(intervals_per_attr.keys()) # Standard time-based sequence for actual scheduling - store dummy interval dummy_initial = self.cp_model.interval_var(-1, 0, 1) self.variables["dummy_initial_intervals"][m] = dummy_initial time_intervals = [(dummy_initial, machine_data.initial_attribute)] time_intervals += [ (intervals_per_attr[attr][i], attr) for attr in intervals_per_attr for i in range(self.max_nb_batch) ] seq_time = self.cp_model.sequence_var( [x[0] for x in time_intervals], types=[x[1] for x in time_intervals] ) seq_time.no_overlap(self.problem.setup_times) # COST-SPACE SEQUENCE - this is the innovation! # Create intervals in cost-space (not time-space) max_total_cost = ( sum(max(row) for row in self.problem.setup_costs) * self.max_nb_batch ) # Initial interval at cost=0 cost_initial = self.cp_model.interval_var( start=0, end=0, length=0, optional=False, name=f"cost_init_m{m}" ) # Cost intervals: one per (batch, attribute) pair # These are positioned in cost-space, not time-space! cost_intervals_per_attr = {attr: [] for attr in all_attributes} for b in range(self.max_nb_batch): for attr in all_attributes: # Position in cost-space = cumulative setup cost cost_itv = self.cp_model.interval_var( start=(0, max_total_cost), length=0, # Point in cost-space end=(0, max_total_cost), optional=True, name=f"cost_m{m}_b{b}_attr{attr}", ) cost_intervals_per_attr[attr].append(cost_itv) # Link cost interval presence to actual batch-attribute self.cp_model.enforce( self.cp_model.presence(cost_itv) == self.cp_model.presence(intervals_per_attr[attr][b]) ) # Build cost-space sequence cost_sequence_intervals = [cost_initial] cost_sequence_types = [machine_data.initial_attribute] for attr in all_attributes: for b in range(self.max_nb_batch): cost_sequence_intervals.append(cost_intervals_per_attr[attr][b]) cost_sequence_types.append(attr) seq_cost = self.cp_model.sequence_var( cost_sequence_intervals, types=cost_sequence_types ) # THE MAGIC: Use setup_costs as transition matrix in COST-space! # This automatically positions intervals to satisfy: pos[i+1] = pos[i] + cost[attr[i]][attr[i+1]] seq_cost.no_overlap(self.problem.setup_costs) # Ensure time and cost sequences have the SAME ordering # This is critical - they must select the same batches in the same order self.cp_model._same_sequence(seq_time, seq_cost) # Total setup cost = end position of last batch in cost-space # Since batches are ordered and cost-space intervals have length=0, # the end of the last present interval = cumulative setup cost # # We take max over all batch-attribute cost intervals # Non-present intervals should not affect the max all_cost_ends = [] for attr in all_attributes: for b in range(self.max_nb_batch): cost_itv = cost_intervals_per_attr[attr][b] all_cost_ends.append(self.cp_model.end(cost_itv)) total_setup_cost = self.cp_model.int_var( min=0, max=max_total_cost, name=f"total_setup_cost_m{m}" ) self.cp_model.enforce(total_setup_cost == self.cp_model.max(all_cost_ends)) # Store as single-element list for compatibility with objective code setup_cost[m].append(total_setup_cost) self.variables["setup_cost_per_machine"] = setup_cost def _add_batch_count_variables(self): """Add explicit integer variables for number of batches per machine. Similar to the late job variables, this allows the solver to branch on high-level decisions (how many batches?) before fixing batch details. """ nb_batches_per_machine = {} for m in range(self.problem.n_machines): nb_batches = self.cp_model.int_var( min=0, max=self.max_nb_batch, name=f"nb_batches_m{m}" ) nb_batches_per_machine[m] = nb_batches # Link to batch presence self.cp_model.enforce( nb_batches == self.cp_model.sum( [ self.cp_model.presence( self.variables["intervals_per_machines"][m][i] ) for i in range(self.max_nb_batch) ] ) ) self.variables["nb_batches_per_machine"] = nb_batches_per_machine def _define_objectives_from_params(self): """Define objectives based on params_objective_function.""" self.variables["objectives"] = {} objective_terms = [] # Get weights from params_objective_function weights_dict = { obj_name: weight for obj_name, weight in zip( self.params_objective_function.objectives, self.params_objective_function.weights, ) } horizon = max( interval[1] for machine_data in self.problem.machines_data for interval in machine_data.availability ) # Processing time objective if "processing_time" in weights_dict: processing_time = self.cp_model.sum( [ self.cp_model.length(self.variables["intervals_per_machines"][m][i]) for m in self.variables["intervals_per_machines"] for i in range(self.max_nb_batch) ] ) self.variables["objectives"]["processing_time"] = processing_time if weights_dict["processing_time"] > 0: objective_terms.append( weights_dict["processing_time"] * processing_time ) # Setup cost objective if "setup_cost" in weights_dict and "setup_cost_per_machine" in self.variables: # For direct_sequence: setup_cost_per_machine[m] is a single total cost # For baseline/explicit_attrs: setup_cost_per_machine[m] is a list of per-batch costs setup_costs_flat = [] for m in self.variables["setup_cost_per_machine"]: cost_entry = self.variables["setup_cost_per_machine"][m] if isinstance(cost_entry, list): setup_costs_flat.extend(cost_entry) else: setup_costs_flat.append(cost_entry) setup_cost = self.cp_model.sum(setup_costs_flat) self.variables["objectives"]["setup_cost"] = setup_cost if weights_dict["setup_cost"] > 0: objective_terms.append(weights_dict["setup_cost"] * setup_cost) # Tardiness objective (number of late jobs) if "nb_late_jobs" in weights_dict: late_jobs = {} for t in range(self.problem.n_jobs): task_data = self.problem.tasks_data[t] if task_data.latest_end < float("inf"): late = self.cp_model.int_var(name=f"late_{t}") late_jobs[t] = late self.cp_model.enforce( late == ( self.cp_model.end(self.variables["interval_job"][t]) > task_data.latest_end ) ) if late_jobs: tardiness = self.cp_model.sum([late_jobs[t] for t in late_jobs]) self.variables["objectives"]["nb_late_jobs"] = tardiness self.variables["late_jobs"] = late_jobs if weights_dict["nb_late_jobs"] > 0: objective_terms.append(weights_dict["nb_late_jobs"] * tardiness) # Makespan objective (optional, for compatibility) if "makespan" in weights_dict: makespan = self.cp_model.int_var( min=max( self.problem.tasks_data[t].earliest_start for t in range(self.problem.n_jobs) ), max=horizon, name="makespan", ) self.cp_model.enforce( self.cp_model.max( [ self.cp_model.end(self.variables["interval_job"][t]) for t in self.variables["interval_job"] ] ) == makespan ) self.variables["objectives"]["makespan"] = makespan if weights_dict["makespan"] > 0: objective_terms.append(weights_dict["makespan"] * makespan) # Number of batches objective (optional) if "nb_batch" in weights_dict: nb_batch = self.cp_model.sum( [ self.cp_model.presence( self.variables["intervals_per_machines"][m][i] ) for m in self.variables["intervals_per_machines"] for i in range(self.max_nb_batch) ] ) self.variables["objectives"]["nb_batch"] = nb_batch if weights_dict["nb_batch"] > 0: objective_terms.append(weights_dict["nb_batch"] * nb_batch) # Minimize weighted sum of objectives if objective_terms: self.cp_model.minimize(self.cp_model.sum(objective_terms))
[docs] def retrieve_solution(self, result: "cp.SolutionEvent") -> OvenSchedulingSolution: """Construct a solution from OptalCP solver results. Args: result: The OptalCP solution event Returns: An OvenSchedulingSolution """ if result.solution is None: # Return empty solution if no solution found return OvenSchedulingSolution( problem=self.problem, schedule_per_machine={m: [] for m in range(self.problem.n_machines)}, ) solution = result.solution # Log objective values logger.info(f"Objective: {solution.get_objective()}") # for obj_name in self.variables["objectives"]: # obj_value = solution.get_value(self.variables["objectives"][obj_name]) # logger.info(f"{obj_name}: {obj_value}") # Extract schedule information schedule_per_machine = {m: [] for m in range(self.problem.n_machines)} batches_per_machine = { m: defaultdict(list) for m in range(self.problem.n_machines) } type_per_machine = {m: {} for m in range(self.problem.n_machines)} # Get batch timings intervals_per_machines = self.variables["intervals_per_machines"] for m in intervals_per_machines: for b in range(self.max_nb_batch): if solution.is_present(intervals_per_machines[m][b]): start = solution.get_start(intervals_per_machines[m][b]) end = solution.get_end(intervals_per_machines[m][b]) schedule_per_machine[m].append((b, start, end)) # Get job assignments to batches interval_batch_machine_for_job = self.variables[ "interval_batch_machine_for_job" ] for job_id in interval_batch_machine_for_job: for machine in interval_batch_machine_for_job[job_id]: if solution.is_present(interval_batch_machine_for_job[job_id][machine]): batch_idx = solution.get_start( interval_batch_machine_for_job[job_id][machine] ) type_per_machine[machine][batch_idx] = self.problem.tasks_data[ job_id ].attribute batches_per_machine[machine][batch_idx].append(job_id) # Build final schedule with ScheduleInfo objects final_schedule_per_machine = {} for m in range(self.problem.n_machines): final_schedule_per_machine[m] = [] # Sort batches by their position index sorted_batches = sorted(schedule_per_machine[m], key=lambda x: x[0]) for batch_idx, start_time, end_time in sorted_batches: if batch_idx in batches_per_machine[m]: tasks_in_batch = set(batches_per_machine[m][batch_idx]) if tasks_in_batch: task_attribute = type_per_machine[m][batch_idx] schedule_info = ScheduleInfo( tasks=tasks_in_batch, task_attribute=task_attribute, start_time=start_time, end_time=end_time, machine_batch_index=(m, len(final_schedule_per_machine[m])), ) final_schedule_per_machine[m].append(schedule_info) oven_solution = OvenSchedulingSolution( problem=self.problem, schedule_per_machine=final_schedule_per_machine ) # Log solution quality evaluation = self.problem.evaluate(oven_solution) if "ub" in self.problem.additional_data: gap = ( self.aggreg_from_sol(oven_solution) / self.problem.additional_data["ub"] ) logger.info(f"Gap to known upper bound: {gap:.4f}") logger.info(f"Feasible: {self.problem.satisfy(oven_solution)}") # Store current solution for warm start self.current_solution = result.solution return oven_solution
[docs] def set_warm_start(self, solution: OvenSchedulingSolution) -> None: """Set warm start from a given solution. Args: solution: The solution to use for warm start """ if self.cp_model is None: raise RuntimeError("Model must be initialized before setting warm start") # Create a CP solution object for warm start warm_start = cp.Solution() # OptalCP requires hints for ALL variables (present and absent) # Build complete warm start # Set dummy initial intervals (used in sequence constraints) if "dummy_initial_intervals" in self.variables: for m in self.variables["dummy_initial_intervals"]: # Dummy interval has fixed values: start=-1, length=1, end=0 warm_start.set_value( self.variables["dummy_initial_intervals"][m], -1, 0 ) for m in range(self.problem.n_machines): machine_batches = solution.schedule_per_machine.get(m, []) prev_attr = self.problem.machines_data[m].initial_attribute # Process present batches for batch_idx, sched_info in enumerate(machine_batches): if batch_idx < self.max_nb_batch: # Set batch interval (present) interval = self.variables["intervals_per_machines"][m][batch_idx] warm_start.set_value( interval, sched_info.start_time, sched_info.end_time ) # Set attribute interval for used attribute (present) attr = sched_info.task_attribute attr_interval = self.variables["intervals_per_machines_per_attr"][ m ][attr][batch_idx] warm_start.set_value( attr_interval, sched_info.start_time, sched_info.end_time ) # Set absent all OTHER attribute intervals for this batch for other_attr in self.variables["intervals_per_machines_per_attr"][ m ]: if other_attr != attr: warm_start.set_absent( self.variables["intervals_per_machines_per_attr"][m][ other_attr ][batch_idx] ) # Set setup cost if applicable if "setup_cost_per_machine" in self.variables: if isinstance( self.variables["setup_cost_per_machine"][m], list ): setup_cost = self.problem.setup_costs[prev_attr][attr] warm_start.set_value( self.variables["setup_cost_per_machine"][m][batch_idx], setup_cost, ) # Set batch attribute variable if using explicit_attrs if "batch_attribute" in self.variables: warm_start.set_value( self.variables["batch_attribute"][m][batch_idx], attr ) prev_attr = attr # Set absent batches beyond those used for batch_idx in range(len(machine_batches), self.max_nb_batch): warm_start.set_absent( self.variables["intervals_per_machines"][m][batch_idx] ) # Set absent ALL attribute intervals for unused batches for attr in self.variables["intervals_per_machines_per_attr"][m]: warm_start.set_absent( self.variables["intervals_per_machines_per_attr"][m][attr][ batch_idx ] ) # Set absent setup cost for unused batches if "setup_cost_per_machine" in self.variables: if isinstance(self.variables["setup_cost_per_machine"][m], list): warm_start.set_absent( self.variables["setup_cost_per_machine"][m][batch_idx] ) # Set job assignments - ALL jobs on ALL machines (present/absent) for task in range(self.problem.n_jobs): task_data = self.problem.tasks_data[task] # Find which machine this task is assigned to assigned_machine = None assigned_start = None assigned_end = None assigned_batch_idx = None for m in solution.schedule_per_machine: for batch_idx, sched_info in enumerate( solution.schedule_per_machine[m] ): if task in sched_info.tasks: assigned_machine = m assigned_start = sched_info.start_time assigned_end = sched_info.end_time assigned_batch_idx = batch_idx break if assigned_machine is not None: break # Set main job interval (always present) warm_start.set_value( self.variables["interval_job"][task], assigned_start, assigned_end ) # Set job intervals on all eligible machines for m in self.variables["interval_job_per_machine"][task]: if m == assigned_machine: # Present on assigned machine warm_start.set_value( self.variables["interval_job_per_machine"][task][m], assigned_start, assigned_end, ) else: # Absent on other machines warm_start.set_absent( self.variables["interval_job_per_machine"][task][m] ) # Set batch assignment intervals on all eligible machines for m in self.variables["interval_batch_machine_for_job"][task]: if m == assigned_machine and assigned_batch_idx < self.max_nb_batch: # Present on assigned machine warm_start.set_value( self.variables["interval_batch_machine_for_job"][task][m], assigned_batch_idx, assigned_batch_idx + 1, ) else: # Absent on other machines warm_start.set_absent( self.variables["interval_batch_machine_for_job"][task][m] ) # Set A87RandomOvenSchedulingInstance-n250-k2-a5--2212-22.47.15.daLL late job variables (must set all, not just those with constraints) if "late_jobs" in self.variables: for task in self.variables["late_jobs"]: task_data = self.problem.tasks_data[task] # Find task's end time in solution task_end = None for m in solution.schedule_per_machine: for sched_info in solution.schedule_per_machine[m]: if task in sched_info.tasks: task_end = sched_info.end_time break if task_end is not None: break if task_end is not None: is_late = 1 if task_end > task_data.latest_end else 0 warm_start.set_value(self.variables["late_jobs"][task], is_late) # Set batch count variables if "nb_batches_per_machine" in self.variables: for m in range(self.problem.n_machines): nb_batches = len(solution.schedule_per_machine.get(m, [])) warm_start.set_value( self.variables["nb_batches_per_machine"][m], nb_batches ) # Set makespan if it exists as a variable if ( "objectives" in self.variables and "makespan" in self.variables["objectives"] ): # Find maximum end time across all batches max_end = 0 for m in solution.schedule_per_machine: for sched_info in solution.schedule_per_machine[m]: max_end = max(max_end, sched_info.end_time) warm_start.set_value(self.variables["objectives"]["makespan"], max_end) # Set objective value hint objective_value = self.aggreg_from_sol(solution) warm_start.set_objective(int(objective_value)) # Store warm start solution self.warm_start_solution = warm_start self.use_warm_start = True logger.info(f"Warm start set with objective value: {objective_value}")
[docs] def set_warm_start_from_previous_run(self): """Set warm start from the previous solve run.""" if self.current_solution is not None: self.warm_start_solution = self.current_solution self.use_warm_start = True logger.info("Warm start set from previous run")