Source code for discrete_optimization.singlebatch.solvers.dp

import math
from typing import Any

import didppy as dp

from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver
from discrete_optimization.singlebatch.problem import (
    BatchProcessingSolution,
    SingleBatchProcessingProblem,
)


[docs] class DpSingleBatchSolver(DpSolver): """Optimized Dynamic Programming Solver for Single Batch-Processing Machine using didppy.""" problem: SingleBatchProcessingProblem sorted_indices: list[int]
[docs] def init_model(self, **kwargs: Any) -> None: self.model = dp.Model() problem = self.problem n = problem.nb_jobs cap = problem.capacity # --- KEY OPTIMIZATION: PRE-SORTING --- # Sort jobs by processing time descending. # This guarantees the first job in any batch is the longest, so we pay the # cost upfront and subsequent jobs packed into that batch cost 0. self.sorted_indices = sorted( range(n), key=lambda i: problem.jobs[i].processing_time, reverse=True ) job_sizes = [problem.jobs[i].size for i in self.sorted_indices] job_ps = [problem.jobs[i].processing_time for i in self.sorted_indices] job = self.model.add_object_type(n) job_and_dummy = self.model.add_object_type(n + 1) # --- State Variables --- # 1. Unassigned jobs (indices now refer to the sorted lists) unassigned = self.model.add_set_var(target=list(range(n)), object_type=job) # 2. Remaining capacity in the currently open batch rem_cap = self.model.add_int_resource_var(target=0, less_is_better=False) # current_batch_duration = self.model.add_int_resource_var(target=0, # less_is_better=True) # 3. Last added job index in the current batch (to break permutation symmetries) # target=n acts as our dummy state representing "no job added yet" last_added = self.model.add_element_var(target=n, object_type=job_and_dummy) job_sizes_table = self.model.add_int_table(job_sizes) # --- Base Case --- self.model.add_base_case([unassigned.is_empty()]) last_added_lower_than_j = [ self.model.add_bool_state_fun(last_added < j) for j in range(n) ] all_done_before = [] for j in range(n): is_min_cond = unassigned.contains(j) for i in range(j): is_min_cond = is_min_cond & ~unassigned.contains(i) all_done_before.append(self.model.add_bool_state_fun(is_min_cond)) min_sizes_unassigned = self.model.add_int_state_fun( job_sizes_table.min(unassigned) ) # --- Transitions --- for j in range(n): # Transition A: Open a NEW batch with job j # Cost is incurred immediately because j is the longest job in this new batch t_open = dp.Transition( name=f"open_new_{j}", cost=dp.IntExpr.state_cost() + job_ps[j], effects=[ (unassigned, unassigned.remove(j)), (rem_cap, cap - job_sizes[j]), (last_added, j), # (current_batch_duration, job_ps[j]) ], preconditions=[ all_done_before[j], unassigned.contains(j), min_sizes_unassigned > rem_cap, ], ) self.model.add_transition(t_open) # Transition B: Pack job j into the EXISTING open batch # Cost is 0 because the batch's cost is dictated by its first job (which is >= job_ps[j]) # new_batch_duration = dp.max(current_batch_duration, job_ps[j]) t_pack = dp.Transition( name=f"pack_{j}", cost=dp.IntExpr.state_cost(), effects=[ (unassigned, unassigned.remove(j)), (rem_cap, rem_cap - job_sizes[j]), (last_added, j), ], preconditions=[ last_added_lower_than_j[j], unassigned.contains(j), rem_cap >= job_sizes[j], ], ) self.model.add_transition(t_pack) min_p = job_ps[-1] # The shortest processing time among all jobs # 1. L1 Bound: Volume / Capacity weight_table = self.model.add_int_table(job_sizes) self.model.add_dual_bound( math.ceil((weight_table[unassigned] - rem_cap) / cap) * min_p ) # 2. L2 Bound (Martello and Toth) weight_2_1 = [1 if job_sizes[i] > cap / 2 else 0 for i in range(n)] weight_2_2 = [0.5 if job_sizes[i] == cap / 2 else 0 for i in range(n)] weight_2_1_table = self.model.add_int_table(weight_2_1) weight_2_2_table = self.model.add_float_table(weight_2_2) self.model.add_dual_bound( ( weight_2_1_table[unassigned] + math.ceil(weight_2_2_table[unassigned]) - (rem_cap >= cap / 2).if_then_else(1, 0) ) * min_p ) # 3. L3 Bound (Finer grained capacities) weight_3 = [ 1.0 if job_sizes[i] > cap * 2 / 3 else 2 / 3 if job_sizes[i] == cap * 2 / 3 else 0.5 if job_sizes[i] > cap / 3 else 1 / 3 if job_sizes[i] == cap / 3 else 0.0 for i in range(n) ] weight_3_table = self.model.add_float_table(weight_3) self.model.add_dual_bound( ( math.ceil(weight_3_table[unassigned]) - (rem_cap >= cap / 3).if_then_else(1, 0) ) * min_p )
[docs] def retrieve_solution(self, sol: dp.Solution) -> BatchProcessingSolution: """Parse the didppy state transition trace into a valid DO Solution.""" job_to_batch = [-1] * self.problem.nb_jobs batch_id = -1 for t in sol.transitions: name = t.name if name.startswith("open_new_"): batch_id += 1 # Extract the sorted index and map it back to the original problem index sorted_j = int(name.split("_")[-1]) original_j = self.sorted_indices[sorted_j] job_to_batch[original_j] = batch_id elif name.startswith("pack_"): sorted_j = int(name.split("_")[-1]) original_j = self.sorted_indices[sorted_j] job_to_batch[original_j] = batch_id return BatchProcessingSolution(problem=self.problem, job_to_batch=job_to_batch)