Source code for discrete_optimization.rcpsp.solvers.lns_lp

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
from collections.abc import Iterable
from enum import Enum
from typing import Any, Union

from discrete_optimization.generic_rcpsp_tools.mutation import RcpspMutation
from discrete_optimization.generic_tools.cp_tools import ParametersCp
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    build_evaluate_function_aggregated,
    get_default_objective_setup,
)
from discrete_optimization.generic_tools.lns_mip import (
    GurobiConstraintHandler,
    InitialSolution,
    OrtoolsMathOptConstraintHandler,
)
from discrete_optimization.generic_tools.lns_tools import ConstraintHandler
from discrete_optimization.generic_tools.ls.local_search import (
    ModeMutation,
    RestartHandlerLimit,
)
from discrete_optimization.generic_tools.ls.simulated_annealing import (
    SimulatedAnnealing,
    TemperatureSchedulingFactor,
)
from discrete_optimization.generic_tools.mutations.mutation_portfolio import (
    create_mutations_portfolio_from_problem,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.solvers.cp_mzn import CpMultimodeRcpspSolver
from discrete_optimization.rcpsp.solvers.lp import (
    GurobiMultimodeRcpspSolver,
    MathOptMultimodeRcpspSolver,
    MathOptRcpspSolver,
)
from discrete_optimization.rcpsp.solvers.pile import (
    GreedyChoice,
    PileCalendarRcpspSolver,
    PileRcpspSolver,
)

logger = logging.getLogger(__name__)


[docs] class InitialRcpspMethod(Enum): DUMMY = 0 PILE = 1 PILE_CALENDAR = 2 LS = 3 GA = 4 CP = 5
[docs] class InitialRcpspSolution(InitialSolution): def __init__( self, problem: RcpspProblem, params_objective_function: ParamsObjectiveFunction = None, initial_method: InitialRcpspMethod = InitialRcpspMethod.PILE, ): self.problem = problem self.params_objective_function = params_objective_function if self.params_objective_function is None: self.params_objective_function = get_default_objective_setup( problem=self.problem ) self.aggreg, _ = build_evaluate_function_aggregated( problem=self.problem, params_objective_function=self.params_objective_function, ) self.initial_method = initial_method
[docs] def get_starting_solution(self) -> ResultStorage: if self.initial_method == InitialRcpspMethod.PILE: logger.info("Compute greedy") greedy_solver = PileRcpspSolver(self.problem) store_solution = greedy_solver.solve( greedy_choice=GreedyChoice.MOST_SUCCESSORS ) if self.initial_method == InitialRcpspMethod.PILE_CALENDAR: logger.info("Compute greedy") greedy_solver = PileCalendarRcpspSolver(self.problem) store_solution = greedy_solver.solve( greedy_choice=GreedyChoice.MOST_SUCCESSORS ) elif self.initial_method == InitialRcpspMethod.DUMMY: logger.info("Compute dummy") solution = self.problem.get_dummy_solution() fit = self.aggreg(solution) store_solution = ResultStorage( mode_optim=self.params_objective_function.sense_function, list_solution_fits=[(solution, fit)], ) elif self.initial_method == InitialRcpspMethod.CP: solver = CpMultimodeRcpspSolver( problem=self.problem, params_objective_function=self.params_objective_function, ) store_solution = solver.solve(parameters_cp=ParametersCp.default()) elif self.initial_method == InitialRcpspMethod.LS: dummy = self.problem.get_dummy_solution() mixed_mutation = create_mutations_portfolio_from_problem( problem=self.problem, selected_mutations={RcpspMutation} ) res = RestartHandlerLimit(500) sa = SimulatedAnnealing( problem=self.problem, mutator=mixed_mutation, restart_handler=res, temperature_handler=TemperatureSchedulingFactor(2, res, 0.9999), mode_mutation=ModeMutation.MUTATE, params_objective_function=self.params_objective_function, store_solution=True, ) store_solution = sa.solve(initial_variable=dummy, nb_iteration_max=10000) return store_solution
[docs] class MathoptFixStartTimeRcpspConstraintHandler(OrtoolsMathOptConstraintHandler): def __init__(self, problem: RcpspProblem, fraction_fix_start_time: float = 0.9): self.problem = problem self.fraction_fix_start_time = fraction_fix_start_time
[docs] def adding_constraint_from_results_store( self, solver: MathOptRcpspSolver, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Any]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ nb_jobs = self.problem.n_jobs + 2 lns_constraints = [] current_solution = self.extract_best_solution_from_last_iteration( result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, ) # Starting point solver.set_warm_start(current_solution) # Fix start time for a subset of task. jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_fix_start_time * nb_jobs), ) ) for job_to_fix in jobs_to_fix: for t in solver.index_time: if current_solution.rcpsp_schedule[job_to_fix]["start_time"] == t: lns_constraints.append( solver.add_linear_constraint( solver.x[solver.index_in_var[job_to_fix]][t] == 1 ) ) else: lns_constraints.append( solver.add_linear_constraint( solver.x[solver.index_in_var[job_to_fix]][t] == 0 ) ) return lns_constraints
[docs] class MathoptStartTimeIntervalRcpspConstraintHandler(OrtoolsMathOptConstraintHandler): def __init__( self, problem: RcpspProblem, fraction_to_fix: float = 0.9, minus_delta: int = 2, plus_delta: int = 2, ): self.problem = problem self.fraction_to_fix = fraction_to_fix self.minus_delta = minus_delta self.plus_delta = plus_delta
[docs] def adding_constraint_from_results_store( self, solver: MathOptRcpspSolver, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Any]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ lns_constraints = [] current_solution = self.extract_best_solution_from_last_iteration( result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, ) # Starting point solver.set_warm_start(current_solution) # Avoid fixing last jobs to allow makespan reduction max_time = max( [ current_solution.rcpsp_schedule[x]["end_time"] for x in current_solution.rcpsp_schedule ] ) last_jobs = [ x for x in current_solution.rcpsp_schedule if current_solution.rcpsp_schedule[x]["end_time"] >= max_time - 5 ] nb_jobs = self.problem.n_jobs jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_to_fix * nb_jobs), ) ) for lj in last_jobs: if lj in jobs_to_fix: jobs_to_fix.remove(lj) # add constraints for job in jobs_to_fix: start_time_j = current_solution.rcpsp_schedule[job]["start_time"] min_st = max(start_time_j - self.minus_delta, 0) max_st = min(start_time_j + self.plus_delta, max_time) for t in solver.index_time: if t < min_st or t > max_st: lns_constraints.append( solver.add_linear_constraint( solver.x[solver.index_in_var[job]][t] == 0 ) ) return lns_constraints
class _BaseStartTimeIntervalMultimodeRcpspConstraintHandler(ConstraintHandler): def __init__( self, problem: RcpspProblem, fraction_to_fix: float = 0.9, minus_delta: int = 2, plus_delta: int = 2, ): self.problem = problem self.fraction_to_fix = fraction_to_fix self.minus_delta = minus_delta self.plus_delta = plus_delta def adding_constraint_from_results_store( self, solver: Union[GurobiMultimodeRcpspSolver, MathOptMultimodeRcpspSolver], result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Any]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ # set a good start: either solver.start_solution or result_storage.best_solution() current_solution = self.extract_best_solution_from_last_iteration( result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, ) st = solver.start_solution if ( self.problem.evaluate(st)["makespan"] < self.problem.evaluate(current_solution)["makespan"] ): current_solution = st solver.set_warm_start(current_solution) # add constraints constraints = [] max_time = max( [ current_solution.rcpsp_schedule[x]["end_time"] for x in current_solution.rcpsp_schedule ] ) last_jobs = [ x for x in current_solution.rcpsp_schedule if current_solution.rcpsp_schedule[x]["end_time"] >= max_time - 5 ] nb_jobs = self.problem.n_jobs jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_to_fix * nb_jobs), ) ) for lj in last_jobs: if lj in jobs_to_fix: jobs_to_fix.remove(lj) for job in jobs_to_fix: start_time_j = current_solution.rcpsp_schedule[job]["start_time"] min_st = max(start_time_j - self.minus_delta, 0) max_st = min(start_time_j + self.plus_delta, max_time) for key in solver.variable_per_task[job]: t = key[2] if t < min_st or t > max_st: constraints.append(solver.add_linear_constraint(solver.x[key] == 0)) return constraints
[docs] class GurobiStartTimeIntervalMultimodeRcpspConstraintHandler( _BaseStartTimeIntervalMultimodeRcpspConstraintHandler, GurobiConstraintHandler ):
[docs] def adding_constraint_from_results_store( self, solver: GurobiMultimodeRcpspSolver, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Any]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ constraints = super().adding_constraint_from_results_store( solver=solver, result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, **kwargs, ) solver.model.update() return constraints
[docs] class MathOptStartTimeIntervalMultimodeRcpspConstraintHandler(
_BaseStartTimeIntervalMultimodeRcpspConstraintHandler, OrtoolsMathOptConstraintHandler, ): ...