Source code for discrete_optimization.ovensched.solvers.mutations

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
"""
Mutation operators for the Oven Scheduling Problem.

This module provides mutation operators that work with the VectorOvenSchedulingSolution
using the permutation encoding.
"""

from __future__ import annotations

import random
from typing import Any

import numpy as np

from discrete_optimization.generic_tools.do_mutation import (
    LocalMove,
    LocalMoveDefault,
    Mutation,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.ovensched import OvenSchedulingProblem
from discrete_optimization.ovensched.problem import OvenSchedulingProblem
from discrete_optimization.ovensched.solution_vector import VectorOvenSchedulingSolution


class OvenPermutationSwap(Mutation):
    """
    Mutation operator that swaps two random positions in the permutation.

    This is a simple but effective mutation for permutation-based solutions.
    The common SwapMutation is not used, in order to handle the cache
    mechanism happening in VectorOvenSchedulingSolution.
    """

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Apply swap mutation to a solution.

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object)

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution,
                or (raised by ``random.sample``) if the permutation holds
                fewer than two entries.
        """
        # NOTE(review): the base-class ``mutate`` is deliberately not invoked
        # here. None of the other operators in this module call it, and it is
        # presumably abstract (nothing to unpack) -- confirm against
        # do_mutation.Mutation.
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Solution must be VectorOvenSchedulingSolution, got {type(solution)}"
            )

        n = len(solution.permutation)

        # Choose two random distinct positions
        i, j = random.sample(range(n), 2)

        # Work on a copy so the input solution is left untouched
        new_perm = solution.permutation.copy()
        new_perm[i], new_perm[j] = new_perm[j], new_perm[i]

        # Build a brand-new solution object from the swapped permutation
        new_solution = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )

        # Local move keeps both solutions (for backtracking if needed)
        move = LocalMoveDefault(prev_solution=solution, new_solution=new_solution)
        return new_solution, move
class OvenPermutationInsert(Mutation):
    """
    Mutation operator that relocates one element of the permutation.

    A single entry is taken out of its current slot and re-inserted at a
    different, randomly drawn index.
    """

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Apply insert mutation to a solution.

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object)

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution,
                or (raised by ``random.sample``) if the permutation holds
                fewer than two entries.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Solution must be VectorOvenSchedulingSolution, got {type(solution)}"
            )

        size = len(solution.permutation)

        # Two distinct indices: where the element comes from, where it goes
        source_idx, target_idx = random.sample(range(size), 2)

        working = solution.permutation.copy()
        moved_value = working[source_idx]

        # np.delete / np.insert both return new arrays; the target index is
        # interpreted relative to the array *after* the removal.
        without_value = np.delete(working, source_idx)
        relocated = np.insert(without_value, target_idx, moved_value)

        mutated = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=relocated,
        )
        return mutated, LocalMoveDefault(prev_solution=solution, new_solution=mutated)
class OvenPermutationPartialShuffle(Mutation):
    """
    Mutation operator that shuffles a random subsequence of the permutation.
    """

    def __init__(
        self,
        problem: OvenSchedulingProblem,
        min_length: int = 2,
        max_length: int | None = None,
        **kwargs: Any,
    ):
        """
        Initialize the partial shuffle mutation.

        Args:
            problem: The problem instance
            min_length: Minimum length of subsequence to shuffle
            max_length: Maximum length of subsequence to shuffle
                (default: n_jobs // 4)
        """
        super().__init__(problem=problem, **kwargs)
        self.min_length = min_length
        self.max_length = max_length if max_length is not None else problem.n_jobs // 4

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Apply partial shuffle mutation to a solution.

        The subsequence length is drawn uniformly between ``min_length`` and
        ``max_length``, both capped by the permutation length. If the capped
        maximum ends up below the minimum (e.g. the default ``n_jobs // 4``
        on an instance with fewer than 8 jobs), the minimum is used as the
        only admissible length instead of failing on an empty range.

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object)

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Solution must be VectorOvenSchedulingSolution, got {type(solution)}"
            )

        n = len(solution.permutation)

        # Clamp both bounds into [0, n] and make sure lower <= upper so that
        # random.randint never receives an empty range.
        lower = max(0, min(self.min_length, n))
        upper = max(min(self.max_length, n), lower)
        subseq_length = random.randint(lower, upper)

        # Choose starting position so the window fits in the permutation
        start = random.randint(0, n - subseq_length)
        end = start + subseq_length

        # Shuffle a copy of the window, then write it back into a copied
        # permutation. Note: the shuffle uses NumPy's global RNG, whereas the
        # window itself is drawn with the stdlib ``random`` module.
        new_perm = solution.permutation.copy()
        subseq = new_perm[start:end].copy()
        np.random.shuffle(subseq)
        new_perm[start:end] = subseq

        new_solution = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )

        move = LocalMoveDefault(prev_solution=solution, new_solution=new_solution)
        return new_solution, move
class OvenPermutationTwoOpt(Mutation):
    """
    2-opt mutation: reverses a random subsequence of the permutation.

    This is a classic neighborhood operator for permutation problems.
    """

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Apply 2-opt mutation to a solution.

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object)

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution,
                or (raised by ``random.sample``) if the permutation holds
                fewer than two entries.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Solution must be VectorOvenSchedulingSolution, got {type(solution)}"
            )

        size = len(solution.permutation)

        # Draw two distinct indices and order them: [low, high] inclusive
        low, high = sorted(random.sample(range(size), 2))
        stop = high + 1

        # Reverse the chosen window inside a copy of the permutation
        flipped = solution.permutation.copy()
        flipped[low:stop] = flipped[low:stop][::-1]

        mutated = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=flipped,
        )
        return mutated, LocalMoveDefault(prev_solution=solution, new_solution=mutated)
class OvenPermutationMixedMutation(Mutation):
    """
    Mixed mutation that randomly applies one of several mutation operators.

    This provides diversity in the search process.
    """

    def __init__(
        self,
        problem: OvenSchedulingProblem,
        swap_prob: float = 0.4,
        insert_prob: float = 0.3,
        two_opt_prob: float = 0.2,
        shuffle_prob: float = 0.1,
        **kwargs: Any,
    ):
        """
        Initialize mixed mutation.

        The four weights are rescaled so that they sum to one; they therefore
        only need to be given up to a common factor.

        Args:
            problem: The problem instance
            swap_prob: Probability of applying swap mutation
            insert_prob: Probability of applying insert mutation
            two_opt_prob: Probability of applying 2-opt mutation
            shuffle_prob: Probability of applying partial shuffle mutation
        """
        super().__init__(problem=problem, **kwargs)

        # Rescale the weights into a probability distribution
        weight_sum = swap_prob + insert_prob + two_opt_prob + shuffle_prob
        self.swap_prob = swap_prob / weight_sum
        self.insert_prob = insert_prob / weight_sum
        self.two_opt_prob = two_opt_prob / weight_sum
        self.shuffle_prob = shuffle_prob / weight_sum

        # One instance of each elementary operator, keyed by short name
        self.mutations = {
            "swap": OvenPermutationSwap(problem),
            "insert": OvenPermutationInsert(problem),
            "two_opt": OvenPermutationTwoOpt(problem),
            "shuffle": OvenPermutationPartialShuffle(problem),
        }

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Apply a randomly selected mutation to a solution.

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object)
        """
        draw = random.random()

        # Walk the cumulative distribution; the shuffle operator takes
        # whatever probability mass is left after the first three.
        cumulative = 0.0
        for key, weight in (
            ("swap", self.swap_prob),
            ("insert", self.insert_prob),
            ("two_opt", self.two_opt_prob),
        ):
            cumulative += weight
            if draw < cumulative:
                return self.mutations[key].mutate(solution)
        return self.mutations["shuffle"].mutate(solution)
class SwapBatchesMutation(Mutation):
    """
    Swap Batches (SB) mutation from the paper.

    Swaps two batches on the same machine by reordering their tasks in the
    permutation. This changes the processing order of batches on a machine.
    """

    problem: OvenSchedulingProblem

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Swap two batches on the same machine.

        Strategy:
        1. Decode permutation to identify batches
        2. Choose a machine with at least 2 batches
        3. Pick two batches to swap
        4. Exchange their tasks pairwise in the permutation

        When the two batches differ in size, only as many tasks as the
        smaller batch contains are exchanged; the surplus tasks of the larger
        batch stay where they are.

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object). If no machine holds
            at least two batches, the input solution is returned unchanged.

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Expected VectorOvenSchedulingSolution, got {type(solution)}"
            )

        # Decode to get batch structure
        schedule = solution.get_schedule()

        eligible_machines = [
            m
            for m in range(self.problem.n_machines)
            if len(schedule.schedule_per_machine[m]) >= 2
        ]
        if not eligible_machines:
            # Nothing to swap: hand back the same solution
            return solution, LocalMoveDefault(solution, solution)

        chosen_machine = random.choice(eligible_machines)
        machine_batches = schedule.schedule_per_machine[chosen_machine]

        # Two distinct batch indices on that machine
        first, second = sorted(random.sample(range(len(machine_batches)), 2))
        first_tasks = list(machine_batches[first].tasks)
        second_tasks = list(machine_batches[second].tasks)

        new_perm = solution.permutation.copy()

        # Permutation slots occupied by each batch, in increasing order so
        # the relative order inside a batch is maintained
        first_slots = sorted(np.where(new_perm == task)[0][0] for task in first_tasks)
        second_slots = sorted(np.where(new_perm == task)[0][0] for task in second_tasks)

        # Snapshot the values before any slot is overwritten
        first_values = [new_perm[slot] for slot in first_slots]
        second_values = [new_perm[slot] for slot in second_slots]

        # zip() stops at the shorter sequence, i.e. at the smaller batch size
        for slot, value in zip(first_slots, second_values):
            new_perm[slot] = value
        for slot, value in zip(second_slots, first_values):
            new_perm[slot] = value

        mutated = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )
        return mutated, LocalMoveDefault(prev_solution=solution, new_solution=mutated)
