Source code for discrete_optimization.singlebatch.solvers.cpsat

import logging
from enum import Enum
from typing import Any, Optional

from ortools.sat.python.cp_model import CpModel, CpSolverSolutionCallback, LinearExpr

from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    EnumHyperparameter,
)
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver
from discrete_optimization.singlebatch.problem import (
    BatchProcessingSolution,
    SingleBatchProcessingProblem,
)

logger = logging.getLogger(__name__)


[docs] class ModelingBpm(Enum): BINARY = 0 SCHEDULING = 1
[docs] class CpSatSingleBatchSolver(OrtoolsCpSatSolver, WarmstartMixin): """CP-SAT Solver for the Single Batch-Processing Machine Scheduling Problem. Args: problem: The batch processing problem instance params_objective_function: Parameters for objective function modeling: Type of CP-SAT model (BINARY or SCHEDULING) symmetry_breaking: If True, applies symmetry breaking by ordering jobs by processing time and adding constraint x_jk <= x_kk for j <= k. From Trindade et al. (2018) "Modelling and symmetry breaking in scheduling problems on batch processing machines" (default: False) """ problem: SingleBatchProcessingProblem hyperparameters = [ EnumHyperparameter( name="modeling", enum=ModelingBpm, default=ModelingBpm.BINARY ) ] def __init__( self, problem, # Assuming SingleBatchProcessingProblem params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): super().__init__(problem, params_objective_function, **kwargs) self.variables = {} self.symmetry_breaking = False self.job_order = None self.job_order_inv = None
[docs] def init_model(self, **args: Any) -> None: self.cp_model = CpModel() args = self.complete_with_default_hyperparameters(args) self.modeling = args["modeling"] self.symmetry_breaking = args.get("symmetry_breaking", False) self.max_batches = self.problem.nb_jobs if self.modeling == ModelingBpm.BINARY: self.init_model_binary(**args) elif self.modeling == ModelingBpm.SCHEDULING: self.init_model_scheduling(**args)
[docs] def init_model_binary(self, **args: Any) -> None: if self.symmetry_breaking: self._init_model_binary_symmetry_breaking(**args) else: self._init_model_binary_naive(**args)
def _init_model_binary_symmetry_breaking(self, **args: Any) -> None: """Symmetry breaking formulation from Trindade et al. (2018). Jobs are ordered by processing time and variables x_jk only exist for j <= k. """ problem = self.problem nb_jobs = problem.nb_jobs # Sort jobs by processing time job_indices = list(range(nb_jobs)) self.job_order_inv = sorted( job_indices, key=lambda j: (problem.jobs[j].processing_time, j) ) self.job_order = [0] * nb_jobs for sorted_idx in range(nb_jobs): orig_idx = self.job_order_inv[sorted_idx] self.job_order[orig_idx] = sorted_idx # Variables: x[j, k] for j <= k only (using sorted indices) self.variables["x"] = {} for j_sorted in range(nb_jobs): j_orig = self.job_order_inv[j_sorted] for k_sorted in range(j_sorted, nb_jobs): self.variables["x"][j_orig, k_sorted] = self.cp_model.NewBoolVar( f"x_{j_sorted}_{k_sorted}" ) # Constraint (9): Each job assigned to exactly one batch for j_sorted in range(nb_jobs): j_orig = self.job_order_inv[j_sorted] self.cp_model.AddExactlyOne( [ self.variables["x"][j_orig, k_sorted] for k_sorted in range(j_sorted, nb_jobs) ] ) # Constraint (10): Capacity constraint using x_kk as batch indicator for k_sorted in range(nb_jobs): k_orig = self.job_order_inv[k_sorted] self.cp_model.Add( LinearExpr.WeightedSum( [ self.variables["x"][self.job_order_inv[j_sorted], k_sorted] for j_sorted in range(k_sorted + 1) ], [ problem.jobs[self.job_order_inv[j_sorted]].size for j_sorted in range(k_sorted + 1) ], ) <= problem.capacity * self.variables["x"][k_orig, k_sorted] ) # Constraint (11): Symmetry breaking x_jk <= x_kk for j_sorted in range(nb_jobs): j_orig = self.job_order_inv[j_sorted] for k_sorted in range(j_sorted + 1, nb_jobs): # j < k only k_orig = self.job_order_inv[k_sorted] self.cp_model.AddImplication( self.variables["x"][j_orig, k_sorted], self.variables["x"][k_orig, k_sorted], ) # Objective (8): Minimize sum of processing times of used batches makespan = self.cp_model.NewIntVar( 0, sum(job.processing_time for job in problem.jobs), "makespan" ) self.cp_model.Add( makespan == sum( problem.jobs[self.job_order_inv[k_sorted]].processing_time * self.variables["x"][self.job_order_inv[k_sorted], k_sorted] for k_sorted in range(nb_jobs) ) ) self.variables["makespan"] = makespan self.variables["y"] = None self.variables["batch_p"] = None self.cp_model.Minimize(makespan) def _init_model_binary_naive(self, **args: Any) -> None: """Standard formulation with separate batch and allocation variables.""" problem = self.problem nb_jobs = problem.nb_jobs max_batches = self.max_batches # Variables self.variables["x"] = {} # x[j, b] = 1 if job j is in batch b for j in range(nb_jobs): for b in range(max_batches): self.variables["x"][j, b] = self.cp_model.NewBoolVar(f"x_{j}_{b}") self.variables["y"] = {} # y[b] = 1 if batch b is used for b in range(max_batches): self.variables["y"][b] = self.cp_model.NewBoolVar(f"y_{b}") max_p = max(job.processing_time for job in problem.jobs) self.variables["batch_p"] = {} # batch_p[b] = processing time of batch b for b in range(max_batches): self.variables["batch_p"][b] = self.cp_model.NewIntVar( 0, max_p, f"batch_p_{b}" ) # 1. Assignment constraint for j in range(nb_jobs): self.cp_model.AddExactlyOne( [self.variables["x"][j, b] for b in range(max_batches)] ) # 2. Capacity constraint (Linear) for b in range(max_batches): self.cp_model.Add( LinearExpr.WeightedSum( [self.variables["x"][j, b] for j in range(nb_jobs)], [problem.jobs[j].size for j in range(nb_jobs)], ) <= problem.capacity * self.variables["y"][b] ) # 3. Batch processing time definition for b in range(max_batches): for j in range(nb_jobs): self.cp_model.Add( self.variables["batch_p"][b] >= problem.jobs[j].processing_time ).OnlyEnforceIf(self.variables["x"][j, b]) self.cp_model.AddImplication( self.variables["x"][j, b], self.variables["y"][b] ) # 4. Symmetry Breaking for b in range(max_batches - 1): self.cp_model.AddImplication( self.variables["y"][b + 1], self.variables["y"][b] ) # Objective makespan = self.cp_model.NewIntVar( 0, sum(job.processing_time for job in problem.jobs), "makespan" ) self.cp_model.Add( makespan == sum(self.variables["batch_p"][b] for b in range(max_batches)) ) self.variables["makespan"] = makespan self.cp_model.Minimize(makespan)
[docs] def init_model_scheduling(self, **args: Any) -> None: problem = self.problem nb_jobs = problem.nb_jobs max_batches = self.max_batches # Variables self.variables["batch_idx"] = {} # The batch index assigned to job j intervals = {} for j in range(nb_jobs): self.variables["batch_idx"][j] = self.cp_model.NewIntVar( 0, max_batches - 1, f"batch_idx_{j}" ) intervals[j] = self.cp_model.NewFixedSizeIntervalVar( start=self.variables["batch_idx"][j], size=1, name=f"interval_{j}" ) # 1. Capacity constraint (Cumulative Scheduling) # We treat "batch index" as time. The cumulative sum at any "time" cannot exceed machine capacity. self.cp_model.AddCumulative( [intervals[j] for j in range(nb_jobs)], [problem.jobs[j].size for j in range(nb_jobs)], problem.capacity, ) max_p = max(job.processing_time for job in problem.jobs) self.variables["batch_p"] = {} self.variables["is_in_batch"] = {} # 2. Extracting Batch Processing Time # We bridge the integer batch_idx[j] to boolean logic to determine the max duration per batch for b in range(max_batches): self.variables["batch_p"][b] = self.cp_model.NewIntVar( 0, max_p, f"batch_p_{b}" ) for j in range(nb_jobs): is_in = self.cp_model.NewBoolVar(f"is_in_{j}_{b}") self.variables["is_in_batch"][j, b] = is_in # Link boolean to integer batch index self.cp_model.Add(self.variables["batch_idx"][j] == b).OnlyEnforceIf( is_in ) self.cp_model.Add(self.variables["batch_idx"][j] != b).OnlyEnforceIf( is_in.Not() ) # Set batch duration self.cp_model.Add( self.variables["batch_p"][b] >= problem.jobs[j].processing_time ).OnlyEnforceIf(is_in) # Objective makespan = self.cp_model.NewIntVar( 0, sum(job.processing_time for job in problem.jobs), "makespan" ) self.cp_model.Add( makespan == sum(self.variables["batch_p"][b] for b in range(max_batches)) ) self.variables["makespan"] = makespan self.cp_model.Minimize(makespan)
[docs] def set_warm_start(self, solution: BatchProcessingSolution) -> None: self.cp_model.clear_hints() if self.modeling == ModelingBpm.BINARY: if self.symmetry_breaking: # Symmetry breaking formulation for j in range(self.problem.nb_jobs): alloc = solution.job_to_batch[j] if (j, alloc) in self.variables["x"]: self.cp_model.add_hint(self.variables["x"][j, alloc], 1) else: # Naive formulation for j in range(self.problem.nb_jobs): alloc = solution.job_to_batch[j] self.cp_model.add_hint(self.variables["x"][j, alloc], 1) batch = set(solution.job_to_batch) for b in self.variables["y"]: self.cp_model.add_hint(self.variables["y"][b], b in batch) elif self.modeling == ModelingBpm.SCHEDULING: for j in range(self.problem.nb_jobs): alloc = solution.job_to_batch[j] # Hint the batch index for job j self.cp_model.add_hint(self.variables["batch_idx"][j], alloc) # Hint the bridge booleans for b in range(self.max_batches): self.cp_model.add_hint( self.variables["is_in_batch"][j, b], 1 if alloc == b else 0 )
[docs] def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution: """Constructs a DO solution from the CP-SAT solver's internal state.""" logger.info( f"Obj={cpsolvercb.ObjectiveValue()}, Bound={cpsolvercb.BestObjectiveBound()}" ) problem = self.problem nb_jobs = problem.nb_jobs job_to_batch = [-1] * nb_jobs if self.modeling == ModelingBpm.BINARY: if self.symmetry_breaking: # Symmetry breaking: variables are (j_orig, k_sorted) for (j_orig, k_sorted), var in self.variables["x"].items(): if cpsolvercb.Value(var): job_to_batch[j_orig] = k_sorted else: # Naive formulation for j in range(nb_jobs): for b in range(self.max_batches): if cpsolvercb.Value(self.variables["x"][j, b]): job_to_batch[j] = b break elif self.modeling == ModelingBpm.SCHEDULING: for j in range(nb_jobs): job_to_batch[j] = cpsolvercb.Value(self.variables["batch_idx"][j]) return BatchProcessingSolution(problem=problem, job_to_batch=job_to_batch)