Source code for discrete_optimization.generic_tasks_tools.solvers.cpm

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
"""CPM to compute lower/uppe bound on start/end of tasks"""

import logging
from dataclasses import dataclass
from heapq import heapify, heappop, heappush
from typing import Any, Generic, Optional

import networkx as nx

from discrete_optimization.generic_tasks_tools.allocation import UnaryResource
from discrete_optimization.generic_tasks_tools.base import Task
from discrete_optimization.generic_tasks_tools.cumulative_resource import (
    CumulativeResource,
)
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.generic_scheduling import (
    GenericSchedulingProblem,
)
from discrete_optimization.generic_tasks_tools.non_renewable_resource import (
    NonRenewableResource,
)

logger = logging.getLogger(__name__)


@dataclass
class Taskbounds:
    """Bounds on the start and end of a single task.

    Every field defaults to ``None``, meaning the bound has not been set yet.
    Field order is part of the interface: instances may be built positionally
    as ``Taskbounds(start_lb, end_lb, start_ub, end_ub)``.

    Attributes:
        start_lower_bound: lower bound on the task start.
        end_lower_bound: lower bound on the task end.
        start_upper_bound: upper bound on the task start.
        end_upper_bound: upper bound on the task end.

    """

    start_lower_bound: Optional[int] = None
    end_lower_bound: Optional[int] = None
    start_upper_bound: Optional[int] = None
    end_upper_bound: Optional[int] = None