class MoveJobToExistingBatchMutation(Mutation):
    """
    Moves a job closer to an existing compatible batch in the permutation.

    This encourages the decoder to batch them together.
    """

    problem: OvenSchedulingProblem

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Move a job close to a compatible batch.

        Strategy:
        1. Decode to identify batches
        2. Pick a random batch
        3. Find a job compatible with this batch (same attribute)
        4. Re-insert that job right after the batch's first listed task

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object). The input solution is
            returned unchanged when the schedule has no batch or when no
            other job shares the chosen batch's attribute.

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Expected VectorOvenSchedulingSolution, got {type(solution)}"
            )

        # Decode to get batch structure
        schedule = solution.get_schedule()

        # Flatten every (machine, batch) pair of the schedule
        every_batch = [
            (m, batch)
            for m in range(self.problem.n_machines)
            for batch in schedule.schedule_per_machine[m]
        ]
        if not every_batch:
            return solution, LocalMoveDefault(solution, solution)

        _, target_batch = random.choice(every_batch)
        wanted_attribute = target_batch.task_attribute
        anchor_task = list(target_batch.tasks)[0]

        # Jobs sharing the batch attribute that are not already part of it
        candidates = [
            job
            for job in range(self.problem.n_jobs)
            if self.problem.tasks_data[job].attribute == wanted_attribute
            and job not in target_batch.tasks
        ]
        if not candidates:
            return solution, LocalMoveDefault(solution, solution)

        job_to_move = random.choice(candidates)

        new_perm = solution.permutation.copy()
        job_pos = np.where(new_perm == job_to_move)[0][0]
        anchor_pos = np.where(new_perm == anchor_task)[0][0]

        new_perm = np.delete(new_perm, job_pos)

        # Removing an earlier element shifts the anchor one slot to the left;
        # the job goes directly behind the anchor in the shortened array.
        insert_at = anchor_pos if job_pos < anchor_pos else anchor_pos + 1
        new_perm = np.insert(new_perm, insert_at, job_to_move)

        mutated = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )
        return mutated, LocalMoveDefault(prev_solution=solution, new_solution=mutated)
