Source code for discrete_optimization.rcpsp.solvers.lp

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

import logging
from collections.abc import Callable, Hashable
from itertools import product
from typing import Any, Optional, Union

from ortools.math_opt.python import mathopt

from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)
from discrete_optimization.generic_tools.lp_tools import (
    CplexMilpSolver,
    GurobiMilpSolver,
    MilpSolver,
    OrtoolsMathOptMilpSolver,
    VariableType,
)
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.solution import PartialSolution, RcpspSolution
from discrete_optimization.rcpsp.solvers import RcpspSolver
from discrete_optimization.rcpsp.solvers.pile import (
    GreedyChoice,
    PileCalendarRcpspSolver,
    PileRcpspSolver,
)

try:
    import gurobipy
except ImportError:
    gurobi_available = False
else:
    gurobi_available = True
    import gurobipy as gurobi

try:
    import docplex
except ImportError:
    cplex_available = False
else:
    cplex_available = True
    import docplex.mp.dvar as cplex_var
    import docplex.mp.model as cplex


logger = logging.getLogger(__name__)


class _BaseLpRcpspSolver(MilpSolver, RcpspSolver):
    problem: RcpspProblem
    hyperparameters = [
        CategoricalHyperparameter(
            name="greedy_start", choices=[True, False], default=True
        ),
    ]

    def __init__(
        self,
        problem: RcpspProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs: Any,
    ):
        if problem.is_rcpsp_multimode():
            raise ValueError("this solver is meant for single mode problems")

        super().__init__(
            problem=problem,
            params_objective_function=params_objective_function,
            **kwargs,
        )
        self.variable_decision = {}
        self.constraints_dict = {"lns": []}
        self.start_solution: Optional[RcpspSolution] = None

    def init_model(self, **kwargs):
        greedy_start = kwargs.get("greedy_start", True)
        start_solution = kwargs.get("start_solution", None)
        if start_solution is None:
            if greedy_start:
                logger.info("Computing greedy solution")
                greedy_solver = PileRcpspSolver(self.problem)
                store_solution = greedy_solver.solve(
                    greedy_choice=GreedyChoice.MOST_SUCCESSORS
                )
                self.start_solution = store_solution.get_best_solution_fit()[0]
                makespan = self.problem.evaluate(self.start_solution)["makespan"]
            else:
                logger.info("Get dummy solution")
                solution = self.problem.get_dummy_solution()
                self.start_solution = solution
                makespan = self.problem.evaluate(solution)["makespan"]
        else:
            self.start_solution = start_solution
            makespan = self.problem.evaluate(start_solution)["makespan"]
        sorted_tasks = self.problem.tasks_list
        resources = self.problem.resources_list
        p = [int(self.problem.mode_details[key][1]["duration"]) for key in sorted_tasks]
        u = [
            [self.problem.mode_details[t][1].get(r, 0) for r in resources]
            for t in sorted_tasks
        ]
        c = [self.problem.resources[r] for r in resources]
        S = []
        logger.debug(f"successors: {self.problem.successors}")
        for task in sorted_tasks:
            for suc in self.problem.successors[task]:
                S.append([task, suc])
        # we have a better self.T to limit the number of variables :
        self.index_time = range(int(makespan + 1))
        self.model = self.create_empty_model()
        self.x: list[list[VariableType]] = [
            [self.add_binary_variable(name=f"x({task},{t})") for t in self.index_time]
            for task in sorted_tasks
        ]
        self.index_in_var = {
            t: self.problem.return_index_task(task=t, offset=0) for t in sorted_tasks
        }
        # set objective
        self.set_model_objective(
            self.construct_linear_sum(
                self.x[self.index_in_var[self.problem.sink_task]][t] * t
                for t in self.index_time
            ),
            minimize=True,
        )
        self.index_task = range(self.problem.n_jobs)
        self.index_resource = range(len(resources))
        for task in self.index_task:
            self.add_linear_constraint(
                self.construct_linear_sum(self.x[task][t] for t in self.index_time) == 1
            )

        for (r, t) in product(self.index_resource, self.index_time):
            self.add_linear_constraint(
                self.construct_linear_sum(
                    u[j][r] * self.x[j][t2]
                    for j in self.index_task
                    for t2 in range(max(0, t - p[j] + 1), t + 1)
                )
                <= c[r]
            )

        for (j, s) in S:
            self.add_linear_constraint(
                self.construct_linear_sum(
                    t * self.x[self.index_in_var[s]][t]
                    - t * self.x[self.index_in_var[j]][t]
                    for t in self.index_time
                )
                >= p[self.index_in_var[j]]
            )
        p_s: Optional[PartialSolution] = kwargs.get("partial_solution", None)
        self.partial_solution = p_s
        self.constraints_partial_solutions = []
        if p_s is not None:
            constraints = []
            if p_s.start_times is not None:
                for task in p_s.start_times:
                    constraints += [
                        self.add_linear_constraint(
                            self.construct_linear_sum(
                                [
                                    j * self.x[self.index_in_var[task]][j]
                                    for j in self.index_time
                                ]
                            )
                            == p_s.start_times[task]
                        )
                    ]
                    constraints += [
                        self.add_linear_constraint(
                            self.x[self.index_in_var[task]][p_s.start_times[task]] == 1
                        )
                    ]

            if p_s.partial_permutation is not None:
                for t1, t2 in zip(
                    p_s.partial_permutation[:-1], p_s.partial_permutation[1:]
                ):
                    constraints += [
                        self.add_linear_constraint(
                            self.construct_linear_sum(
                                [
                                    t * self.x[self.index_in_var[t1]][t]
                                    - t * self.x[self.index_in_var[t2]][t]
                                    for t in self.index_time
                                ]
                            )
                            <= 0
                        )
                    ]
            if p_s.list_partial_order is not None:
                for l in p_s.list_partial_order:
                    for t1, t2 in zip(l[:-1], l[1:]):
                        constraints += [
                            self.add_linear_constraint(
                                self.construct_linear_sum(
                                    [
                                        t * self.x[self.index_in_var[t1]][t]
                                        - t * self.x[self.index_in_var[t2]][t]
                                        for t in self.index_time
                                    ]
                                )
                                <= 0
                            )
                        ]
            self.starts = {}
            for j in self.index_task:
                self.starts[j] = self.add_continuous_variable(
                    name="start_" + str(j), lb=0, ub=makespan
                )
                self.add_linear_constraint(
                    self.construct_linear_sum(t * self.x[j][t] for t in self.index_time)
                    == self.starts[j]
                )
            if p_s.start_at_end is not None:
                for i, j in p_s.start_at_end:
                    constraints += [
                        self.add_linear_constraint(
                            self.starts[self.index_in_var[j]]
                            == self.starts[self.index_in_var[i]]
                            + p[self.index_in_var[i]]
                        )
                    ]
            if p_s.start_together is not None:
                for i, j in p_s.start_together:
                    constraints += [
                        self.add_linear_constraint(
                            self.starts[self.index_in_var[j]]
                            == self.starts[self.index_in_var[i]]
                        )
                    ]
            if p_s.start_after_nunit is not None:
                for t1, t2, delta in p_s.start_after_nunit:
                    constraints += [
                        self.add_linear_constraint(
                            self.starts[self.index_in_var[t2]]
                            >= self.starts[self.index_in_var[t1]] + delta
                        )
                    ]
            if p_s.start_at_end_plus_offset is not None:
                for t1, t2, delta in p_s.start_at_end_plus_offset:
                    constraints += [
                        self.add_linear_constraint(
                            self.starts[self.index_in_var[t2]]
                            >= self.starts[self.index_in_var[t1]]
                            + delta
                            + p[self.index_in_var[t1]]
                        )
                    ]
            self.constraints_partial_solutions = constraints
        # take into account "warmstart" w/o calling set_warmstart (would cause a recursion issue here)
        self.set_warm_start(self.start_solution)

    def convert_to_variable_values(
        self, solution: RcpspSolution
    ) -> dict[VariableType, float]:
        hinted_values: dict[VariableType, float] = {}
        for j in self.index_task:
            for t in self.index_time:
                if (
                    self.start_solution.rcpsp_schedule[self.problem.tasks_list[j]][
                        "start_time"
                    ]
                    == t
                ):
                    hinted_values[self.x[j][t]] = 1
                else:
                    hinted_values[self.x[j][t]] = 0
        return hinted_values

    def set_warm_start(self, solution: Solution) -> None:
        """Make the solver warm start from the given solution.

        Implemented by OrtoolsMathOptMilpSolver or GurobiMilpSolver.

        """
        raise NotImplementedError()

    def retrieve_current_solution(
        self,
        get_var_value_for_current_solution: Callable[[Any], float],
        get_obj_value_for_current_solution: Callable[[], float],
    ) -> RcpspSolution:
        rcpsp_schedule = {}
        for (task_index, time) in product(self.index_task, self.index_time):
            value = get_var_value_for_current_solution(self.x[task_index][time])
            if value >= 0.5:
                task = self.problem.tasks_list[task_index]
                rcpsp_schedule[task] = {
                    "start_time": time,
                    "end_time": time + self.problem.mode_details[task][1]["duration"],
                }
        logger.debug(f"Size schedule : {len(rcpsp_schedule.keys())}")
        return RcpspSolution(
            problem=self.problem,
            rcpsp_schedule=rcpsp_schedule,
            rcpsp_schedule_feasible=True,
        )


