# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any, Optional, Union
import networkx as nx
from discrete_optimization.generic_tools.do_problem import (
ModeOptim,
ParamsObjectiveFunction,
Solution,
)
from discrete_optimization.generic_tools.lp_tools import GurobiMilpSolver, MilpSolver
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.solution import RcpspSolution
from discrete_optimization.rcpsp.solvers import RcpspSolver
try:
import gurobipy
except ImportError:
gurobi_available = False
else:
gurobi_available = True
import gurobipy as gurobi
logger = logging.getLogger(__name__)
[docs]
def intersect(i1, i2):
if i2[0] >= i1[1] or i1[0] >= i2[1]:
return None
else:
s = max(i1[0], i2[0])
e = min(i1[1], i2[1])
return [s, e]
[docs]
class ConstraintTaskIndividual:
list_tuple: list[tuple[str, int, int, bool]]
# task, resource, resource_individual, has or has not to do a task
# indicates constraint for a given resource individual that has to do a task
def __init__(self, list_tuple):
self.list_tuple = list_tuple
[docs]
class ConstraintWorkDuration:
ressource: str
individual: int
time_bounds: tuple[int, int]
working_time_upper_bound: int
def __init__(self, ressource, individual, time_bounds, working_time_upper_bound):
self.ressource = ressource
self.individual = individual
self.time_bounds = time_bounds
self.working_time_upper_bound = working_time_upper_bound
class _BaseLpGanttMultimodeRcpspSolver(MilpSolver, RcpspSolver):
problem: RcpspProblem
def __init__(
self,
problem: RcpspProblem,
rcpsp_solution: RcpspSolution,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs,
):
if problem.calendar_details is None:
raise ValueError(
"rcpsp_problem.calendar_details cannot be None for this solver"
)
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.rcpsp_solution = rcpsp_solution
self.jobs = sorted(list(problem.mode_details.keys()))
self.modes_dict = {
i + 2: self.rcpsp_solution.rcpsp_modes[i]
for i in range(len(self.rcpsp_solution.rcpsp_modes))
}
self.modes_dict[1] = 1
self.modes_dict[self.jobs[-1]] = 1
self.rcpsp_schedule = self.rcpsp_solution.rcpsp_schedule
self.start_times_dict = {}
for task in self.rcpsp_schedule:
t = self.rcpsp_schedule[task]["start_time"]
if t not in self.start_times_dict:
self.start_times_dict[t] = set()
self.start_times_dict[t].add((task, t))
self.graph_intersection_time = nx.Graph()
for t in self.jobs:
self.graph_intersection_time.add_node(t)
for t in self.jobs:
intersected_jobs = [
task
for task in self.rcpsp_schedule
if intersect(
[
self.rcpsp_schedule[task]["start_time"],
self.rcpsp_schedule[task]["end_time"],
],
[
self.rcpsp_schedule[t]["start_time"],
self.rcpsp_schedule[t]["end_time"],
],
)
is not None
and t != task
]
for tt in intersected_jobs:
self.graph_intersection_time.add_edge(t, tt)
cliques = [c for c in nx.find_cliques(self.graph_intersection_time)]
self.cliques = cliques
self.sense_optim = ModeOptim.MINIMIZATION
self.params_objective_function.sense_function = self.sense_optim
self.constraint_additionnal = {}
def retrieve_current_solution(
self,
get_var_value_for_current_solution: Callable[[Any], float],
get_obj_value_for_current_solution: Callable[[], float],
) -> tuple[dict[Any, dict[Any, dict[Any, Any]]], float]:
objective = get_obj_value_for_current_solution()
resource_id_usage = {
k: {
individual: {
task: get_var_value_for_current_solution(resource_usage)
for task, resource_usage in self.ressource_id_usage[k][
individual
].items()
}
for individual in self.ressource_id_usage[k]
}
for k in self.ressource_id_usage
}
return (resource_id_usage, objective)
# gurobi solver which is usefull to get a pool of solution (indeed, using the other one we dont have usually a lot of
# solution since we converge rapidly to the "optimum" (we don't have an objective value..)
[docs]
class GurobiGanttMultimodeRcpspSolver(
GurobiMilpSolver, _BaseLpGanttMultimodeRcpspSolver
):
[docs]
def convert_to_variable_values(
self, solution: Solution
) -> dict[gurobipy.Var, float]:
raise NotImplementedError(
f"No warmstart implemented for {self.__class__.__name__}."
)
[docs]
def init_model(self, **args):
self.model = gurobi.Model("Gantt")
self.ressource_id_usage = {
k: {i: {} for i in range(len(self.problem.calendar_details[k]))}
for k in self.problem.calendar_details.keys()
}
variables_per_task = {}
variables_per_individual = {}
constraints_ressource_need = {}
for task in self.jobs:
start = self.rcpsp_schedule[task]["start_time"]
end = self.rcpsp_schedule[task]["end_time"]
for k in self.ressource_id_usage: # typically worker
needed_ressource = (
self.problem.mode_details[task][self.modes_dict[task]][k] > 0
)
if needed_ressource:
for individual in self.ressource_id_usage[k]:
available = all(
self.problem.calendar_details[k][individual][time]
for time in range(start, end)
)
if available:
key_variable = (k, individual, task)
self.ressource_id_usage[k][individual][
task
] = self.model.addVar(
name=str(key_variable), vtype=gurobi.GRB.BINARY
)
if task not in variables_per_task:
variables_per_task[task] = set()
if k not in variables_per_individual:
variables_per_individual[k] = {}
if individual not in variables_per_individual[k]:
variables_per_individual[k][individual] = set()
variables_per_task[task].add(key_variable)
variables_per_individual[k][individual].add(key_variable)
ressource_needed = self.problem.mode_details[task][
self.modes_dict[task]
][k]
if k not in constraints_ressource_need:
constraints_ressource_need[k] = {}
constraints_ressource_need[k][task] = self.model.addLConstr(
gurobi.quicksum(
[
self.ressource_id_usage[k][key[1]][key[2]]
for key in variables_per_task[task]
if key[0] == k
]
)
== ressource_needed,
name="ressource_" + str(k) + "_" + str(task),
)
overlaps_constraints = {}
for i in range(len(self.cliques)):
tasks = set(self.cliques[i])
for k in variables_per_individual:
for individual in variables_per_individual[k]:
keys_variable = [
variable
for variable in variables_per_individual[k][individual]
if variable[2] in tasks
]
if len(keys_variable) > 0:
overlaps_constraints[
(i, k, individual)
] = self.model.addLConstr(
gurobi.quicksum(
[
self.ressource_id_usage[key[0]][key[1]][key[2]]
for key in keys_variable
]
)
<= 1
)
self.model.modelSense = gurobi.GRB.MINIMIZE
[docs]
def adding_constraint(
self,
constraint_description: Union[ConstraintTaskIndividual, ConstraintWorkDuration],
constraint_name: str = "",
):
if isinstance(constraint_description, ConstraintTaskIndividual):
if constraint_name == "":
constraint_name = str(ConstraintTaskIndividual.__name__)
for tupl in constraint_description.list_tuple:
ressource, ressource_individual, task, has_to_do = tupl
if ressource in self.ressource_id_usage:
if ressource_individual in self.ressource_id_usage[ressource]:
if (
task
in self.ressource_id_usage[ressource][ressource_individual]
):
if constraint_name not in self.constraint_additionnal:
self.constraint_additionnal[constraint_name] = []
self.constraint_additionnal[constraint_name] += [
self.model.addLConstr(
self.ressource_id_usage[ressource][
ressource_individual
][task]
== has_to_do
)
]
if isinstance(constraint_description, ConstraintWorkDuration):
if constraint_name == "":
constraint_name = str(ConstraintWorkDuration.__name__)
if constraint_name not in self.constraint_additionnal:
self.constraint_additionnal[constraint_name] = []
tasks_of_interest = [
t
for t in self.rcpsp_schedule
if t
in self.ressource_id_usage.get(
constraint_description.ressource, {}
).get(constraint_description.individual, {})
and (
constraint_description.time_bounds[0]
<= self.rcpsp_schedule[t]["start_time"]
<= constraint_description.time_bounds[1]
or constraint_description.time_bounds[0]
<= self.rcpsp_schedule[t]["end_time"]
<= constraint_description.time_bounds[1]
)
]
logger.debug(tasks_of_interest)
self.constraint_additionnal[constraint_name] += [
self.model.addLConstr(
gurobi.quicksum(
[
self.ressource_id_usage[constraint_description.ressource][
constraint_description.individual
][t]
* (
min(
constraint_description.time_bounds[1],
self.rcpsp_schedule[t]["end_time"],
)
- max(
constraint_description.time_bounds[0],
self.rcpsp_schedule[t]["start_time"],
)
)
for t in tasks_of_interest
]
)
<= constraint_description.working_time_upper_bound
)
]
self.model.update()
[docs]
def retrieve_current_solution(
self,
get_var_value_for_current_solution: Callable[[Any], float],
get_obj_value_for_current_solution: Callable[[], float],
) -> tuple[dict[Any, dict[Any, dict[Any, Any]]], float]:
objective = get_obj_value_for_current_solution()
resource_id_usage = {
k: {
individual: {
task: get_var_value_for_current_solution(resource_usage)
for task, resource_usage in self.ressource_id_usage[k][
individual
].items()
}
for individual in self.ressource_id_usage[k]
}
for k in self.ressource_id_usage
}
return (resource_id_usage, objective)
[docs]
def build_objective_function_from_a_solution(
self,
ressource_usage: dict[str, dict[int, dict[int, bool]]],
ignore_tuple: set[tuple[str, int, int]] = None,
):
objective = gurobi.LinExpr(0.0)
if ignore_tuple is None:
ignore_tuple = set()
for k in ressource_usage:
for individual in ressource_usage[k]:
for task in ressource_usage[k][individual]:
if ressource_usage[k][individual][task] >= 0.5:
objective.add(1 - self.ressource_id_usage[k][individual][task])
logger.debug("Setting new objectives = Change task objective")
self.model.setObjective(objective)