class MoveJobToNewBatchMutation(Mutation):
    """
    Move Job to New Batch.

    Moves a job away from its current batch neighbors to encourage creating
    a new batch.
    """

    problem: OvenSchedulingProblem

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Move a job away from its batch to create a new batch.

        Strategy:
        1. Decode to identify batches
        2. Pick a batch with at least 2 jobs
        3. Move one job far from its batch neighbors

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object). The input solution is
            returned unchanged when no batch holds at least two jobs.

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Expected VectorOvenSchedulingSolution, got {type(solution)}"
            )

        # Decode to get batch structure
        schedule = solution.get_schedule()

        # Batches that can lose a job without disappearing
        splittable_batches = [
            batch
            for m in range(self.problem.n_machines)
            for batch in schedule.schedule_per_machine[m]
            if len(batch.tasks) >= 2
        ]
        if not splittable_batches:
            return solution, LocalMoveDefault(solution, solution)

        source_batch = random.choice(splittable_batches)
        job_to_move = random.choice(list(source_batch.tasks))

        new_perm = solution.permutation.copy()
        job_pos = np.where(new_perm == job_to_move)[0][0]
        new_perm = np.delete(new_perm, job_pos)

        # After the removal the array has n entries and therefore n + 1
        # valid insertion slots (0..n, where n means "append at the end").
        n = len(new_perm)
        min_distance = max(1, n // 4)

        # Prefer slots at least n/4 away from the original index. The end
        # slot n is a legitimate candidate and must be part of the pool.
        far_positions = [
            pos for pos in range(n + 1) if abs(pos - job_pos) >= min_distance
        ]
        if far_positions:
            new_pos = random.choice(far_positions)
        else:
            # Defensive fallback: any valid insertion slot
            new_pos = random.randint(0, n)

        new_perm = np.insert(new_perm, new_pos, job_to_move)

        new_solution = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )
        move = LocalMoveDefault(prev_solution=solution, new_solution=new_solution)
        return new_solution, move