class Cpm(Generic[Task, UnaryResource, CumulativeResource, NonRenewableResource]):
    """Propagator of bounds through precedence graph according to minimum duration of each task.

    For that we use:
    - multimode-scheduling: we know all possible durations
    - precedence: we use the precedence constraints

    We first do a forward pass on all tasks in precedence graph for lower bounds
    and then a backward pass for upper bounds.
    We start from lower bound 0 and upper bound horizon (problem horizon can be overridden).

    This differs from classic CPM as we know that other non-precedence constraints
    can occur so that we cannot assume end lower_bounds = end upper bound for sink tasks.
    So sink tasks end upper bound will be set to horizon instead.

    For each task, we take best of propagated bound and existing problem bound
    on start/end (via `problem.get_task_start_or_end_lower_bound()` and
    `problem.get_task_start_or_end_upper_bound()`).

    Attributes:
        horizon: if set, override `problem.get_makespan_upper_bound()`.
        makespan_cpm: largest end lower bound among tasks without successors,
            i.e. the makespan if only precedence constraints applied.
            `None` until `compute_task_bounds()` has run.

    """

    def __init__(
        self,
        problem: GenericSchedulingProblem[
            Task, UnaryResource, CumulativeResource, NonRenewableResource
        ],
        horizon: Optional[int] = None,
    ):
        """Build the precedence structures used by the propagation.

        Args:
            problem: scheduling problem providing the precedence graph, task modes,
                durations and start/end bounds.
            horizon: upper bound used for sink tasks' end.
                Defaults to `problem.get_makespan_upper_bound()`.

        """
        self.problem = problem
        if horizon is None:
            self.horizon = self.problem.get_makespan_upper_bound()
        else:
            self.horizon = horizon
        self.graph_nx = self.problem.get_precedence_graph().to_networkx()
        self.map_node: dict[Any, Taskbounds] = {
            n: Taskbounds(None, None, None, None) for n in self.graph_nx.nodes()
        }
        # Heap entries carry an integer index instead of the node itself, so
        # tuples pushed on the heap are always (int, int).
        self.node_to_index = {
            node: i_node for i_node, node in enumerate(self.graph_nx.nodes())
        }
        self.index_to_node = {
            i_node: node for node, i_node in self.node_to_index.items()
        }
        self.immediate_successors = {
            node: set(nx.neighbors(self.graph_nx, node))
            for node in self.graph_nx.nodes()
        }
        self.immediate_predecessors = {
            node: set(self.graph_nx.predecessors(node))
            for node in self.graph_nx.nodes()
        }
        # Transitive closures (all ancestors / all descendants of each node).
        self.predecessors = {
            node: nx.algorithms.ancestors(self.graph_nx, node)
            for node in self.graph_nx.nodes()
        }
        self.successors = {
            node: nx.algorithms.descendants(self.graph_nx, node)
            for node in self.graph_nx.nodes()
        }
        # Defined up front so the attribute exists before the first computation.
        self.makespan_cpm: Optional[int] = None
        self._computed = False

    def compute_task_bounds(self) -> None:
        """Compute task bounds by forward/backward propagation through precedence graph.

        Results are stored in `self.map_node` and `self.makespan_cpm`.

        Raises:
            RuntimeError: if the earliest finish date found by the forward pass
                exceeds `self.horizon`.

        """
        # Forward pass for lower bounds
        # starting nodes: without ancestors, start lower bound = 0
        queue = [
            (0, self.node_to_index[node])
            for node, preds in self.predecessors.items()
            if len(preds) == 0
        ]
        self._propagate_forward_bounds_through_graph(queue)

        # Check if horizon is sufficient for earliest finish date found
        last_tasks = self._get_last_tasks()
        # optimal makespan if there were no constraints apart from precedence
        # (default=0 keeps an empty precedence graph from raising in max())
        self.makespan_cpm = max(
            (self.map_node[task].end_lower_bound for task in last_tasks),
            default=0,
        )
        if self.makespan_cpm > self.horizon:
            raise RuntimeError(
                f"The schedule cannot be done with given horizon {self.horizon}, "
                f"as it is below the computed earliest finish date {self.makespan_cpm}"
            )

        # Backward pass for upper bounds
        # starting nodes: without descendants, end upper bound = horizon
        # (times are negated so that the min-heap pops the latest time first)
        queue = [(-self.horizon, self.node_to_index[node]) for node in last_tasks]
        self._propagate_backward_bounds_through_graph(queue)

        self._computed = True

    def get_task_bounds(self) -> dict[Task, tuple[int, int, int, int]]:
        """Return computed bounds on task start and end.

        Triggers `compute_task_bounds()` if it has not run yet.

        Returns:
            mapping task ->
            (start_lower_bound, end_lower_bound, start_upper_bound, end_upper_bound)

        """
        if not self._computed:
            self.compute_task_bounds()
        return {
            task: (
                bounds.start_lower_bound,
                bounds.end_lower_bound,
                bounds.start_upper_bound,
                bounds.end_upper_bound,
            )
            for task, bounds in self.map_node.items()
        }

    def get_a_critical_path(self) -> list[Task]:
        """Compute a critical path.

        It takes into account no other constraints than precedence constraints.
        The path is built backward from a task without successor whose end lower
        bound equals `self.makespan_cpm`, following critical immediate
        predecessors until none is found, and is returned in forward order.

        Returns:
            the tasks of one critical path, first task first
            (empty if the precedence graph has no node).

        """
        if not self._computed:
            self.compute_task_bounds()
        critical_path = []
        preds = []
        cur_node = None
        for node in self._get_last_tasks():
            if self.map_node[node].end_lower_bound == self.makespan_cpm:
                # critical last task
                critical_path.append(node)
                preds = self.immediate_predecessors[node]
                cur_node = node
                break
        while len(preds) > 0:
            # stays empty (ending the walk) if no predecessor is critical
            next_preds = []
            for node in preds:
                if self._is_critical_predecessor(node, cur_node):
                    critical_path.append(node)
                    next_preds = self.immediate_predecessors[node]
                    cur_node = node
                    break
            preds = next_preds
        return critical_path[::-1]

    def get_critical_subgraph(self) -> nx.DiGraph:
        """Compute critical subgraph.

        It includes all critical nodes: the last tasks whose end lower bound equals
        `self.makespan_cpm`, plus every node reachable backward from them through
        critical immediate predecessors.

        Returns:
            a subgraph *view* of precedence graph `self.graph_nx`
            (so beware that attributes are shared with original graph)

        """
        if not self._computed:
            self.compute_task_bounds()
        critical_nodes = set()
        # Init: critical nodes among last tasks
        critical_nodes_to_review = [
            node
            for node in self._get_last_tasks()
            if self.map_node[node].end_lower_bound == self.makespan_cpm
        ]
        # Go backward from there
        while len(critical_nodes_to_review) > 0:
            critical_node = critical_nodes_to_review.pop()
            if critical_node in critical_nodes:
                # Already expanded: re-expanding would only repeat the same
                # checks once per critical path going through this node.
                continue
            critical_nodes.add(critical_node)
            for node in self.immediate_predecessors[critical_node]:
                if self._is_critical_predecessor(node, critical_node):
                    critical_nodes_to_review.append(node)
        return self.graph_nx.subgraph(critical_nodes)

    def _get_last_tasks(self) -> list[Task]:
        """List the tasks having no descendant in the precedence graph."""
        return [node for node, succs in self.successors.items() if len(succs) == 0]

    def _get_task_min_duration(self, task: Task) -> int:
        """Return the smallest duration of `task` over all its modes."""
        return min(
            self.problem.get_task_mode_duration(task=task, mode=mode)
            for mode in self.problem.get_task_modes(task=task)
        )

    def _is_critical_predecessor(self, node: Task, successor: Task) -> bool:
        """Tell whether `node` is critical and directly drives `successor`'s start.

        `node` qualifies when its start slack equals `horizon - makespan_cpm`
        and its end lower bound equals the start lower bound of `successor`.
        """
        bounds = self.map_node[node]
        return (
            bounds.start_lower_bound
            == bounds.start_upper_bound - self.horizon + self.makespan_cpm
            and bounds.end_lower_bound == self.map_node[successor].start_lower_bound
        )

    def _propagate_forward_bounds_through_graph(
        self, queue: list[tuple[int, int]]
    ) -> None:
        """Set start/end lower bounds of all nodes reachable from the queued ones.

        Args:
            queue: `(start lower bound, node index)` entries for the starting nodes.
                The list is heapified and consumed in place.

        """
        done = set()
        heapify(queue)
        while queue:
            time, i_node = heappop(queue)
            node = self.index_to_node[i_node]
            if node in done:
                # node already seen
                continue
            # update time to take into account problem bound
            start_lb = max(
                time,
                self.problem.get_task_start_or_end_lower_bound(
                    task=node, start_or_end=StartOrEnd.START
                ),
            )
            bounds = self.map_node[node]
            bounds.start_lower_bound = start_lb
            # end lower bound: best of start lower bound + min duration and problem bound
            bounds.end_lower_bound = max(
                start_lb + self._get_task_min_duration(node),
                self.problem.get_task_start_or_end_lower_bound(
                    task=node, start_or_end=StartOrEnd.END
                ),
            )
            done.add(node)
            for next_node in self.immediate_successors[node]:
                next_preds = self.immediate_predecessors[next_node]
                # A successor is queued only once all its predecessors have
                # their end lower bound set.
                if all(pred in done for pred in next_preds):
                    next_node_start_lower_bound = max(
                        self.map_node[pred].end_lower_bound for pred in next_preds
                    )
                    heappush(
                        queue,
                        (next_node_start_lower_bound, self.node_to_index[next_node]),
                    )

    def _propagate_backward_bounds_through_graph(
        self, queue: list[tuple[int, int]]
    ) -> None:
        """Set start/end upper bounds of all nodes reaching the queued ones.

        Args:
            queue: `(-end upper bound, node index)` entries for the starting nodes
                (times are negated). The list is heapified and consumed in place.

        """
        done = set()
        heapify(queue)
        while queue:
            time, i_node = heappop(queue)
            node = self.index_to_node[i_node]
            if node in done:
                # node already seen
                continue
            # update time to take into account problem bound
            eub = min(
                -time,
                self.problem.get_task_start_or_end_upper_bound(
                    task=node, start_or_end=StartOrEnd.END
                ),
            )
            bounds = self.map_node[node]
            bounds.end_upper_bound = eub
            # start upper bound: best of end upper bound - min_duration and problem bound
            bounds.start_upper_bound = min(
                eub - self._get_task_min_duration(node),
                self.problem.get_task_start_or_end_upper_bound(
                    task=node, start_or_end=StartOrEnd.START
                ),
            )
            done.add(node)
            for next_node in self.immediate_predecessors[node]:
                next_succs = self.immediate_successors[next_node]
                # A predecessor is queued only once all its successors have
                # their start upper bound set.
                if all(succ in done for succ in next_succs):
                    next_node_end_upper_bound = min(
                        self.map_node[succ].start_upper_bound for succ in next_succs
                    )
                    heappush(
                        queue,
                        (-next_node_end_upper_bound, self.node_to_index[next_node]),
                    )