# Source code for discrete_optimization.fjsp.solvers.cpsat

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from typing import Any

from ortools.sat.python.cp_model import CpModel, CpSolverSolutionCallback, Domain

from discrete_optimization.fjsp.problem import FJobShopProblem, FJobShopSolution
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver

logger = logging.getLogger(__name__)


class CpSatFjspSolver(OrtoolsCpSatSolver, WarmstartMixin):
    """CP-SAT model of the flexible job shop problem, minimizing the makespan.

    Every sub-job gets a start, an end, a duration and an interval variable.
    When a sub-job has several (machine, processing time) alternatives, one
    optional interval and one presence literal is created per alternative and
    exactly one of them must be selected. Intervals assigned to the same
    machine must not overlap.

    Hyperparameters:
        duplicate_temporal_var: if True, each alternative gets its own start
            variable (tied to the sub-job start when the alternative is
            selected) and a fixed-size optional interval; otherwise the
            alternatives share the start/end variables of the sub-job.
        add_cumulative_constraint: if True, a capacity-1 cumulative
            constraint is added per machine on top of the no-overlap one.
    """

    hyperparameters = [
        CategoricalHyperparameter(
            name="duplicate_temporal_var", choices=[True, False], default=False
        ),
        CategoricalHyperparameter(
            name="add_cumulative_constraint", choices=[True, False], default=False
        ),
    ]
    problem: FJobShopProblem

    def __init__(self, problem: FJobShopProblem, **kwargs: Any):
        """Store the problem and start with an empty variable registry.

        Args:
            problem: the flexible job shop instance to solve.
            **kwargs: forwarded unchanged to the parent solver constructor.
        """
        super().__init__(problem, **kwargs)
        # Filled by create_vars() / init_model(); maps a family name
        # ("starts", "ends", "presents", ...) to its variables or index sets.
        self.variables = {}

    def init_model(self, **args: Any) -> None:
        """Build the CP-SAT model: variables, constraints and objective.

        Args:
            **args: hyperparameters (missing ones are completed with their
                defaults) and optionally ``max_time``, the upper bound used
                for all temporal variables (defaults to ``problem.horizon``).
        """
        args = self.complete_with_default_hyperparameters(args)
        self.cp_model = CpModel()
        self.create_vars(**args)
        self.create_precedence_constraints()
        self.create_is_present_constraints()
        self.create_disjunctive_constraints(**args)
        max_time = args.get("max_time", self.problem.horizon)
        makespan = self.cp_model.NewIntVar(0, max_time, name="makespan")
        ends = self.variables["ends"]
        # The makespan is the latest end among the last sub-job of each job.
        self.cp_model.AddMaxEquality(
            makespan,
            [
                ends[(i, len(self.problem.list_jobs[i].sub_jobs) - 1)]
                for i in range(self.problem.n_jobs)
            ],
        )
        self.variables["makespan"] = makespan
        self.cp_model.Minimize(makespan)

    def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution:
        """Convert the current solver assignment into a ``FJobShopSolution``.

        Args:
            cpsolvercb: solution callback giving access to variable values.

        Returns:
            A solution whose schedule holds, per job and per sub-job, a
            ``(start, end, machine)`` tuple. ``machine`` is ``None`` if no
            presence literal of a multi-alternative sub-job evaluates to true.
        """
        # Lazy %-style arguments: the message is only built if it is emitted.
        logger.info(
            "Objective =%s, bound = %s",
            cpsolvercb.ObjectiveValue(),
            cpsolvercb.BestObjectiveBound(),
        )
        starts = self.variables["starts"]
        ends = self.variables["ends"]
        schedule = []
        for job_index in range(self.problem.n_jobs):
            sched_job = []
            sub_jobs = self.problem.list_jobs[job_index].sub_jobs
            for subjob_index, options in enumerate(sub_jobs):
                subjob_key = (job_index, subjob_index)
                start = cpsolvercb.Value(starts[subjob_key])
                end = cpsolvercb.Value(ends[subjob_key])
                if len(options) == 1:
                    # Single alternative: no presence literal was created.
                    machine = options[0].machine_id
                else:
                    # Machine of the first alternative whose literal is true.
                    machine = next(
                        (
                            self.variables["key_to_machine"][k]
                            for k in self.variables["keys_per_subjob"][subjob_key]
                            if cpsolvercb.Value(self.variables["presents"][k])
                        ),
                        None,
                    )
                sched_job.append((start, end, machine))
            schedule.append(sched_job)
        return FJobShopSolution(problem=self.problem, schedule=schedule)

    def create_vars(self, **args: Any) -> None:
        """Create all decision variables and store them in ``self.variables``.

        Args:
            **args: must contain ``duplicate_temporal_var``; may contain
                ``max_time`` (defaults to ``problem.horizon``).
        """
        duplicate_temporal_var = args["duplicate_temporal_var"]
        max_time = args.get("max_time", self.problem.horizon)
        starts_var = {}
        ends_var = {}
        durations_var = {}
        intervals_var = {}
        is_present_var = {}
        opt_starts_var = {}
        opt_intervals_var = {}
        keys_per_machines = {m: set() for m in range(self.problem.n_machines)}
        keys_per_subjob = {}
        key_to_machine = {}
        for i in range(self.problem.n_jobs):
            # Earliest possible start of the current sub-job: sum of the
            # minimal durations of the sub-jobs preceding it in the job.
            lb = 0
            for sub_i, options in enumerate(self.problem.list_jobs[i].sub_jobs):
                subjob_key = (i, sub_i)
                possible_durations = [opt.processing_time for opt in options]
                min_duration = min(possible_durations)
                durations_var[subjob_key] = self.cp_model.NewIntVarFromDomain(
                    domain=Domain.FromValues(possible_durations),
                    name=f"dur_{subjob_key}",
                )
                starts_var[subjob_key] = self.cp_model.NewIntVar(
                    lb=lb, ub=max_time, name=f"start_{subjob_key}"
                )
                ends_var[subjob_key] = self.cp_model.NewIntVar(
                    lb=lb + min_duration, ub=max_time, name=f"end_{subjob_key}"
                )
                intervals_var[subjob_key] = self.cp_model.NewIntervalVar(
                    start=starts_var[subjob_key],
                    size=durations_var[subjob_key],
                    end=ends_var[subjob_key],
                    name=f"interval_{subjob_key}",
                )
                if len(options) == 1:
                    # Single alternative: the main interval is the machine interval.
                    opt_intervals_var[(i, sub_i, 0)] = intervals_var[subjob_key]
                    keys_per_machines[options[0].machine_id].add((i, sub_i, 0))
                    key_to_machine[(i, sub_i, 0)] = options[0].machine_id
                else:
                    for opt_i, option in enumerate(options):
                        opt_key = (i, sub_i, opt_i)
                        is_present_var[opt_key] = self.cp_model.NewBoolVar(
                            name=f"is_present_{opt_key}"
                        )
                        if duplicate_temporal_var:
                            # Dedicated start variable per alternative; it is
                            # tied to the sub-job start in
                            # create_is_present_constraints().
                            opt_starts_var[opt_key] = self.cp_model.NewIntVar(
                                lb=lb, ub=max_time, name=f"start_{opt_key}"
                            )
                            opt_intervals_var[
                                opt_key
                            ] = self.cp_model.NewOptionalFixedSizeIntervalVar(
                                start=opt_starts_var[opt_key],
                                size=option.processing_time,
                                is_present=is_present_var[opt_key],
                                name=f"opt_interval_{opt_key}",
                            )
                        else:
                            # Alternatives share the sub-job start/end variables.
                            opt_intervals_var[
                                opt_key
                            ] = self.cp_model.NewOptionalIntervalVar(
                                start=starts_var[subjob_key],
                                size=option.processing_time,
                                end=ends_var[subjob_key],
                                is_present=is_present_var[opt_key],
                                name=f"opt_interval_{opt_key}",
                            )
                        keys_per_machines[option.machine_id].add(opt_key)
                        key_to_machine[opt_key] = option.machine_id
                keys_per_subjob[subjob_key] = {
                    (i, sub_i, opt_i) for opt_i in range(len(options))
                }
                lb += min_duration
        self.variables = {
            "starts": starts_var,
            "ends": ends_var,
            "durations": durations_var,
            "intervals": intervals_var,
            "presents": is_present_var,
            "opt_starts": opt_starts_var,
            "opt_intervals": opt_intervals_var,
            "keys_per_machine": keys_per_machines,
            "keys_per_subjob": keys_per_subjob,
            "key_to_machine": key_to_machine,
        }

    def create_precedence_constraints(self) -> None:
        """Force each sub-job to start after the previous one of its job ends."""
        starts = self.variables["starts"]
        ends = self.variables["ends"]
        for i in range(self.problem.n_jobs):
            for j in range(1, len(self.problem.list_jobs[i].sub_jobs)):
                self.cp_model.Add(starts[(i, j)] >= ends[(i, j - 1)])

    def create_is_present_constraints(self) -> None:
        """Link presence literals to the sub-job duration and start variables.

        Only sub-jobs with more than one alternative are concerned: exactly
        one alternative is selected, the sub-job duration equals the
        processing time of the selected alternative and, when per-alternative
        start variables exist, the selected one equals the sub-job start.
        """
        presents = self.variables["presents"]
        durations = self.variables["durations"]
        starts = self.variables["starts"]
        opt_starts = self.variables["opt_starts"]
        for subjob, keys in self.variables["keys_per_subjob"].items():
            if len(keys) <= 1:
                continue
            # One way of doing the subjob should be selected !
            self.cp_model.AddExactlyOne([presents[key] for key in keys])
            options = self.problem.list_jobs[subjob[0]].sub_jobs[subjob[1]]
            for key in keys:
                self.cp_model.Add(
                    durations[subjob] == options[key[2]].processing_time
                ).OnlyEnforceIf(presents[key])
                if key in opt_starts:
                    self.cp_model.Add(
                        opt_starts[key] == starts[subjob]
                    ).OnlyEnforceIf(presents[key])

    def create_disjunctive_constraints(self, **args: Any) -> None:
        """Forbid overlapping intervals on each machine.

        Args:
            **args: must contain ``add_cumulative_constraint``; when true, a
                cumulative constraint with unit demands and capacity 1 is
                added per machine in addition to the no-overlap constraint.
        """
        add_cumulative_constraint = args["add_cumulative_constraint"]
        opt_intervals = self.variables["opt_intervals"]
        for keys in self.variables["keys_per_machine"].values():
            machine_intervals = [opt_intervals[key] for key in keys]
            self.cp_model.AddNoOverlap(machine_intervals)
            if add_cumulative_constraint:
                self.cp_model.AddCumulative(
                    machine_intervals, [1] * len(machine_intervals), 1
                )

    def set_warm_start(self, solution: FJobShopSolution) -> None:
        """Hint the solver with the values of an existing solution.

        Previous hints are cleared. Start, end and duration of every sub-job
        are hinted, as well as the presence literal of every alternative of
        multi-alternative sub-jobs. The per-alternative start variables (when
        they exist) and the makespan variable are hinted too, so that the hint
        covers all integer variables created by this solver.

        Args:
            solution: solution whose ``schedule[job][subjob]`` entries are
                ``(start, end, machine)`` tuples.
        """
        self.cp_model.ClearHints()
        starts = self.variables["starts"]
        ends = self.variables["ends"]
        durations = self.variables["durations"]
        presents = self.variables["presents"]
        opt_starts = self.variables["opt_starts"]
        for job_index, job_schedule in enumerate(solution.schedule):
            for subjob_index, (start, end, machine) in enumerate(job_schedule):
                subjob_key = (job_index, subjob_index)
                options = self.problem.list_jobs[job_index].sub_jobs[subjob_index]
                self.cp_model.AddHint(starts[subjob_key], start)
                self.cp_model.AddHint(ends[subjob_key], end)
                self.cp_model.AddHint(durations[subjob_key], end - start)
                if len(options) > 1:
                    for opt_i, option in enumerate(options):
                        opt_key = (job_index, subjob_index, opt_i)
                        self.cp_model.AddHint(
                            presents[opt_key],
                            1 if option.machine_id == machine else 0,
                        )
                        if opt_key in opt_starts:
                            # Equal to the sub-job start when selected and
                            # unconstrained otherwise, so the sub-job start is
                            # a consistent hint in both cases.
                            self.cp_model.AddHint(opt_starts[opt_key], start)
        makespan_var = self.variables.get("makespan")
        last_ends = [job_schedule[-1][1] for job_schedule in solution.schedule if job_schedule]
        if makespan_var is not None and last_ends:
            self.cp_model.AddHint(makespan_var, max(last_ends))