class InvertBatchesOrderMutation(Mutation):
    """
    Invert Batches Order.

    Reverses the order of a sequence of batches on a machine.
    """

    problem: OvenSchedulingProblem

    def __init__(self, problem: OvenSchedulingProblem, **kwargs: Any):
        super().__init__(problem=problem, **kwargs)

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Invert order of consecutive batches.

        Strategy:
        1. Decode to identify batches
        2. Choose machine and range of batches
        3. Reverse, in the permutation, the order of all tasks belonging to
           those batches (the slots they occupy stay the same)

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object). The input solution is
            returned unchanged when no machine holds at least two batches.

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Expected VectorOvenSchedulingSolution, got {type(solution)}"
            )

        # Decode to get batch structure
        schedule = solution.get_schedule()

        machines_with_batches = [
            m
            for m in range(self.problem.n_machines)
            if len(schedule.schedule_per_machine[m]) >= 2
        ]
        if not machines_with_batches:
            return solution, LocalMoveDefault(solution, solution)

        # Every candidate machine has >= 2 batches, so sampling two distinct
        # batch indices below is always possible.
        machine = random.choice(machines_with_batches)
        batches = schedule.schedule_per_machine[machine]
        i, j = sorted(random.sample(range(len(batches)), 2))

        # All tasks of batches i..j (inclusive)
        tasks_to_invert = []
        for batch in batches[i : j + 1]:
            tasks_to_invert.extend(batch.tasks)

        new_perm = solution.permutation.copy()

        # Slots currently occupied by those tasks, in permutation order
        slots = sorted(np.where(new_perm == task)[0][0] for task in tasks_to_invert)

        # Read the values back-to-front before overwriting anything, then
        # write them front-to-back into the same slots.
        reversed_tasks = [new_perm[slot] for slot in reversed(slots)]
        for slot, task in zip(slots, reversed_tasks):
            new_perm[slot] = task

        new_solution = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )
        move = LocalMoveDefault(prev_solution=solution, new_solution=new_solution)
        return new_solution, move
