# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from typing import Union
import networkx as nx
from networkx import NetworkXNoCycle
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.problem_preemptive import PreemptiveRcpspProblem
from discrete_optimization.rcpsp.problem_specialized_constraints import (
SpecialConstraintsPreemptiveRcpspProblem,
)
from discrete_optimization.rcpsp.special_constraints import (
SpecialConstraintsDescription,
)
from discrete_optimization.rcpsp_multiskill.problem import (
MultiskillRcpspProblem,
VariantMultiskillRcpspProblem,
)
[docs]
class GraphRcpsp:
def __init__(
self,
problem: Union[
RcpspProblem,
PreemptiveRcpspProblem,
MultiskillRcpspProblem,
VariantMultiskillRcpspProblem,
],
):
self.problem = problem
self.graph = self.problem.compute_graph()
self.graph_nx = self.graph.graph_nx
self.ancestors_map = {}
self.descendants_map = {}
descendants = {
n: nx.algorithms.descendants(self.graph_nx, n)
for n in self.graph_nx.nodes()
}
self.source = self.problem.source_task
self.sink = self.problem.sink_task
self.all_activities = set(self.problem.tasks_list)
for k in descendants:
self.descendants_map[k] = {
"succs": descendants[k],
"nb": len(descendants[k]),
}
ancestors = {
n: nx.algorithms.ancestors(self.graph_nx, n) for n in self.graph_nx.nodes()
}
for k in ancestors:
self.ancestors_map[k] = {"succs": ancestors[k], "nb": len(ancestors[k])}
self.graph_without_source_sink = nx.subgraph(
self.graph_nx,
[
n
for n in self.graph_nx
if n not in {self.problem.sink_task, self.problem.source_task}
],
)
[docs]
def get_next_activities(self, task):
return self.graph.get_neighbors(task)
[docs]
def get_pred_activities(self, task):
return self.graph.get_predecessors(task)
[docs]
def get_descendants_activities(self, task):
return self.descendants_map.get(task, {"succs": set()})["succs"]
[docs]
def get_ancestors_activities(self, task):
return self.ancestors_map.get(task, {"succs": set()})["succs"]
[docs]
def check_loop(self):
try:
cycles = nx.find_cycle(self.graph_nx, orientation="original")
except NetworkXNoCycle:
cycles = None
return cycles
[docs]
def compute_component_in_non_dummy_graph(self):
return [
c
for c in sorted(
nx.weakly_connected_components(self.graph_without_source_sink),
key=len,
reverse=True,
)
]
[docs]
class GraphSpecialConstraintsRcpsp(GraphRcpsp):
def __init__(
self,
problem: Union[RcpspProblem, SpecialConstraintsPreemptiveRcpspProblem],
):
if isinstance(problem, RcpspProblem) and not problem.do_special_constraints:
raise ValueError("this graph is meant for models with special constraints")
super().__init__(problem)
self.special_constraints: SpecialConstraintsDescription = (
problem.special_constraints
)
self.graph_constraints = nx.DiGraph()
for K in (
self.special_constraints.start_together
+ self.special_constraints.start_at_end
+ self.special_constraints.start_at_end_plus_offset
+ self.special_constraints.start_after_nunit
+ self.special_constraints.disjunctive_tasks
):
t1 = K[0]
t2 = K[1]
if t1 not in self.graph_constraints:
self.graph_constraints.add_node(t1)
if t2 not in self.graph_constraints:
self.graph_constraints.add_node(t2)
self.graph_constraints.add_edge(t1, t2)
self.graph_constraints.add_edge(t2, t1)
self.components_graph_constraints = [
c for c in nx.strongly_connected_components(self.graph_constraints)
]
self.index_components = {}
for i in range(len(self.components_graph_constraints)):
for ci in self.components_graph_constraints[i]:
self.index_components[ci] = i
[docs]
def get_neighbors_constraints(self, task):
if task in self.graph_constraints:
return list(self.graph_constraints.neighbors(task))
return []
[docs]
def get_pred_constraints(self, task):
if task in self.graph_constraints:
return list(self.graph_constraints.predecessors(task))
return []
[docs]
def build_graph_rcpsp_object(
rcpsp_problem: Union[RcpspProblem, PreemptiveRcpspProblem]
):
if (
hasattr(rcpsp_problem, "do_special_constraints")
and rcpsp_problem.do_special_constraints
):
return GraphSpecialConstraintsRcpsp(problem=rcpsp_problem)
else:
return GraphRcpsp(problem=rcpsp_problem)