Source code for discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.solver

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from enum import Enum
from typing import Any, Optional

from discrete_optimization.generic_rcpsp_tools.graph_tools import (
    build_graph_rcpsp_object,
)
from discrete_optimization.generic_rcpsp_tools.solvers import GenericRcpspSolver
from discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.neighbor_builder import (
    OptionNeighborRandom,
    build_neighbor_mixing_cut_parts,
    build_neighbor_mixing_methods,
    build_neighbor_random,
    mix_both,
)
from discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.neighbor_tools import (
    BasicConstraintBuilder,
    ConstraintHandlerScheduling,
    NeighborBuilderMix,
    NeighborBuilderSubPart,
    NeighborConstraintBreaks,
    NeighborRandomAndNeighborGraph,
    ParamsConstraintBuilder,
)
from discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.solution_repair import (
    NeighborRepairProblems,
)
from discrete_optimization.generic_rcpsp_tools.solvers.ls import (
    LsGenericRcpspSolver,
    LsSolverType,
)
from discrete_optimization.generic_rcpsp_tools.typing import ANY_RCPSP
from discrete_optimization.generic_tools.cp_tools import CpSolverName, MinizincCpSolver
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
    EnumHyperparameter,
    FloatHyperparameter,
    IntegerHyperparameter,
    SubBrickKwargsHyperparameter,
)
from discrete_optimization.generic_tools.lns_cp import LnsCpMzn, MznConstraintHandler
from discrete_optimization.generic_tools.lns_mip import (
    InitialSolution,
    PostProcessSolution,
)
from discrete_optimization.generic_tools.lns_tools import InitialSolutionFromSolver
from discrete_optimization.rcpsp.solvers.cp_mzn import (
    CpMultimodePreemptiveRcpspSolver,
    CpMultimodeRcpspSolver,
    CpPreemptiveRcpspSolver,
    CpRcpspSolver,
)
from discrete_optimization.rcpsp.solvers.lns_cp import PostProcessLeftShift
from discrete_optimization.rcpsp.solvers.lns_cp_preemptive import PostProLeftShift
from discrete_optimization.rcpsp_multiskill.solvers.cp_mzn import (
    CpMultiskillRcpspSolver,
    CpPreemptiveMultiskillRcpspSolver,
)
from discrete_optimization.rcpsp_multiskill.solvers.lns_post_process import (
    MultiskillRcpspPostProcessSolution,
)