[docs] class MathOptRcpspSolver(OrtoolsMathOptMilpSolver, _BaseLpRcpspSolver): hyperparameters = _BaseLpRcpspSolver.hyperparameters problem: RcpspProblem
[docs] def convert_to_variable_values( self, solution: RcpspSolution ) -> dict[mathopt.Variable, float]: return _BaseLpRcpspSolver.convert_to_variable_values(self, solution=solution)
class _BaseLpMultimodeRcpspSolver(MilpSolver, RcpspSolver): problem: RcpspProblem hyperparameters = [ CategoricalHyperparameter( name="greedy_start", choices=[True, False], default=True ) ] x: dict[tuple[Hashable, int, int], VariableType] max_horizon: Optional[int] = None partial_solution: Optional[PartialSolution] = None def __init__( self, problem: RcpspProblem, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.variable_decision = {} self.constraints_dict = {"lns": []} def init_model(self, **args): args = self.complete_with_default_hyperparameters(args) greedy_start = args["greedy_start"] start_solution = args.get("start_solution", None) max_horizon = args.get("max_horizon", None) self.max_horizon = max_horizon if start_solution is None: if greedy_start: logger.info("Computing greedy solution") if self.problem.is_varying_resource(): greedy_solver = PileCalendarRcpspSolver(self.problem) else: greedy_solver = PileRcpspSolver(self.problem) store_solution = greedy_solver.solve( greedy_choice=GreedyChoice.MOST_SUCCESSORS ) self.start_solution = store_solution.get_best_solution_fit()[0] makespan = self.problem.evaluate(self.start_solution)["makespan"] else: logger.info("Get dummy solution") solution = self.problem.get_dummy_solution() self.start_solution = solution makespan = self.problem.evaluate(solution)["makespan"] else: self.start_solution = start_solution makespan = self.problem.evaluate(start_solution)["makespan"] sorted_tasks = self.problem.tasks_list resources = self.problem.resources_list p = [ int( max( [ self.problem.mode_details[key][mode]["duration"] for mode in self.problem.mode_details[key] ] ) ) for key in sorted_tasks ] renewable = { r: self.problem.resources[r] for r in self.problem.resources if r not in self.problem.non_renewable_resources } non_renewable = { r: self.problem.resources[r] for r in self.problem.non_renewable_resources } S = [] logger.debug(f"successors: {self.problem.successors}") for task in sorted_tasks: for suc in self.problem.successors[task]: S.append([task, suc]) self.index_time = list(range(sum(p))) # we have a better self.T to limit the number of variables : if self.start_solution.rcpsp_schedule_feasible: self.index_time = list(range(int(makespan + 1))) if max_horizon is not None: self.index_time = list(range(max_horizon + 1)) self.model = self.create_empty_model(name="MRCPSP") self.x: dict[tuple[Hashable, int, int], VariableType] = {} last_task = self.problem.sink_task variable_per_task = {} keys_for_t = {} for task in sorted_tasks: if task not in variable_per_task: variable_per_task[task] = [] for mode in self.problem.mode_details[task]: for t in self.index_time: self.x[(task, mode, t)] = self.add_binary_variable( name=f"x({task},{mode}, {t})", ) for tt in range( t, t + self.problem.mode_details[task][mode]["duration"] ): if tt not in keys_for_t: keys_for_t[tt] = set() keys_for_t[tt].add((task, mode, t)) variable_per_task[task] += [(task, mode, t)] self.set_model_objective( self.construct_linear_sum( self.x[key] * key[2] for key in variable_per_task[last_task] ), minimize=True, ) for j in variable_per_task: self.add_linear_constraint( self.construct_linear_sum(self.x[key] for key in variable_per_task[j]) == 1 ) if self.problem.is_varying_resource(): renewable_quantity = {r: renewable[r] for r in renewable} else: renewable_quantity = { r: [renewable[r]] * len(self.index_time) for r in renewable } if self.problem.is_varying_resource(): non_renewable_quantity = {r: non_renewable[r] for r in non_renewable} else: non_renewable_quantity = { r: [non_renewable[r]] * len(self.index_time) for r in non_renewable } for (r, t) in product(renewable, self.index_time): self.add_linear_constraint( self.construct_linear_sum( int(self.problem.mode_details[key[0]][key[1]][r]) * self.x[key] for key in keys_for_t[t] ) <= renewable_quantity[r][t] ) for r in non_renewable: self.add_linear_constraint( self.construct_linear_sum( int(self.problem.mode_details[key[0]][key[1]][r]) * self.x[key] for key in self.x ) <= non_renewable_quantity[r][0] ) durations = { j: self.add_integer_variable(name="duration_" + str(j)) for j in variable_per_task } self.durations = durations self.variable_per_task = variable_per_task for j in variable_per_task: self.add_linear_constraint( self.construct_linear_sum( self.problem.mode_details[key[0]][key[1]]["duration"] * self.x[key] for key in variable_per_task[j] ) == durations[j] ) for (j, s) in S: self.add_linear_constraint( self.construct_linear_sum( key[2] * self.x[key] for key in variable_per_task[s] ) - self.construct_linear_sum( key[2] * self.x[key] for key in variable_per_task[j] ) >= durations[j] ) self.starts = {} for task in sorted_tasks: self.starts[task] = self.add_integer_variable( name=f"start({task})", lb=0, ub=self.index_time[-1], ) self.add_linear_constraint( self.construct_linear_sum( [self.x[key] * key[2] for key in variable_per_task[task]] ) == self.starts[task] ) p_s: Optional[PartialSolution] = args.get("partial_solution", None) self.partial_solution = p_s self.constraints_partial_solutions = [] if p_s is not None: constraints = [] if p_s.start_times is not None: constraints = [ self.add_linear_constraint( self.construct_linear_sum( self.x[k] for k in self.variable_per_task[task] if k[2] == p_s.start_times[task] ) == 1 ) for task in p_s.start_times ] if p_s.partial_permutation is not None: for t1, t2 in zip( p_s.partial_permutation[:-1], p_s.partial_permutation[1:] ): constraints += [ self.add_linear_constraint( self.construct_linear_sum( [key[2] * self.x[key] for key in variable_per_task[t1]] + [ -key[2] * self.x[key] for key in variable_per_task[t2] ] ) <= 0 ) ] if p_s.list_partial_order is not None: for l in p_s.list_partial_order: for t1, t2 in zip(l[:-1], l[1:]): constraints += [ self.add_linear_constraint( self.construct_linear_sum( [ key[2] * self.x[key] for key in variable_per_task[t1] ] + [ -key[2] * self.x[key] for key in variable_per_task[t2] ] ) <= 0 ) ] if p_s.start_at_end is not None: for i, j in p_s.start_at_end: constraints += [ self.add_linear_constraint( self.starts[j] == self.starts[i] + durations[i] ) ] if p_s.start_together is not None: for i, j in p_s.start_together: constraints += [ self.add_linear_constraint(self.starts[j] == self.starts[i]) ] if p_s.start_after_nunit is not None: for t1, t2, delta in p_s.start_after_nunit: constraints += [ self.add_linear_constraint( self.starts[t2] >= self.starts[t1] + delta ) ] if p_s.start_at_end_plus_offset is not None: for t1, t2, delta in p_s.start_at_end_plus_offset: constraints += [ self.add_linear_constraint( self.starts[t2] >= self.starts[t1] + delta + durations[t1] ) ] self.constraints_partial_solutions = constraints logger.debug( f"Partial solution constraints : {self.constraints_partial_solutions}" ) self.update_model() # take into account "warmstart" w/o calling set_warmstart (would cause a recursion issue here) self.set_warm_start(self.start_solution) def set_warm_start(self, solution: Solution) -> None: """Make the solver warm start from the given solution. Implemented by OrtoolsMathOptMilpSolver or GurobiMilpSolver. """ raise NotImplementedError() def update_model(self) -> None: """Update model (especially for gurobi). It ensures the well defintion of variables. """ ... def retrieve_current_solution( self, get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], ) -> RcpspSolution: rcpsp_schedule = {} modes: dict[Hashable, Union[str, int]] = {} for (task, mode, t), x in self.x.items(): value = get_var_value_for_current_solution(x) if value >= 0.5: rcpsp_schedule[task] = { "start_time": t, "end_time": t + self.problem.mode_details[task][mode]["duration"], } modes[task] = mode logger.debug(f"Size schedule : {len(rcpsp_schedule.keys())}") modes_vec = [modes[k] for k in self.problem.tasks_list_non_dummy] return RcpspSolution( problem=self.problem, rcpsp_schedule=rcpsp_schedule, rcpsp_modes=modes_vec, rcpsp_schedule_feasible=True, ) def convert_to_variable_values( self, solution: RcpspSolution ) -> dict[VariableType, float]: """Convert a solution to a mapping between model variables and their values. Will be used by set_warm_start(). """ hinted_values: dict[VariableType, float] = {} for task in self.problem.tasks_list: if task in solution.rcpsp_schedule: hinted_values[self.starts[task]] = solution.rcpsp_schedule[task][ "start_time" ] modes_dict = self.problem.build_mode_dict(solution.rcpsp_modes) for j in solution.rcpsp_schedule: start_time_j = solution.rcpsp_schedule[j]["start_time"] hinted_values[self.durations[j]] = self.problem.mode_details[j][ modes_dict[j] ]["duration"] for k in self.variable_per_task[j]: task, mode, time = k if start_time_j == time and mode == modes_dict[j]: hinted_values[self.x[k]] = 1 else: hinted_values[self.x[k]] = 0 return hinted_values
[docs] class GurobiMultimodeRcpspSolver(GurobiMilpSolver, _BaseLpMultimodeRcpspSolver): hyperparameters = _BaseLpMultimodeRcpspSolver.hyperparameters
[docs] def update_model(self) -> None: """Update model (especially for gurobi). It ensures the well defintion of variables. """ self.model.update()
[docs] def convert_to_variable_values( self, solution: RcpspSolution ) -> dict[gurobipy.Var, float]: """Convert a solution to a mapping between model variables and their values. Will be used by set_warm_start(). """ return _BaseLpMultimodeRcpspSolver.convert_to_variable_values(self, solution)
[docs] class MathOptMultimodeRcpspSolver( OrtoolsMathOptMilpSolver, _BaseLpMultimodeRcpspSolver ): hyperparameters = _BaseLpMultimodeRcpspSolver.hyperparameters max_horizon: Optional[int] = None partial_solution: Optional[PartialSolution] = None
[docs] def convert_to_variable_values( self, solution: RcpspSolution ) -> dict[mathopt.Variable, float]: """Convert a solution to a mapping between model variables and their values. Will be used by set_warm_start(). """ return _BaseLpMultimodeRcpspSolver.convert_to_variable_values(self, solution)
[docs] class CplexMultimodeRcpspSolver(CplexMilpSolver, _BaseLpMultimodeRcpspSolver): hyperparameters = _BaseLpMultimodeRcpspSolver.hyperparameters
[docs] def init_model(self, **args): greedy_start = args.get("greedy_start", True) start_solution = args.get("start_solution", None) max_horizon = args.get("max_horizon", None) if start_solution is None: if greedy_start: logger.info("Computing greedy solution") if self.problem.is_varying_resource(): greedy_solver = PileCalendarRcpspSolver(self.problem) else: greedy_solver = PileRcpspSolver(self.problem) store_solution = greedy_solver.solve( greedy_choice=GreedyChoice.MOST_SUCCESSORS ) self.start_solution = store_solution.get_best_solution_fit()[0] makespan = self.problem.evaluate(self.start_solution)["makespan"] else: logger.info("Get dummy solution") solution = self.problem.get_dummy_solution() self.start_solution = solution makespan = self.problem.evaluate(solution)["makespan"] else: self.start_solution = start_solution makespan = self.problem.evaluate(start_solution)["makespan"] sorted_tasks = self.problem.tasks_list renewable = { r: self.problem.resources[r] for r in self.problem.resources_list if r not in self.problem.non_renewable_resources } non_renewable = { r: self.problem.resources[r] for r in self.problem.non_renewable_resources } S = [] logger.debug(f"successors: {self.problem.successors}") for task in sorted_tasks: for suc in self.problem.successors[task]: S.append([task, suc]) if self.start_solution.rcpsp_schedule_feasible: self.index_time = list(range(int(makespan + 1))) if max_horizon is not None: self.index_time = list(range(max_horizon + 1)) self.model: "cplex.Model" = cplex.Model("MRCPSP") self.x: dict[tuple[Hashable, int, int], "cplex_var.Var"] = {} last_task = self.problem.sink_task variable_per_task = {} keys_for_t = {} for task in sorted_tasks: if task not in variable_per_task: variable_per_task[task] = [] for mode in self.problem.mode_details[task]: for t in self.index_time: self.x[(task, mode, t)] = self.model.binary_var( name=f"x({task},{mode}, {t})", ) for tt in range( t, t + self.problem.mode_details[task][mode]["duration"] ): if tt not in keys_for_t: keys_for_t[tt] = set() keys_for_t[tt].add((task, mode, t)) variable_per_task[task] += [(task, mode, t)] self.model.minimize( self.model.sum(self.x[key] * key[2] for key in variable_per_task[last_task]) ) # Only one (mode, t) switched on per task, which means the task starts at time=t with a given mode. self.model.add_constraints( ( self.model.sum(self.x[key] for key in variable_per_task[j]) == 1 for j in variable_per_task ) ) if self.problem.is_varying_resource(): renewable_quantity = {r: renewable[r] for r in renewable} else: renewable_quantity = { r: [renewable[r]] * len(self.index_time) for r in renewable } if self.problem.is_varying_resource(): non_renewable_quantity = {r: non_renewable[r] for r in non_renewable} else: non_renewable_quantity = { r: [non_renewable[r]] * len(self.index_time) for r in non_renewable } # Cumulative constraint for renewable resource. self.model.add_constraints( ( self.model.sum( int(self.problem.mode_details[key[0]][key[1]].get(r, 0)) * self.x[key] for key in keys_for_t[t] ) <= renewable_quantity[r][t] for (r, t) in product(renewable, self.index_time) ) ) self.model.add_constraints( ( self.model.sum( int(self.problem.mode_details[key[0]][key[1]][r]) * self.x[key] for key in self.x ) <= non_renewable_quantity[r][0] for r in non_renewable ) ) if self.problem.is_rcpsp_multimode(): durations = { j: self.model.integer_var( lb=min( [ self.problem.mode_details[j][m]["duration"] for m in self.problem.mode_details[j] ] ), ub=max( [ self.problem.mode_details[j][m]["duration"] for m in self.problem.mode_details[j] ] ), name="duration_" + str(j), ) for j in variable_per_task } self.model.add_constraints( ( self.model.sum( self.problem.mode_details[key[0]][key[1]]["duration"] * self.x[key] for key in variable_per_task[j] ) == durations[j] for j in variable_per_task ) ) else: durations = { j: self.problem.mode_details[j][1]["duration"] for j in variable_per_task } self.durations = durations self.variable_per_task = variable_per_task # Precedence constraints. self.model.add_constraints( ( self.model.sum( [key[2] * self.x[key] for key in variable_per_task[s]] + [-key[2] * self.x[key] for key in variable_per_task[j]] ) >= durations[j] for (j, s) in S ) ) self.starts = {} for task in sorted_tasks: self.starts[task] = self.model.integer_var( lb=0, ub=self.index_time[-1], name=f"start({task})" ) self.model.add_constraint( self.model.sum( [self.x[key] * key[2] for key in variable_per_task[task]] ) == self.starts[task] ) modes_dict = self.problem.build_mode_dict(self.start_solution.rcpsp_modes) if greedy_start: warmstart = self.model.new_solution() for j in self.start_solution.rcpsp_schedule: start_time_j = self.start_solution.rcpsp_schedule[j]["start_time"] for k in self.variable_per_task[j]: task, mode, time = k if start_time_j == time and mode == modes_dict[j]: warmstart.add_var_value(self.x[k], 1) warmstart.add_var_value(self.starts[task], time) else: warmstart.add_var_value(self.x[k], 0) self.model.add_mip_start(warmstart)