Source code for discrete_optimization.shop.fjsp.solvers.lns_cpsat

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import math
import random
from typing import Any, Hashable, Iterable, Optional

from ortools.sat.python.cp_model import Constraint

from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tools.lns_cp import OrtoolsCpSatConstraintHandler
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.shop.fjsp.problem import FJobShopProblem, FJobShopSolution
from discrete_optimization.shop.fjsp.solvers.cpsat import CpSatFjspSolver


[docs] class NeighborBuilderSubPart: """ Cut the schedule in different subpart in the increasing order of the schedule. """ def __init__(self, problem: FJobShopProblem, nb_cut_part: int = 10): self.problem = problem self.nb_cut_part = nb_cut_part self.current_sub_part = 0 self.keys = self.problem.tasks_list self.set_keys = set(self.keys)
[docs] def find_subtasks( self, current_solution: FJobShopSolution, subtasks: Optional[set[Hashable]] = None, ) -> tuple[set[tuple[int, int]], set[tuple[int, int]]]: nb_job_sub = math.ceil(self.problem.n_all_jobs / self.nb_cut_part) task_of_interest = sorted( self.keys, key=lambda x: current_solution.get_start_time(x) ) task_of_interest = task_of_interest[ self.current_sub_part * nb_job_sub : (self.current_sub_part + 1) * nb_job_sub ] if subtasks is None: subtasks = task_of_interest else: subtasks.update(task_of_interest) self.current_sub_part = (self.current_sub_part + 1) % self.nb_cut_part return subtasks, self.set_keys.difference(subtasks)
[docs] class FjspConstraintHandler(OrtoolsCpSatConstraintHandler): def __init__(self, problem: FJobShopProblem, fraction_segment_to_fix: float = 0.9): self.problem = problem self.fraction_segment_to_fix = fraction_segment_to_fix
[docs] def adding_constraint_from_results_store( self, solver: CpSatFjspSolver, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Constraint]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ sol: FJobShopSolution = self.extract_best_solution_from_last_iteration( result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, ) keys = random.sample( self.problem.tasks_list, k=int(self.fraction_segment_to_fix * len(self.problem.tasks_list)), ) constraints = [] for k in keys: st, end = sol.get_start_time(k), sol.get_end_time(k) constraints.append( solver.cp_model.Add( solver.get_task_start_or_end_variable(k, StartOrEnd.START) <= st + 20 ) ) constraints.append( solver.cp_model.Add( solver.get_task_start_or_end_variable(k, StartOrEnd.START) >= st - 20 ) ) solver.cp_model.Minimize(solver._makespan) solver.set_warm_start(sol) return constraints
[docs] class NeighFjspConstraintHandler(OrtoolsCpSatConstraintHandler): def __init__( self, problem: FJobShopProblem, neighbor_builder: NeighborBuilderSubPart ): self.problem = problem self.neighbor_builder = neighbor_builder
[docs] def adding_constraint_from_results_store( self, solver: CpSatFjspSolver, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Constraint]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ sol, _ = result_storage[-1] sol: FJobShopSolution makespan = self.problem.evaluate(sol)["makespan"] keys_part, keys = self.neighbor_builder.find_subtasks(current_solution=sol) constraints = [] for k in keys: st, end = sol.get_start_time(k), sol.get_end_time(k) constraints.append( solver.cp_model.Add( solver.get_task_start_or_end_variable(k, StartOrEnd.START) <= st + 2 ) ) constraints.append( solver.cp_model.Add( solver.get_task_start_or_end_variable(k, StartOrEnd.START) >= st - 2 ) ) for key in self.problem.tasks_list: constraints.append( solver.cp_model.Add( solver.get_task_start_or_end_variable(key, StartOrEnd.END) <= makespan ) ) solver.set_warm_start(sol) return constraints