[docs] class ConstraintHandlerType(Enum): MIX_SUBPROBLEMS = 0 SOLUTION_REPAIR = 1
[docs] def build_default_cp_model(rcpsp_problem: ANY_RCPSP, partial_solution=None, **kwargs): if rcpsp_problem.is_preemptive(): if rcpsp_problem.is_multiskill(): solver = CpPreemptiveMultiskillRcpspSolver( problem=rcpsp_problem, cp_solver_name=kwargs.get("cp_solver_name", CpSolverName.CHUFFED), ) solver.init_model( output_type=True, model_type="ms_rcpsp_preemptive", nb_preemptive=kwargs.get("nb_preemptive", 13), max_preempted=kwargs.get("max_preempted", 100), partial_solution=partial_solution, possibly_preemptive=[ rcpsp_problem.preemptive_indicator[t] for t in rcpsp_problem.tasks_list ], unit_usage_preemptive=kwargs.get("unit_usage_preemptive", True), exact_skills_need=False, add_calendar_constraint_unit=False, add_partial_solution_hard_constraint=kwargs.get( "add_partial_solution_hard_constraint", False ), **{ x: kwargs[x] for x in kwargs if x not in [ "nb_preemptive", "max_preempted", "fake_tasks", "add_partial_solution_hard_constraint", ] }, ) return solver if rcpsp_problem.is_preemptive(): if not rcpsp_problem.is_multiskill(): if rcpsp_problem.is_rcpsp_multimode(): solver = CpMultimodePreemptiveRcpspSolver( problem=rcpsp_problem, cp_solver_name=kwargs.get("cp_solver_name", CpSolverName.CHUFFED), ) solver.init_model( output_type=True, model_type="multi-preemptive", nb_preemptive=kwargs.get("nb_preemptive", 13), max_preempted=kwargs.get("max_preempted", 100), fake_tasks=kwargs.get("fake_tasks", True), partial_solution=partial_solution, possibly_preemptive=[ rcpsp_problem.preemptive_indicator[t] for t in rcpsp_problem.tasks_list ], add_partial_solution_hard_constraint=kwargs.get( "add_partial_solution_hard_constraint", False ), **{ x: kwargs[x] for x in kwargs if x not in [ "nb_preemptive", "max_preempted", "fake_tasks", "add_partial_solution_hard_constraint", ] }, ) return solver if not rcpsp_problem.is_rcpsp_multimode(): solver = CpPreemptiveRcpspSolver( problem=rcpsp_problem, cp_solver_name=kwargs.get("cp_solver_name", CpSolverName.CHUFFED), ) solver.init_model( output_type=True, model_type="single-preemptive", nb_preemptive=kwargs.get("nb_preemptive", 13), max_preempted=kwargs.get("max_preempted", 100), fake_tasks=kwargs.get("fake_tasks", True), partial_solution=partial_solution, possibly_preemptive=[ rcpsp_problem.preemptive_indicator[t] for t in rcpsp_problem.tasks_list ], add_partial_solution_hard_constraint=kwargs.get( "add_partial_solution_hard_constraint", False ), **{ x: kwargs[x] for x in kwargs if x not in [ "nb_preemptive", "max_preempted", "fake_tasks", "add_partial_solution_hard_constraint", ] }, ) return solver if not rcpsp_problem.is_preemptive(): if rcpsp_problem.is_multiskill(): solver = CpMultiskillRcpspSolver( problem=rcpsp_problem, cp_solver_name=kwargs.get("cp_solver_name", CpSolverName.CHUFFED), one_ressource_per_task=kwargs.get( "one_ressource_per_task", rcpsp_problem.one_unit_per_task_max ), ) solver.init_model( output_type=True, exact_skills_need=False, partial_solution=partial_solution, add_partial_solution_hard_constraint=kwargs.get( "add_partial_solution_hard_constraint", False ), **{ x: kwargs[x] for x in kwargs if x not in ["add_partial_solution_hard_constraint"] }, ) return solver if not rcpsp_problem.is_multiskill(): if rcpsp_problem.is_rcpsp_multimode(): solver = CpMultimodeRcpspSolver( problem=rcpsp_problem, cp_solver_name=kwargs.get("cp_solver_name", CpSolverName.CHUFFED), ) solver.init_model( output_type=True, partial_solution=partial_solution, add_partial_solution_hard_constraint=kwargs.get( "add_partial_solution_hard_constraint", False ), **{ x: kwargs[x] for x in kwargs if x not in ["add_partial_solution_hard_constraint"] }, ) return solver if not rcpsp_problem.is_rcpsp_multimode(): solver = CpRcpspSolver( problem=rcpsp_problem, cp_solver_name=kwargs.get("cp_solver_name", CpSolverName.CHUFFED), ) solver.init_model( output_type=True, partial_solution=partial_solution, add_partial_solution_hard_constraint=kwargs.get( "add_partial_solution_hard_constraint", False ), **{ x: kwargs[x] for x in kwargs if x not in ["add_partial_solution_hard_constraint"] }, ) return solver
[docs] def build_default_postpro(rcpsp_problem: ANY_RCPSP, partial_solution=None, **kwargs): if not rcpsp_problem.is_multiskill(): if rcpsp_problem.is_preemptive(): post_process_solution = PostProLeftShift( problem=rcpsp_problem, params_objective_function=None, do_ls=kwargs.get("do_ls", False), ) return post_process_solution if partial_solution is not None: return None if not rcpsp_problem.is_preemptive(): post_process_solution = PostProcessLeftShift( rcpsp_problem=rcpsp_problem, partial_solution=partial_solution ) return post_process_solution if rcpsp_problem.is_multiskill(): if rcpsp_problem.is_preemptive(): return None else: post_process_solution = MultiskillRcpspPostProcessSolution( problem=rcpsp_problem, params_objective_function=None ) return post_process_solution
[docs] def build_default_initial_solution(rcpsp_problem: ANY_RCPSP, **kwargs): if rcpsp_problem.is_multiskill(): initial_solution_provider = InitialSolutionFromSolver( LsGenericRcpspSolver(problem=rcpsp_problem, ls_solver=LsSolverType.SA), nb_iteration_max=500, ) return initial_solution_provider if not rcpsp_problem.is_multiskill(): initial_solution_provider = InitialSolutionFromSolver( LsGenericRcpspSolver(problem=rcpsp_problem, ls_solver=LsSolverType.SA), nb_iteration_max=200, ) return initial_solution_provider
[docs] def build_constraint_handler(rcpsp_problem: ANY_RCPSP, graph, **kwargs): constraint_handler_type = kwargs.get( "constraint_handler_type", ConstraintHandlerType.MIX_SUBPROBLEMS ) constraint_handler = None if constraint_handler_type == ConstraintHandlerType.MIX_SUBPROBLEMS: n1 = NeighborBuilderSubPart( problem=rcpsp_problem, graph=graph, nb_cut_part=kwargs.get("nb_cut_part", 10), ) n2 = NeighborRandomAndNeighborGraph( problem=rcpsp_problem, graph=graph, fraction_subproblem=kwargs.get("fraction_subproblem", 0.05), ) if rcpsp_problem.includes_special_constraint(): n3 = NeighborConstraintBreaks( problem=rcpsp_problem, graph=graph, fraction_subproblem=kwargs.get("fraction_subproblem", 0.05), other_constraint_handler=n1, ) n_mix = NeighborBuilderMix( list_neighbor=[n1, n2, n3], weight_neighbor=[0.2, 0.5, 0.3] ) else: n_mix = NeighborBuilderMix( list_neighbor=[n1, n2], weight_neighbor=[0.5, 0.5] ) basic_constraint_builder = BasicConstraintBuilder( neighbor_builder=n_mix, preemptive=kwargs.get("preemptive", False), multiskill=kwargs.get("multiskill", False), ) if "params_list" in kwargs: params_list = kwargs["params_list"] else: if kwargs["params_0_kwargs"] is None: params_0 = ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=400, plus_delta_secondary=400, constraint_max_time_to_current_solution=False, ) else: params_0_kwargs = kwargs["params_0_kwargs"] params_0 = ParamsConstraintBuilder(**params_0_kwargs) if kwargs["params_1_kwargs"] is None: params_1 = ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=0, plus_delta_secondary=0, constraint_max_time_to_current_solution=False, ) else: params_1_kwargs = kwargs["params_1_kwargs"] params_1 = ParamsConstraintBuilder(**params_1_kwargs) params_list = [params_0, params_1] constraint_handler = ConstraintHandlerScheduling( problem=rcpsp_problem, basic_constraint_builder=basic_constraint_builder, params_list=params_list, use_makespan_of_subtasks=kwargs.get("use_makespan_of_subtasks", False), ) elif constraint_handler_type == ConstraintHandlerType.SOLUTION_REPAIR: if "params_list" in kwargs: params_list = kwargs["params_list"] else: if kwargs["params_0_kwargs"] is None: params_0 = ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=400, plus_delta_secondary=400, constraint_max_time_to_current_solution=False, ) else: params_0_kwargs = kwargs["params_0_kwargs"] params_0 = ParamsConstraintBuilder(**params_0_kwargs) if kwargs["params_1_kwargs"] is None: params_1 = ParamsConstraintBuilder( minus_delta_primary=5000, plus_delta_primary=5000, minus_delta_secondary=2, plus_delta_secondary=2, constraint_max_time_to_current_solution=False, ) else: params_1_kwargs = kwargs["params_1_kwargs"] params_1 = ParamsConstraintBuilder(**params_1_kwargs) params_list = [params_0, params_1] constraint_handler = NeighborRepairProblems( problem=rcpsp_problem, params_list=params_list ) return constraint_handler
[docs] def build_constraint_handler_helper(rcpsp_problem: ANY_RCPSP, graph, **kwargs): option = kwargs.get("option_neighbor_operator", 2) constraint_handler = None if option == 0: constraint_handler = build_neighbor_random( option_neighbor=kwargs.get( "option_neighbor_random", OptionNeighborRandom.MIX_ALL ), rcpsp_problem=rcpsp_problem, ) if option == 1: constraint_handler = mix_both( rcpsp_problem=rcpsp_problem, option_neighbor_random=kwargs.get( "option_neighbor_random", OptionNeighborRandom.MIX_ALL ), graph=graph, fraction_subproblem=kwargs.get("fraction_subproblem", 0.05), cut_part=kwargs.get("cut_part", 10), params_list=kwargs.get( "params_list", [ ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=400, plus_delta_secondary=400, constraint_max_time_to_current_solution=False, ), ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=0, plus_delta_secondary=0, constraint_max_time_to_current_solution=False, ), ], ), ) if option == 2: constraint_handler = build_neighbor_mixing_methods( rcpsp_problem=rcpsp_problem, graph=graph, fraction_subproblem=kwargs.get("fraction_subproblem", 0.05), cut_part=kwargs.get("cut_part", 10), params_list=kwargs.get( "params_list", [ ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=400, plus_delta_secondary=400, constraint_max_time_to_current_solution=False, ), ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=0, plus_delta_secondary=0, constraint_max_time_to_current_solution=False, ), ], ), ) if option == 3: constraint_handler = build_neighbor_mixing_cut_parts( rcpsp_problem=rcpsp_problem, graph=graph, params_list=kwargs.get( "params_list", [ ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=400, plus_delta_secondary=400, constraint_max_time_to_current_solution=False, ), ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=0, plus_delta_secondary=0, constraint_max_time_to_current_solution=False, ), ], ), ) return constraint_handler
[docs] class LnsCpMznGenericRcpspSolver(LnsCpMzn, GenericRcpspSolver): hyperparameters = [ EnumHyperparameter( name="cp_solver_name", enum=CpSolverName, default=CpSolverName.CHUFFED ), CategoricalHyperparameter(name="do_ls", choices=[True, False], default=False), EnumHyperparameter( name="constraint_handler_type", enum=ConstraintHandlerType, default=ConstraintHandlerType.MIX_SUBPROBLEMS, ), FloatHyperparameter( name="fraction_subproblem", default=0.05, low=0.0, high=1.0, depends_on=( "constraint_handler_type", [ConstraintHandlerType.MIX_SUBPROBLEMS], ), ), IntegerHyperparameter( name="nb_cut_part", default=10, low=0, high=100, depends_on=( "constraint_handler_type", [ConstraintHandlerType.MIX_SUBPROBLEMS], ), ), CategoricalHyperparameter( name="use_makespan_of_subtasks", choices=[True, False], default=False, depends_on=( "constraint_handler_type", [ConstraintHandlerType.MIX_SUBPROBLEMS], ), ), SubBrickKwargsHyperparameter( name="params_0_kwargs", subbrick_cls=ParamsConstraintBuilder ), SubBrickKwargsHyperparameter( name="params_1_kwargs", subbrick_cls=ParamsConstraintBuilder ), ] def __init__( self, problem: ANY_RCPSP, partial_solution=None, subsolver: Optional[MinizincCpSolver] = None, initial_solution_provider: Optional[InitialSolution] = None, constraint_handler: Optional[MznConstraintHandler] = None, post_process_solution: Optional[PostProcessSolution] = None, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): GenericRcpspSolver.__init__( self, problem=problem, params_objective_function=params_objective_function ) graph = build_graph_rcpsp_object(self.problem) kwargs = self.complete_with_default_hyperparameters(kwargs) if subsolver is None: subsolver = build_default_cp_model( rcpsp_problem=problem, partial_solution=partial_solution, **kwargs, ) self.subsolver = subsolver if constraint_handler is None: constraint_handler = build_constraint_handler( rcpsp_problem=self.problem, graph=graph, multiskill=self.problem.is_multiskill(), preemptive=self.problem.is_preemptive(), **kwargs, ) self.constraint_handler = constraint_handler if post_process_solution is None: post_process_solution = build_default_postpro( rcpsp_problem=self.problem, partial_solution=partial_solution, **kwargs, ) self.post_process_solution = post_process_solution if initial_solution_provider is None: initial_solution_provider = build_default_initial_solution( rcpsp_problem=self.problem, **kwargs, ) self.initial_solution_provider = initial_solution_provider