class MoveJobsToNewBatchMutation(Mutation):
    """
    Moves multiple jobs from one batch to create a new batch elsewhere.
    """

    def __init__(
        self,
        problem: OvenSchedulingProblem,
        min_jobs_to_move: int = 2,
        max_jobs_to_move: int = 5,
        **kwargs: Any,
    ):
        """
        Initialize the multi-job move mutation.

        Args:
            problem: The problem instance
            min_jobs_to_move: Smallest number of jobs relocated in one move;
                also the minimum size a batch needs to be selectable
            max_jobs_to_move: Largest number of jobs relocated in one move
        """
        super().__init__(problem=problem, **kwargs)
        self.min_jobs_to_move = min_jobs_to_move
        self.max_jobs_to_move = max_jobs_to_move

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """
        Move multiple jobs together to form a new batch.

        Strategy:
        1. Find a batch with enough jobs
        2. Select a random subset of its jobs
        3. Remove them and re-insert them contiguously at a random position

        Args:
            solution: The solution to mutate (must be VectorOvenSchedulingSolution)

        Returns:
            Tuple of (new solution, local move object). The input solution is
            returned unchanged when no batch is large enough.

        Raises:
            ValueError: If ``solution`` is not a VectorOvenSchedulingSolution.
        """
        if not isinstance(solution, VectorOvenSchedulingSolution):
            raise ValueError(
                f"Expected VectorOvenSchedulingSolution, got {type(solution)}"
            )

        # Decode to get batch structure
        schedule = solution.get_schedule()

        # Batches holding at least the minimum number of movable jobs
        candidates = [
            (m, batch)
            for m in range(self.problem.n_machines)
            for batch in schedule.schedule_per_machine[m]
            if len(batch.tasks) >= self.min_jobs_to_move
        ]
        if not candidates:
            return solution, LocalMoveDefault(solution, solution)

        _, source_batch = random.choice(candidates)
        members = list(source_batch.tasks)

        # Number of jobs to relocate, capped by the batch size
        upper = min(self.max_jobs_to_move, len(members))
        how_many = random.randint(self.min_jobs_to_move, upper)
        picked = random.sample(members, how_many)

        new_perm = solution.permutation.copy()

        # Current slots of the picked jobs; delete from the highest slot
        # downwards so the remaining indices stay valid.
        old_slots = sorted(np.where(new_perm == job)[0][0] for job in picked)
        for slot in reversed(old_slots):
            new_perm = np.delete(new_perm, slot)

        # Re-insert the jobs side by side, in the order they were sampled
        insert_at = random.randint(0, len(new_perm))
        for offset, job in enumerate(picked):
            new_perm = np.insert(new_perm, insert_at + offset, job)

        mutated = VectorOvenSchedulingSolution(
            problem=self.problem,
            permutation=new_perm,
        )
        return mutated, LocalMoveDefault(prev_solution=solution, new_solution=mutated)
class ScheduleAwareMixedMutation(Mutation):
    """
    Mixed mutation combining all schedule-aware operators.

    Randomly applies one of the schedule-aware mutations from the paper.
    """

    def __init__(
        self,
        problem: OvenSchedulingProblem,
        swap_batches_prob: float = 0.25,
        mjeb_prob: float = 0.25,
        mjnb_prob: float = 0.20,
        invert_prob: float = 0.15,
        move_jobs_prob: float = 0.15,
        **kwargs: Any,
    ):
        """
        Initialize the schedule-aware mixed mutation.

        The five weights are rescaled so that they sum to one.

        Args:
            problem: The problem instance
            swap_batches_prob: Weight of SwapBatchesMutation
            mjeb_prob: Weight of MoveJobToExistingBatchMutation
            mjnb_prob: Weight of MoveJobToNewBatchMutation
            invert_prob: Weight of InvertBatchesOrderMutation
            move_jobs_prob: Weight of MoveJobsToNewBatchMutation
        """
        super().__init__(problem=problem, **kwargs)

        # Rescale the weights into a probability distribution
        weight_sum = (
            swap_batches_prob + mjeb_prob + mjnb_prob + invert_prob + move_jobs_prob
        )
        self.swap_batches_prob = swap_batches_prob / weight_sum
        self.mjeb_prob = mjeb_prob / weight_sum
        self.mjnb_prob = mjnb_prob / weight_sum
        self.invert_prob = invert_prob / weight_sum
        self.move_jobs_prob = move_jobs_prob / weight_sum

        # One instance of each schedule-aware operator, keyed by short name
        self.mutations = {
            "swap_batches": SwapBatchesMutation(problem),
            "mjeb": MoveJobToExistingBatchMutation(problem),
            "mjnb": MoveJobToNewBatchMutation(problem),
            "invert": InvertBatchesOrderMutation(problem),
            "move_jobs": MoveJobsToNewBatchMutation(problem),
        }

    def mutate(self, solution: Solution) -> tuple[Solution, LocalMove]:
        """Apply randomly selected schedule-aware mutation."""
        draw = random.random()

        # Walk the cumulative distribution; "move_jobs" receives whatever
        # probability mass remains after the first four operators.
        cumulative = 0.0
        for key, weight in (
            ("swap_batches", self.swap_batches_prob),
            ("mjeb", self.mjeb_prob),
            ("mjnb", self.mjnb_prob),
            ("invert", self.invert_prob),
        ):
            cumulative += weight
            if draw < cumulative:
                return self.mutations[key].mutate(solution)
        return self.mutations["move_jobs"].mutate(solution)