Source code for discrete_optimization.rcpsp.solvers.toulbar

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from typing import Any, Iterable, Optional

from discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.neighbor_tools import (
    NeighborBuilderMix,
    NeighborBuilderSubPart,
    NeighborBuilderTimeWindow,
    NeighborRandom,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.lns_tools import ConstraintHandler
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.generic_tools.toulbar_tools import (
    ToulbarSolver,
    to_lns_toulbar,
)
from discrete_optimization.rcpsp.problem import RcpspProblem, RcpspSolution
from discrete_optimization.rcpsp.solvers.rcpsp_solver import RcpspSolver

# pytoulbar2 is treated as an optional dependency: a failed import is recorded
# in `toulbar_available` instead of breaking the import of this module.
try:
    import pytoulbar2

    toulbar_available = True
except ImportError as e:
    toulbar_available = False
import logging

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class ToulbarRcpspSolver(ToulbarSolver, RcpspSolver, WarmstartMixin):
    """Toulbar2 (pytoulbar2) model for single-mode RCPSP instances.

    The model holds one integer variable ``x_i`` per task, with domain
    ``0..horizon``, representing the start time of the i-th task of
    ``problem.tasks_list``.
    """

    # Maps the position of a task in ``problem.tasks_list`` to the index of
    # its start-time variable in the toulbar2 model.
    start_task_to_index: dict[int, int]

    def retrieve_solution(
        self, solution_from_toulbar2: tuple[list, float, int]
    ) -> Solution:
        """Convert a raw toulbar2 solution into an ``RcpspSolution``.

        Args:
            solution_from_toulbar2: tuple whose first item is the list of
                variable values; entry ``i`` is read as the start time of
                the i-th task.

        Returns:
            A solution whose schedule gives, for every task, its start time
            and ``start + duration`` (duration of mode 1) as end time.
        """
        starts = solution_from_toulbar2[0]
        schedule = {}
        for i in range(self.problem.n_jobs):
            task = self.problem.tasks_list[i]
            duration = self.problem.mode_details[task][1]["duration"]
            schedule[task] = {
                "start_time": starts[i],
                "end_time": starts[i] + duration,
            }
        return RcpspSolution(problem=self.problem, rcpsp_schedule=schedule)

    def init_model(self, **kwargs: Any) -> None:
        """Build the cost function network and store it in ``self.model``.

        Args:
            **kwargs: optional settings.
                ``ub``: initial upper bound given to toulbar2 (defaults to
                the problem horizon).
                ``vns``: forwarded to ``pytoulbar2.CFN`` when present.

        Raises:
            AssertionError: if the problem is multimode.
        """
        # Explicit check rather than an `assert` statement, so that the guard
        # is still active when Python runs with -O.
        if self.problem.is_rcpsp_multimode():
            message = (
                "Your problem is multimode, and this toulbar model can't tackle it. "
                "Please go with ToulbarMultimodeRcpspSolver instead."
            )
            logger.error(message)
            raise AssertionError(message)

        n_jobs = self.problem.n_jobs
        horizon = self.problem.horizon

        cfn_options = {"ubinit": kwargs.get("ub", horizon)}
        if "vns" in kwargs:
            cfn_options["vns"] = kwargs["vns"]
        model = pytoulbar2.CFN(**cfn_options)

        # One start-time variable per task; variables are created in task
        # order, so the variable index equals the task index.
        self.start_task_to_index = {}
        for i in range(n_jobs):
            model.AddVariable(f"x_{i}", range(horizon + 1))
            self.start_task_to_index[i] = i

        # Force the source task to start at 0: every other start time costs
        # `horizon`.
        model.AddFunction(
            [self.problem.index_task[self.problem.source_task]],
            [0 if a == 0 else horizon for a in range(horizon + 1)],
        )
        # Makespan objective: the cost is the start time of the sink task.
        model.AddFunction(
            [self.problem.index_task[self.problem.sink_task]],
            list(range(horizon + 1)),
        )

        # Precedence constraints: a successor starting before its predecessor
        # has finished costs `horizon`.
        for task in self.problem.successors:
            i_task = self.problem.index_task[task]
            duration = self.problem.mode_details[task][1]["duration"]
            for succ in self.problem.successors[task]:
                i_succ = self.problem.index_task[succ]
                model.AddFunction(
                    [i_task, i_succ],
                    [
                        (0 if a + duration <= b else horizon)
                        for a in range(horizon + 1)
                        for b in range(horizon + 1)
                    ],
                )

        # Resource constraints: one knapsack constraint per resource and per
        # time point. Weights and capacity are negated.
        # NOTE(review): presumably toulbar2 reads the knapsack as
        # "weighted sum >= capacity", so the negation encodes
        # "usage <= capacity" -- confirm against the toulbar2 documentation.
        for resource in self.problem.resources_list:
            capacity = self.problem.get_max_resource_capacity(resource)
            consumers = [
                (
                    self.problem.index_task[t],
                    self.problem.mode_details[t][1].get(resource, 0),
                    self.problem.mode_details[t][1]["duration"],
                )
                for t in self.problem.tasks_list
                if self.problem.mode_details[t][1].get(resource, 0) > 0
            ]
            for time_a in range(horizon + 1):
                params = ""
                scope = []
                for i_t, val, dur in consumers:
                    # Start times for which the task is running at `time_a`.
                    covering_starts = [
                        time_b
                        for time_b in range(horizon + 1)
                        if time_b <= time_a < time_b + dur
                    ]
                    if covering_starts:
                        params += " " + str(len(covering_starts)) + "".join(
                            f" {time_b} {-val}" for time_b in covering_starts
                        )
                        scope.append(i_t)
                if scope:
                    model.CFN.wcsp.postKnapsackConstraint(
                        scope, str(-capacity) + params, False, True
                    )
        self.model = model

    def set_warm_start(self, solution: RcpspSolution) -> None:
        """Give toulbar2 the start times of ``solution`` as preferred values.

        Args:
            solution: schedule whose start times are passed to
                ``setBestValue`` for each start-time variable. The model
                must already have been built with ``init_model``.
        """
        for i in range(self.problem.n_jobs):
            start = solution.get_start_time(self.problem.tasks_list[i])
            self.model.CFN.wcsp.setBestValue(self.start_task_to_index[i], start)
class ToulbarMultimodeRcpspSolver(ToulbarSolver, RcpspSolver, WarmstartMixin):
    """Toulbar2 (pytoulbar2) model for multimode RCPSP instances.

    Every task gets a ``start_{i}`` variable. Tasks with several modes
    additionally get a ``mode_{i}`` variable, one ``start_{i}_{m}`` variable
    per mode (equal to -1 when mode ``m`` is not the selected one) and a
    ``dur_{i}`` variable tied to the selected mode.
    """

    # Per task (in ``problem.tasks_list`` order), the mode details sorted by
    # mode key.
    modes: list[list[dict]]
    # Maps a variable name to its index in the toulbar2 model.
    name_var_to_index: dict[str, int]
    # Maps the position of a task in ``problem.tasks_list`` to the index of
    # its ``start_{i}`` variable.
    start_task_to_index: dict[int, int]

    def retrieve_solution(
        self, solution_from_toulbar2: tuple[list, float, int]
    ) -> Solution:
        """Convert a raw toulbar2 solution into an ``RcpspSolution``.

        Args:
            solution_from_toulbar2: tuple whose first item is the list of
                variable values, indexed as in ``name_var_to_index``.

        Returns:
            A solution with the selected mode of every non-dummy task and,
            for every task, its start time and ``start + duration`` end time.
        """
        values = solution_from_toulbar2[0]
        tasks_list = self.problem.tasks_list
        # Defaults (mode 1 and its duration) apply to single-mode tasks.
        modes_dict = {t: 1 for t in tasks_list}
        duration_dict = {
            t: self.problem.mode_details[t][1]["duration"] for t in tasks_list
        }
        for i in range(len(self.modes)):
            if len(self.modes[i]) > 1:
                mode = values[self.name_var_to_index[f"mode_{i}"]]
                # The mode variable is 0-based; the stored mode is mode + 1.
                modes_dict[tasks_list[i]] = mode + 1
                duration_dict[tasks_list[i]] = self.modes[i][int(mode)]["duration"]

        schedule = {}
        for i in range(self.problem.n_jobs):
            task = tasks_list[i]
            start = values[self.name_var_to_index[f"start_{i}"]]
            schedule[task] = {
                "start_time": start,
                "end_time": start + duration_dict[task],
            }
        return RcpspSolution(
            problem=self.problem,
            rcpsp_modes=[modes_dict[t] for t in self.problem.tasks_list_non_dummy],
            rcpsp_schedule=schedule,
        )

    def init_model(self, **kwargs: Any) -> None:
        """Build the cost function network and store it in ``self.model``.

        Also fills ``self.modes``, ``self.start_task_to_index`` and
        ``self.name_var_to_index``.

        Args:
            **kwargs: accepted for interface compatibility; not read here.
        """
        n_jobs = self.problem.n_jobs
        horizon = self.problem.horizon
        model = pytoulbar2.CFN(ubinit=horizon)
        modes = [
            [
                self.problem.mode_details[t][m]
                for m in sorted(self.problem.mode_details[t])
            ]
            for t in self.problem.tasks_list
        ]
        self.modes = modes
        name_var_to_index = {}
        self.start_task_to_index = {}
        index = 0
        for i in range(n_jobs):
            nb_modes = len(modes[i])
            # The actual start time of the task.
            model.AddVariable(f"start_{i}", range(horizon + 1))
            name_var_to_index[f"start_{i}"] = index
            self.start_task_to_index[i] = index
            index += 1
            if nb_modes <= 1:
                continue

            model.AddVariable(f"mode_{i}", range(nb_modes))
            name_var_to_index[f"mode_{i}"] = index
            index += 1
            for i_mode in range(nb_modes):
                model.AddVariable(f"start_{i}_{i_mode}", range(-1, horizon + 1))
                name_var_to_index[f"start_{i}_{i_mode}"] = index
                index += 1
                # When the mode is not selected, the start of this task/mode
                # is -1; when it is selected, it must not be -1.
                # NOTE(review): 1000 presumably acts as a hard penalty only
                # while it is >= the initial upper bound (horizon) -- confirm
                # for horizons above 1000.
                model.AddFunction(
                    [f"mode_{i}", f"start_{i}_{i_mode}"],
                    [
                        1000
                        if (
                            (j_mode == i_mode and time == -1)
                            or (j_mode != i_mode and time != -1)
                        )
                        else 0
                        for j_mode in range(nb_modes)
                        for time in range(-1, horizon + 1)
                    ],
                )
            # The real start time is the sum of all per-mode starts plus
            # (nb_modes - 1), which cancels the -1 of the unselected modes.
            model.AddLinearConstraint(
                coefs=[-1 for _ in range(nb_modes)] + [1],
                scope=[f"start_{i}_{i_mode}" for i_mode in range(nb_modes)]
                + [f"start_{i}"],
                operand="==",
                rightcoef=nb_modes - 1,
            )
            # Duration variable, tied to the selected mode.
            durs = [modes[i][j]["duration"] for j in range(nb_modes)]
            model.AddVariable(f"dur_{i}", range(max(durs) + 1))
            name_var_to_index[f"dur_{i}"] = index
            index += 1
            model.AddFunction(
                [f"mode_{i}", f"dur_{i}"],
                costs=[
                    1000 if durs[mode] != dur else 0
                    for mode in range(nb_modes)
                    for dur in range(max(durs) + 1)
                ],
            )

        # Force the source task to start at 0.
        model.AddFunction(
            [f"start_{self.problem.index_task[self.problem.source_task]}"],
            [0 if a == 0 else horizon for a in range(horizon + 1)],
        )
        # Makespan objective: the cost is the start time of the sink task.
        model.AddFunction(
            [f"start_{self.problem.index_task[self.problem.sink_task]}"],
            list(range(horizon + 1)),
        )

        # Precedence constraints.
        for task in self.problem.successors:
            i_task = self.problem.index_task[task]
            has_duration_var = f"dur_{i_task}" in name_var_to_index
            if not has_duration_var:
                # Fixed duration, only needed for single-mode tasks.
                duration = self.problem.mode_details[task][1]["duration"]
            for succ in self.problem.successors[task]:
                i_succ = self.problem.index_task[succ]
                if has_duration_var:
                    # start_task + dur_task - start_succ <= 0
                    model.AddLinearConstraint(
                        [1, 1, -1],
                        scope=[
                            name_var_to_index[f"start_{i_task}"],
                            name_var_to_index[f"dur_{i_task}"],
                            name_var_to_index[f"start_{i_succ}"],
                        ],
                        operand="<=",
                        rightcoef=0,
                    )
                else:
                    model.AddFunction(
                        [f"start_{i_task}", f"start_{i_succ}"],
                        [
                            (0 if a + duration <= b else horizon)
                            for a in range(horizon + 1)
                            for b in range(horizon + 1)
                        ],
                    )

        # Resource constraints, posted as knapsack constraints with negated
        # weights and capacity.
        # NOTE(review): presumably toulbar2 reads the knapsack as
        # "weighted sum >= capacity", so the negation encodes
        # "usage <= capacity" -- confirm against the toulbar2 documentation.
        for resource in self.problem.resources_list:
            capacity = self.problem.get_max_resource_capacity(resource)
            if resource in self.problem.non_renewable_resources:
                # One global budget constraint over the mode variables.
                constant_cons = 0
                params = ""
                scopes = []
                for i_t in range(len(modes)):
                    if len(modes[i_t]) == 1:
                        # Single-mode tasks always consume their amount.
                        if modes[i_t][0].get(resource, 0) > 0:
                            constant_cons += modes[i_t][0][resource]
                    else:
                        j_s = [
                            (j, modes[i_t][j].get(resource, 0))
                            for j in range(len(modes[i_t]))
                        ]
                        if len(j_s) > 0:
                            params += (
                                " "
                                + str(len(j_s))
                                + " "
                                + " ".join(f" {x[0]} {-x[1]}" for x in j_s)
                            )
                            scopes.append(name_var_to_index[f"mode_{i_t}"])
                if scopes:
                    # The fixed consumption of single-mode tasks reduces the
                    # budget left for the mode-dependent tasks, so it is
                    # subtracted from the capacity (it was previously added,
                    # which loosened the constraint).
                    model.CFN.wcsp.postKnapsackConstraint(
                        scopes,
                        str(-(capacity - constant_cons)) + params,
                        False,
                        True,
                    )
            else:
                # Renewable resource: one constraint per time point.
                tasks = [
                    (
                        i_t,
                        i_mode,
                        modes[i_t][i_mode].get(resource, 0),
                        modes[i_t][i_mode]["duration"],
                    )
                    for i_t in range(len(modes))
                    for i_mode in range(len(modes[i_t]))
                ]
                for time_a in range(horizon + 1):
                    params = ""
                    scope = []
                    for i_t, i_mode, val, dur in tasks:
                        # Start times for which the task is running at
                        # `time_a` in this mode.
                        covering_starts = [
                            time_b
                            for time_b in range(horizon + 1)
                            if time_b <= time_a < time_b + dur
                        ]
                        if not covering_starts:
                            continue
                        params += " " + str(len(covering_starts)) + "".join(
                            f" {time_b} {-val}" for time_b in covering_starts
                        )
                        if len(modes[i_t]) > 1:
                            scope.append(name_var_to_index[f"start_{i_t}_{i_mode}"])
                        else:
                            scope.append(name_var_to_index[f"start_{i_t}"])
                    if scope:
                        model.CFN.wcsp.postKnapsackConstraint(
                            scope, str(-capacity) + params, False, True
                        )
        self.name_var_to_index = name_var_to_index
        self.model = model

    def set_warm_start(self, solution: RcpspSolution) -> None:
        """Give toulbar2 the start times of ``solution`` as preferred values.

        Only the ``start_{i}`` variables are hinted; mode and per-mode start
        variables are left untouched.

        Args:
            solution: schedule whose start times are passed to
                ``setBestValue``. The model must already have been built
                with ``init_model``.
        """
        for i in range(self.problem.n_jobs):
            start = solution.get_start_time(self.problem.tasks_list[i])
            self.model.CFN.wcsp.setBestValue(
                self.name_var_to_index[f"start_{i}"], start
            )
# Solver classes derived from the two solvers above through `to_lns_toulbar`.
# NOTE(review): presumably these add large-neighbourhood-search support; see
# `to_lns_toulbar` in generic_tools.toulbar_tools for the exact behaviour.
ToulbarRcpspSolverForLns = to_lns_toulbar(ToulbarRcpspSolver)
ToulbarMultimodeRcpspSolverForLns = to_lns_toulbar(ToulbarMultimodeRcpspSolver)
class RcpspConstraintHandlerToulbar(ConstraintHandler):
    """LNS constraint handler restricting start times around the best solution.

    At each iteration the tasks are split in two groups by a mix of
    neighbourhood builders; the start of each task is then bounded to a
    window around its start time in the current best solution.
    """

    def __init__(self, problem: RcpspProblem, fraction_task: float = 0.5):
        """Set up the neighbourhood builders.

        Args:
            problem: the RCPSP instance being solved.
            fraction_task: stored on the instance; not read by the code of
                this class.
        """
        self.fraction_task = fraction_task
        self.problem = problem
        self.graph = problem.compute_graph()
        builders = [
            NeighborBuilderSubPart(
                problem=self.problem, graph=self.graph, nb_cut_part=3
            ),
            NeighborRandom(
                problem=self.problem, graph=self.graph, fraction_subproblem=0.3
            ),
            NeighborBuilderTimeWindow(
                problem=self.problem, graph=self.graph, time_window_length=20
            ),
        ]
        # The three builders are mixed with equal weights.
        self.neighbors = NeighborBuilderMix(builders, [1 / 3, 1 / 3, 1 / 3])

    def remove_constraints_from_previous_iteration(
        self, solver: SolverDO, previous_constraints: Iterable[Any], **kwargs: Any
    ) -> None:
        """Do nothing: this handler does not remove constraints itself."""

    def adding_constraint_from_results_store(
        self,
        solver: ToulbarRcpspSolverForLns,
        result_storage: ResultStorage,
        **kwargs: Any,
    ) -> Iterable[Any]:
        """Bound task start times around the current best solution.

        Tasks of the first group returned by the neighbourhood builder may
        move by at most 100 time units, tasks of the second group by at most
        5; no start time may exceed the current start of the sink task. The
        best solution is then given to the solver as warm start.

        Args:
            solver: solver whose model receives the constraints.
            result_storage: storage from which the best solution is read.
            **kwargs: unused.

        Returns:
            An empty list: no constraint handle is tracked, since
            ``remove_constraints_from_previous_iteration`` is a no-op.
        """
        sol: RcpspSolution = result_storage.get_best_solution_fit()[0]
        ms = sol.get_start_time(self.problem.sink_task)
        tasks_1, tasks_2 = self.neighbors.find_subtasks(current_solution=sol)
        solver.model.CFN.timer(100)
        try:
            for tasks, radius in ((tasks_1, 100), (tasks_2, 5)):
                for t in tasks:
                    st = sol.get_start_time(t)
                    var = solver.start_task_to_index[self.problem.index_task[t]]
                    solver.model.AddLinearConstraint(
                        [1], [var], operand="<=", rightcoef=min(st + radius, ms)
                    )
                    solver.model.AddLinearConstraint(
                        [1], [var], operand=">=", rightcoef=max(0, st - radius)
                    )
        except Exception:
            # Best effort: a failure while posting constraints must not stop
            # the LNS loop, but it is logged rather than silently dropped.
            logger.warning(
                "Could not post all neighbourhood constraints; continuing "
                "with those already posted.",
                exc_info=True,
            )
        solver.set_warm_start(solution=sol)
        return []