Source code for discrete_optimization.rcpsp.problem

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from collections.abc import Callable, Hashable, Iterable
from copy import deepcopy
from enum import Enum
from functools import cache, partial
from typing import Any, Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt

from discrete_optimization.generic_rcpsp_tools.attribute_type import (
    ListIntegerRcpsp,
    PermutationRcpsp,
)
from discrete_optimization.generic_tasks_tools.allocation import (
    NoUnaryResource,
    WithoutAllocationProblem,
)
from discrete_optimization.generic_tasks_tools.calendar_resource import (
    convert_calendar_to_availability_intervals,
)
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.generic_scheduling import (
    GenericSchedulingProblem,
)
from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Solution,
    TupleFitness,
    TypeObjective,
)
from discrete_optimization.generic_tools.encoding_register import EncodingRegister
from discrete_optimization.generic_tools.graph_api import Graph
from discrete_optimization.rcpsp.fast_function import (
    compute_mean_ressource,
    sgs_fast,
    sgs_fast_partial_schedule_incomplete_permutation_tasks,
)
from discrete_optimization.rcpsp.solution import (
    CumulativeResource,
    NonRenewableResource,
    RcpspSolution,
    Resource,
    Task,
)
from discrete_optimization.rcpsp.special_constraints import (
    PairModeConstraint,
    SpecialConstraintsDescription,
)
from discrete_optimization.rcpsp.utils import (
    get_end_bounds_from_additional_constraint,
    get_start_bounds_from_additional_constraint,
    intersect,
)

logger = logging.getLogger(__name__)


[docs] class ScheduleGenerationScheme(Enum): SERIAL_SGS = 0 PARALLEL_SGS = 1
[docs] class RcpspProblem( GenericSchedulingProblem[ Task, NoUnaryResource, CumulativeResource, NonRenewableResource ], WithoutAllocationProblem[Task], ): """Main class for RCPSP problem. Attributes: resources (dict[str, Union[int, list[int]): mapping: name -> number of resources available, either at each time (if an integer), or time step by time step (if a list of integers). For non-renewable resources, should be an integer or a constant list (i.e. with same integer repeated) non_renewable_resources (list[str]): list of resources (specified by name) that are non-renewable mode_details (dict[Hashable, dict[int, dict[str, int]]]): task -> (mode number -> resource needs and duration) Ex: >>> mode_details = { "task1" : ... { # task1 details ... 1: { "duration": 2, "R1": 1, "R2": 0, "R3": 4 }, # task1, mode 1: duration = 2, resource needs: R1=1, R3=4 ... 2: { "duration": 10, "R1": 0, "R2": 2, "R3": 0 }, # task1, mode 1: duration = 10, resource needs: R2=2 ... }, ... # [other tasks] ... } successors (dict[Hashable, list[Hashable]]): successors in the precedence graph of each task horizon (int): max number of time steps allowed tasks_list(list[Hashable]): list of tasks. Must correspond to `mode_details`. If not given will be equal to `list(mode_details)`. source_task (Hashable): the id for the source task (i.e. the root in the precedence graph, all other tasks must follow it). If not given and tasks_list is made of integers, the lowest one sink_task (Hashable): the id for the source task (i.e. the last task in the precedence graph, follows all other tasks). If not given and tasks_list is made of integers, the highest one. tasks_list_non_dummy (list[Hashable]): tasks list excluding source and sink. Same order as tasks_list name_task (dict[Hashable, str]): mapping task ids to task names. Default to id -> str(id) n_jobs (int): numer of tasks (i.e. `len(tasks_list)`) n_jobs_non_dummy (int): number of tasks excluding dummy activities Start (0) and End (n). (i.e. `n_jobs - 2`) index_task (dict[Hashable, int]): mapping task id to index among all tasks (from 0 to n_jobs) index_task_non_dummy (dict[Hashable, int]): mapping task id to index among all non-dummy tasks (from 0 to n_jobs_non_dummy) special_constraints (SpecialConstraintsDescription): special additional constraints do_special_constraints (bool): True if they are special constraints to take into account relax_the_start_at_end (bool): relax some conditions, only if do_special_constraints. fixed_permutation (Optional[list[int]]): To be used by solutions specifying only `rcpsp_modes`, e.g. in alternating genetic algorithms, that are updating only one solution attribute at a time. See `RcpspSolution` for more details. fixed_modes (Optional[list[int]]): To be used by solutions specifying only the `rpsp_permutation`, e.g. in alternating genetic algorithms, that are updating only one solution attribute at a time. See `RcpspSolution` for more details. Example: Initializing a simple RCPSP problem with 4 tasks, 2 resources, and precedence constraints, then generating a baseline schedule. >>> from discrete_optimization.rcpsp.problem import RcpspProblem >>> # Define resources >>> resources = {"R1": 5, "R2": 2} >>> non_renewable_resources = [] # Empty if all resources replenish when a task ends >>> # Define mode details >>> # Format: { task_id: { mode_id: { "duration": int, "resource_name": amount } } } >>> mode_details = { ... "source": {1: {"duration": 0, "R1": 0, "R2": 0}}, # Source (Dummy) ... "task-1": {1: {"duration": 4, "R1": 2, "R2": 2}}, ... "task-2": {1: {"duration": 3, "R1": 3, "R2": 0}}, ... "task-3": {1: {"duration": 5, "R1": 2, "R2": 1}}, ... "task-4": {1: {"duration": 2, "R1": 1, "R2": 1}}, ... "sink": {1: {"duration": 0, "R1": 0, "R2": 0}}, # Sink (Dummy) ... } >>> # Define precedence graph >>> successors = { ... "source": ["task-1", "task-2"], # First tasks: 1 and 2 ... "task-1": ["task-3"], # Task 1 must finish before 3 ... "task-2": ["task-4"], # Task 2 must finish before 4 ... "task-3": ["sink"], # Task 3 leads to Sink ... "task-4": ["sink"], # Task 4 leads to Sink ... "sink": [], # Sink has no successors ... } >>> # Initialize the Problem >>> problem = RcpspProblem( ... resources=resources, ... non_renewable_resources=non_renewable_resources, ... mode_details=mode_details, ... successors=successors, ... horizon=20, # Maximum time allowed for the project ... source_task="source", ... sink_task="sink" ... ) >>> # Get a dummy solution: use serial sgs on trivial permutation >>> # See RcpspSolution docstring for more information. >>> sol = problem.get_dummy_solution() >>> print(sol.rcpsp_schedule) {'source': {'start_time': 0, 'end_time': 0}, 'task-1': {'start_time': 0, 'end_time': 4}, 'task-2': {'start_time': 0, 'end_time': 3}, 'task-3': {'start_time': 4, 'end_time': 9}, 'task-4': {'start_time': 4, 'end_time': 6}, 'sink': {'start_time': 9, 'end_time': 9}} >>> # index_task vs index_task_non_dummy >>> problem.index_task # index among all tasks: task-1 -> 1 {'source': 0, 'task-1': 1, 'task-2': 2, 'task-3': 3, 'task-4': 4, 'sink': 5} >>> problem.index_task_non_dummy # index among only non-dummy tasks: task-1 -> 0 {'task-1': 0, 'task-2': 1, 'task-3': 2, 'task-4': 3} >>> # Convert permutation of tasks into a list opf non-dummy indices >>> problem.convert_task_ids_to_permutation(["task-3", "task-2", "task-4", "task-1"]) [2, 1, 3, 0] """ sgs: ScheduleGenerationScheme def __init__( self, resources: dict[str, Union[int, list[int]]], non_renewable_resources: list[str], mode_details: dict[Hashable, dict[int, dict[str, int]]], successors: dict[Hashable, list[Hashable]], horizon: int, tasks_list: Optional[list[Hashable]] = None, source_task: Optional[Hashable] = None, sink_task: Optional[Hashable] = None, name_task: Optional[dict[Hashable, str]] = None, calendar_details: Optional[dict[str, list[list[int]]]] = None, special_constraints: Optional[SpecialConstraintsDescription] = None, relax_the_start_at_end: bool = True, fixed_permutation: Optional[list[int]] = None, fixed_modes: Optional[list[int]] = None, **kwargs: Any, ): """ Args: resources: non_renewable_resources: mode_details: successors: horizon: tasks_list: source_task: sink_task: name_task: calendar_details: special_constraints: relax_the_start_at_end: fixed_permutation: fixed_modes: **kwargs: """ self.resources = resources self.resources_list = list(self.resources.keys()) self.non_renewable_resources = non_renewable_resources self.mode_details = mode_details self.successors = successors self.horizon = horizon self.calendar_details = calendar_details if name_task is None: self.name_task = {x: str(x) for x in self.mode_details} else: self.name_task = name_task if tasks_list is None: self._tasks_list = list(self.mode_details) else: assert set(tasks_list) == set(self.mode_details), ( "tasks_list must contain same ids as mode_details" ) self._tasks_list = tasks_list self.n_jobs = len(self.mode_details.keys()) self.n_jobs_non_dummy = self.n_jobs - 2 self.index_task = {self.tasks_list[i]: i for i in range(self.n_jobs)} if source_task is None: if all((isinstance(t, int) for t in self.tasks_list)): self.source_task = min(self.tasks_list) # type: ignore else: raise ValueError( "source_task cannot be None if tasks id given in tasks_list are not all integers." ) else: self.source_task = source_task if sink_task is None: if all((isinstance(t, int) for t in self.tasks_list)): self.sink_task = max(self.tasks_list) # type: ignore else: raise ValueError( "sink_task cannot be None if tasks id given in tasks_list are not all integers." ) else: self.sink_task = sink_task self.tasks_list_non_dummy = [ t for t in self.tasks_list if t not in {self.source_task, self.sink_task} ] self.index_task_non_dummy = { self.tasks_list_non_dummy[i]: i for i in range(self.n_jobs_non_dummy) } self.costs: dict[str, bool] = { "makespan": True, "mean_resource_reserve": kwargs.get("mean_resource_reserve", False), } self.relax_the_start_at_end = relax_the_start_at_end self.fixed_permutation = fixed_permutation self.fixed_modes = fixed_modes if special_constraints is None: self.special_constraints = SpecialConstraintsDescription( skip_special_constraints=True ) else: self.special_constraints = special_constraints self.update_problem()
[docs] def update_problem(self) -> None: """Method to call when some attributes have been modified. It recomputes what is needed (numba functions, calendars, ...) It take into account modifications of: - horizon - resource calendars - duration/resource consumption for a given task, mode - special constraints - successors NB: special constraints may add precedence constraints. A new call to update_problem() with different special constraints will not roll back the updated precedence constraints. """ self.update_calendars() self.update_special_constraints() self.update_functions()
[docs] def update_special_constraints(self): if self.special_constraints.skip_special_constraints: self.do_special_constraints = False else: self.do_special_constraints = True predecessors_dict: dict[Hashable, list[Hashable]] = { task: [] for task in self.successors } for task in self.successors: for stask in self.successors[task]: predecessors_dict[stask] += [task] for t1, t2 in self.special_constraints.start_at_end: if t2 not in self.successors[t1]: self.successors[t1].append(t2) for t1, t2, off in self.special_constraints.start_at_end_plus_offset: if t2 not in self.successors[t1]: self.successors[t1].append(t2) for t1, t2 in self.special_constraints.start_together: for predt1 in predecessors_dict[t1]: if t2 not in self.successors[predt1]: self.successors[predt1] += [t2] for predt2 in predecessors_dict[t2]: if t1 not in self.successors[predt2]: self.successors[predt2] += [t1] self.graph = self.compute_graph( compute_predecessors=self.do_special_constraints ) if self.do_special_constraints: self.predecessors = self.graph.predecessors_dict
[docs] def update_calendars(self): self.is_calendar = False if any(isinstance(self.resources[res], Iterable) for res in self.resources): self.is_calendar = ( max( ( len( {self.resources[res]} if np.isscalar(self.resources[res]) else set(self.resources[res]) # type: ignore ) for res in self.resources ) ) > 1 ) if self.is_calendar: # consolidate calendars: same length = horizon, missing values = 0 self.resources = { r: [int(calendar)] * self.horizon if np.isscalar(calendar) else list(calendar)[: self.horizon] + [0] * (self.horizon - len(calendar)) for r, calendar in self.resources.items() } else: # keep only integers self.resources = { r: int(calendar) if np.isscalar(calendar) else int(calendar[0]) for r, calendar in self.resources.items() } self.update_resource_availabilities()
[docs] def update_resource_availabilities(self) -> None: super().update_resource_availabilities() self.get_resource_availabilities.cache_clear()
@property def tasks_list(self) -> list[Task]: return self._tasks_list @property def non_renewable_resources_list(self) -> list[NonRenewableResource]: return self.non_renewable_resources @property def cumulative_resources_list(self) -> list[CumulativeResource]: return [ resource for resource in self.resources_list if resource not in self.non_renewable_resources ]
[docs] def get_non_renewable_resource_capacity( self, resource: NonRenewableResource ) -> int: capacity = self.resources[resource] if np.isscalar(capacity): return capacity else: return capacity[0]
[docs] def get_non_renewable_resource_consumption( self, resource: NonRenewableResource, task: Task, mode: int ) -> int: mode_detail = self.mode_details[task][mode] return mode_detail.get(resource, 0)
[docs] @cache def get_resource_availabilities( self, resource: Resource ) -> list[tuple[int, int, int]]: return convert_calendar_to_availability_intervals( calendar=self.resources[resource], horizon=self.horizon )
[docs] def get_cumulative_resource_consumption( self, resource: Resource, task: Task, mode: int ) -> int: if not self.is_cumulative_resource(resource): raise ValueError(f"{resource} is not a cumulative resource of the problem.") else: mode_detail = self.mode_details[task][mode] return mode_detail.get(resource, 0)
[docs] def get_task_mode_duration(self, task: Task, mode: int) -> int: return self.mode_details[task][mode]["duration"]
[docs] def get_task_modes(self, task: Task) -> set[int]: return set(self.mode_details[task])
[docs] def get_last_tasks(self) -> list[Task]: return [self.sink_task]
[docs] def get_precedence_constraints(self) -> dict[Task, Iterable[Task]]: return self.successors
[docs] def get_makespan_upper_bound(self) -> int: return self.horizon
[docs] def get_task_start_or_end_lower_bound( self, task: Task, start_or_end: StartOrEnd ) -> int: match start_or_end: case StartOrEnd.START: lb, ub = get_start_bounds_from_additional_constraint( rcpsp_problem=self, activity=task ) case _: lb, ub = get_end_bounds_from_additional_constraint( rcpsp_problem=self, activity=task ) return lb
[docs] def get_task_start_or_end_upper_bound( self, task: Task, start_or_end: StartOrEnd ) -> int: match start_or_end: case StartOrEnd.START: lb, ub = get_start_bounds_from_additional_constraint( rcpsp_problem=self, activity=task ) case _: lb, ub = get_end_bounds_from_additional_constraint( rcpsp_problem=self, activity=task ) return ub
[docs] def update_functions(self) -> None: ( self.func_sgs, self.func_sgs_2, self.compute_mean_resource, ) = create_np_data_and_jit_functions(rcpsp_problem=self)
[docs] def is_rcpsp_multimode(self) -> bool: return self.is_multimode
[docs] def is_varying_resource(self) -> bool: return self.is_calendar
[docs] def is_preemptive(self) -> bool: return False
[docs] def is_multiskill(self) -> bool: return False
[docs] def includes_special_constraint(self) -> bool: return self.do_special_constraints
[docs] def has_time_lag_constraints(self) -> bool: """Check if problem has minimum or maximum time lag constraints. Returns True if the problem includes generalized precedence constraints (RCPSP/max), which require specialized scheduling algorithms like iterative SGS with unblocking filters. """ if not self.do_special_constraints: return False return ( len(self.special_constraints.start_to_start_min_time_lag) > 0 or len(self.special_constraints.start_to_start_max_time_lag) > 0 )
[docs] def get_resource_names(self) -> list[str]: return self.resources_list
[docs] def get_resource_availability_array(self, res: str) -> list[int]: if self.is_varying_resource() and not np.isscalar(self.resources[res]): return self.resources[res] # type: ignore else: return self.horizon * [int(self.resources[res])]
[docs] def compute_graph(self, compute_predecessors: bool = False) -> Graph: nodes: list[tuple[Hashable, dict[str, Any]]] = [ ( n, { str(mode): self.mode_details[n][mode]["duration"] for mode in self.mode_details[n] }, ) for n in self.tasks_list ] edges: list[tuple[Hashable, Hashable, dict[str, Any]]] = [] for n in self.successors: for succ in self.successors[n]: edges += [(n, succ, {})] return Graph( nodes, edges, compute_predecessors=compute_predecessors, undirected=False )
[docs] def evaluate_function(self, rcpsp_sol: RcpspSolution) -> tuple[int, float, int]: if rcpsp_sol._schedule_to_recompute: rcpsp_sol.generate_schedule_from_permutation_serial_sgs() makespan = rcpsp_sol.rcpsp_schedule[self.sink_task]["end_time"] if self.costs["mean_resource_reserve"]: obj_mean_resource_reserve = rcpsp_sol.compute_mean_resource_reserve() else: obj_mean_resource_reserve = 0.0 if self.do_special_constraints: penalty = evaluate_constraints( solution=rcpsp_sol, constraints=self.special_constraints ) else: penalty = 0 return makespan, obj_mean_resource_reserve, penalty
[docs] def evaluate(self, variable: RcpspSolution) -> dict[str, float]: # type: ignore obj_makespan, obj_mean_resource_reserve, penalty = self.evaluate_function( variable ) return { "makespan": float(obj_makespan), "mean_resource_reserve": obj_mean_resource_reserve, "constraint_penalty": float(penalty), }
[docs] def evaluate_mobj(self, variable: RcpspSolution) -> TupleFitness: # type: ignore return self.evaluate_mobj_from_dict(self.evaluate(variable))
[docs] def evaluate_mobj_from_dict(self, dict_values: dict[str, float]) -> TupleFitness: return TupleFitness( np.array([-dict_values["makespan"], dict_values["mean_resource_reserve"]]), 2, )
[docs] def build_mode_dict( self, rcpsp_modes_from_solution: list[int] ) -> dict[Hashable, int]: modes_dict = { self.tasks_list_non_dummy[i]: rcpsp_modes_from_solution[i] for i in range(self.n_jobs_non_dummy) } modes_dict[self.source_task] = 1 modes_dict[self.sink_task] = 1 return modes_dict
[docs] def build_mode_array(self, rcpsp_modes_from_solution: list[int]) -> list[int]: modes_dict = self.build_mode_dict( rcpsp_modes_from_solution=rcpsp_modes_from_solution ) return [modes_dict[t] for t in self.tasks_list]
[docs] def return_index_task(self, task: Hashable, offset: int = 0) -> int: return self.index_task[task] + offset
[docs] def satisfy(self, variable: RcpspSolution) -> bool: # type: ignore if variable.rcpsp_schedule_feasible is False: logger.debug("Schedule flagged as infeasible when generated") return False if len(variable.rcpsp_schedule) != self.n_jobs: logger.debug("Missing task in schedule") if self.do_special_constraints: if not check_solution_with_special_constraints( problem=self, solution=variable, relax_the_start_at_end=self.relax_the_start_at_end, ): return False # Check for cumumative resource violation if not variable.check_all_calendar_resource_capacity_constraints(): return False # Check for non-renewable resource violation if not variable.check_all_non_renewable_resource_capacity_constraints(): return False # Check precedences / successors if not variable.check_precedence_constraints(): return False return True
def __str__(self) -> str: val = ( "I'm a RCPSP problem with " + str(self.n_jobs) + " tasks.." + " and ressources =" + str(self.resources_list) ) return val
[docs] def get_solution_type(self) -> type[Solution]: return RcpspSolution
[docs] def get_attribute_register(self) -> EncodingRegister: return EncodingRegister( { "rcpsp_permutation": PermutationRcpsp( range=range(self.n_jobs_non_dummy) ), "rcpsp_modes": ListIntegerRcpsp( length=self.n_jobs_non_dummy, lows=1, ups=[ len(self.mode_details[task]) for task in self.tasks_list_non_dummy ], ), } )
[docs] def get_objective_register(self) -> ObjectiveRegister: objective_handling = ObjectiveHandling.SINGLE dict_objective = { "makespan": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=-1.0) } # "mean_resource_reserve": {"type": TypeObjective.OBJECTIVE, "default_weight": 1}} if self.do_special_constraints: objective_handling = ObjectiveHandling.AGGREGATE dict_objective["constraint_penalty"] = ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=-100.0 ) return ObjectiveRegister( objective_sense=ModeOptim.MAXIMIZATION, objective_handling=objective_handling, dict_objective_to_doc=dict_objective, )
[docs] def compute_resource_consumption( self, rcpsp_sol: RcpspSolution ) -> npt.NDArray[np.int_]: modes_dict = self.build_mode_dict(rcpsp_sol.rcpsp_modes) makespan = rcpsp_sol.rcpsp_schedule[self.sink_task]["end_time"] consumptions = np.zeros((len(self.resources), makespan + 1), dtype=np.int_) for act_id in rcpsp_sol.rcpsp_schedule: for ir in range(len(self.resources)): consumptions[ ir, rcpsp_sol.rcpsp_schedule[act_id]["start_time"] + 1 : rcpsp_sol.rcpsp_schedule[act_id]["end_time"] + 1, ] += self.mode_details[act_id][modes_dict[act_id]][ self.resources_list[ir] ] return consumptions
[docs] def plot_ressource_view(self, rcpsp_sol: RcpspSolution) -> None: consumption = self.compute_resource_consumption(rcpsp_sol=rcpsp_sol) fig, ax = plt.subplots(nrows=len(self.resources_list), sharex=True) for i in range(len(self.resources_list)): ax[i].axhline( y=self.get_max_resource_capacity(self.resources_list[i]), label=self.resources_list[i], ) ax[i].plot(consumption[i, :]) ax[i].legend()
[docs] def copy(self) -> "RcpspProblem": model = RcpspProblem( resources=self.resources, tasks_list=self.tasks_list, source_task=self.source_task, sink_task=self.sink_task, non_renewable_resources=self.non_renewable_resources, mode_details=deepcopy(self.mode_details), successors=deepcopy(self.successors), horizon=self.horizon, name_task=self.name_task, mean_resource_reserve=self.costs.get("mean_resource_reserve", False), fixed_modes=self.fixed_modes, fixed_permutation=self.fixed_permutation, ) return model
[docs] def get_dummy_solution(self) -> RcpspSolution: """Apply serial sgs on the trivial task permutation. See RcpspSolution about details on conversion permutation -> schedule. Returns: """ sol = RcpspSolution( problem=self, rcpsp_permutation=list(range(self.n_jobs_non_dummy)), rcpsp_modes=[1 for i in range(self.n_jobs_non_dummy)], ) return sol
[docs] def convert_task_ids_to_permutation(self, tasks: list[Task]) -> list[int]: """Convert list of task ids into a permutation in proper format for `RcpspSolution`. - remove source and sink if present - use the indices among non-dummy tasks Args: tasks: Returns: """ return [ self.index_task_non_dummy[task] for task in tasks if task in self.index_task_non_dummy ]
[docs] def get_resource_available(self, res: str, time: int) -> int: if self.is_calendar: return self.resources.get(res, [0])[time] # type: ignore return self.resources.get(res, 0) # type: ignore
[docs] def get_max_resource_capacity(self, res: str) -> int: if self.is_calendar: return max(self.resources.get(res, [0])) # type: ignore return self.resources.get(res, 0) # type: ignore
[docs] def set_fixed_attributes( self, attribute_name: str, solution: RcpspSolution ) -> None: if attribute_name == "rcpsp_modes": self.set_fixed_modes(solution.rcpsp_modes) elif attribute_name == "rcpsp_permutation": self.set_fixed_permutation(solution.rcpsp_permutation) else: raise NotImplementedError()
[docs] def set_fixed_modes(self, fixed_modes: list[int]) -> None: self.fixed_modes = fixed_modes
[docs] def set_fixed_permutation(self, fixed_permutation: list[int]) -> None: self.fixed_permutation = fixed_permutation
[docs] def create_np_data_and_jit_functions( rcpsp_problem: RcpspProblem, ) -> tuple[ Callable[ ..., tuple[dict[int, tuple[int, int]], bool], ], Callable[ ..., tuple[dict[int, tuple[int, int]], bool], ], Callable[ ..., float, ], ]: consumption_array = np.zeros( ( rcpsp_problem.n_jobs, rcpsp_problem.max_number_of_mode, len(rcpsp_problem.resources_list), ), dtype=np.int_, ) duration_array = np.zeros( (rcpsp_problem.n_jobs, rcpsp_problem.max_number_of_mode), dtype=np.int_ ) predecessors = np.zeros((rcpsp_problem.n_jobs, rcpsp_problem.n_jobs), dtype=np.int_) successors = np.zeros((rcpsp_problem.n_jobs, rcpsp_problem.n_jobs), dtype=np.int_) horizon = rcpsp_problem.horizon ressource_available = np.zeros( (len(rcpsp_problem.resources_list), horizon), dtype=np.int_ ) ressource_renewable = np.ones((len(rcpsp_problem.resources_list)), dtype=bool) minimum_starting_time_array = np.zeros(rcpsp_problem.n_jobs, dtype=np.int_) for i in range(len(rcpsp_problem.tasks_list)): task = rcpsp_problem.tasks_list[i] index_mode = 0 for mode in sorted( rcpsp_problem.mode_details[rcpsp_problem.tasks_list[i]].keys() ): for k in range(len(rcpsp_problem.resources_list)): consumption_array[i, index_mode, k] = rcpsp_problem.mode_details[task][ mode ].get(rcpsp_problem.resources_list[k], 0) duration_array[i, index_mode] = rcpsp_problem.mode_details[task][mode][ "duration" ] index_mode += 1 task_index = {rcpsp_problem.tasks_list[i]: i for i in range(rcpsp_problem.n_jobs)} for k in range(len(rcpsp_problem.resources_list)): if rcpsp_problem.is_varying_resource(): ressource_available[k, :] = rcpsp_problem.resources[ # type: ignore rcpsp_problem.resources_list[k] ][: ressource_available.shape[1]] else: ressource_available[k, :] = np.full( ressource_available.shape[1], rcpsp_problem.resources[rcpsp_problem.resources_list[k]], dtype=np.int_, ) if rcpsp_problem.resources_list[k] in rcpsp_problem.non_renewable_resources: ressource_renewable[k] = False for i in range(len(rcpsp_problem.tasks_list)): task = rcpsp_problem.tasks_list[i] for s in rcpsp_problem.successors[task]: index_s = task_index[s] predecessors[index_s, i] = 1 successors[i, index_s] = 1 if "special_constraints" in rcpsp_problem.__dict__.keys(): for t in rcpsp_problem.special_constraints.start_times_window: if rcpsp_problem.special_constraints.start_times_window[t][0] is not None: minimum_starting_time_array[rcpsp_problem.index_task[t]] = ( rcpsp_problem.special_constraints.start_times_window[t][0] ) func_sgs = partial( sgs_fast, consumption_array=consumption_array, duration_array=duration_array, predecessors=predecessors, successors=successors, horizon=horizon, ressource_available=ressource_available, ressource_renewable=ressource_renewable, minimum_starting_time_array=minimum_starting_time_array, ) func_sgs_2 = partial( sgs_fast_partial_schedule_incomplete_permutation_tasks, consumption_array=consumption_array, duration_array=duration_array, predecessors=predecessors, successors=successors, horizon=horizon, ressource_available=ressource_available, ressource_renewable=ressource_renewable, minimum_starting_time_array=minimum_starting_time_array, ) func_compute_mean_resource = partial( compute_mean_ressource, consumption_array=consumption_array, ressource_available=ressource_available, ressource_renewable=ressource_renewable, ) return func_sgs, func_sgs_2, func_compute_mean_resource
[docs] def evaluate_constraints( solution: RcpspSolution, constraints: SpecialConstraintsDescription, ) -> int: list_constraints_not_respected = compute_constraints_details(solution, constraints) return sum([x[-1] for x in list_constraints_not_respected])
[docs] def compute_constraints_details( solution: RcpspSolution, constraints: SpecialConstraintsDescription, ) -> list[tuple[str, Hashable, Hashable, Optional[int], Optional[int], int]]: if not solution.rcpsp_schedule_feasible: return [] start_together = constraints.start_together start_at_end = constraints.start_at_end start_at_end_plus_offset = constraints.start_at_end_plus_offset disjunctive = constraints.disjunctive_tasks list_constraints_not_respected: list[ tuple[str, Hashable, Hashable, Optional[int], Optional[int], int] ] = [] for t1, t2, off in constraints.start_to_start_min_time_lag: # Check for potential inconsistency with successor relationships if off < 0 and t2 in solution.problem.successors.get(t1, []): logger.warning( f"Potential inconsistency detected: " f"Task {t1} has a start_to_start_min_time_lag constraint with task {t2} " f"with negative offset {off}, but {t2} is also a successor of {t1}. " f"This means: " f"- start_to_start requires: start({t1}) + {off} <= start({t2}), " f" allowing {t2} to start before {t1} completes. " f"- Successor constraint requires: end({t1}) <= start({t2}), " f" requiring {t2} to start after {t1} ends. " f"These constraints may be contradictory depending on task durations." ) time1 = solution.get_start_time(t1) time2 = solution.get_start_time(t2) b = time1 + off <= time2 if not b: list_constraints_not_respected += [ ("start-to-start", t1, t2, time1, time2, abs(time1 + off - time2)) ] for t1, t2, off in constraints.start_to_start_max_time_lag: time1 = solution.get_start_time(t1) time2 = solution.get_start_time(t2) b = (time2 - time1 - off) <= 0 if not b: list_constraints_not_respected += [ ( "start-to-start-max-time-lag", t1, t2, time1, time2, abs(time2 - time1 - off), ) ] for t1, t2 in start_together: time1 = solution.get_start_time(t1) time2 = solution.get_start_time(t2) b = time1 == time2 if not b: list_constraints_not_respected += [ ("start_together", t1, t2, time1, time2, abs(time2 - time1)) ] for t1, t2 in start_at_end: time1 = solution.get_end_time(t1) time2 = solution.get_start_time(t2) b = time1 == time2 if not b: list_constraints_not_respected += [ ("start_at_end", t1, t2, time1, time2, abs(time2 - time1)) ] for t1, t2, off in start_at_end_plus_offset: time1 = solution.get_end_time(t1) + off time2 = solution.get_start_time(t2) b = time2 >= time1 if not b: list_constraints_not_respected += [ ("start_at_end_plus_offset", t1, t2, time1, time2, abs(time2 - time1)) ] for t1, t2 in disjunctive: segt = intersect( [solution.get_start_time(t1), solution.get_end_time(t1)], [solution.get_start_time(t2), solution.get_end_time(t2)], ) if segt is not None: list_constraints_not_respected += [ ("disjunctive", t1, t2, None, None, segt[1] - segt[0]) ] for t in constraints.start_times_window: if constraints.start_times_window[t][0] is not None: if solution.get_start_time(t) < constraints.start_times_window[t][0]: # type: ignore list_constraints_not_respected += [ ( "start_window_0", t, t, None, None, constraints.start_times_window[t][0] # type: ignore - solution.get_start_time(t), ) ] if constraints.start_times_window[t][1] is not None: if solution.get_start_time(t) > constraints.start_times_window[t][1]: # type: ignore list_constraints_not_respected += [ ( "start_window_1", t, t, None, None, -constraints.start_times_window[t][1] # type: ignore + solution.get_start_time(t), ) ] for t in constraints.end_times_window: if constraints.end_times_window[t][0] is not None: if solution.get_end_time(t) < constraints.end_times_window[t][0]: # type: ignore list_constraints_not_respected += [ ( "end_window_0", t, t, None, None, constraints.end_times_window[t][0] - solution.get_end_time(t), # type: ignore ) ] if constraints.end_times_window[t][1] is not None: if solution.get_end_time(t) > constraints.end_times_window[t][1]: # type: ignore list_constraints_not_respected += [ ( "end_window_1", t, t, None, None, -constraints.end_times_window[t][1] + solution.get_end_time(t), # type: ignore ) ] if constraints.pair_mode_constraint is not None: list_constraints_not_respected += compute_details_mode_constraint( solution=solution, pair_mode_constraint=constraints.pair_mode_constraint ) return list_constraints_not_respected
[docs] def check_solution_with_special_constraints( problem: RcpspProblem, solution: RcpspSolution, relax_the_start_at_end: bool = True, ) -> bool: if not solution.rcpsp_schedule_feasible: return False start_together = problem.special_constraints.start_together start_at_end = problem.special_constraints.start_at_end start_at_end_plus_offset = problem.special_constraints.start_at_end_plus_offset disjunctive = problem.special_constraints.disjunctive_tasks for t1, t2, off in problem.special_constraints.start_to_start_min_time_lag: b = solution.get_start_time(t1) + off <= solution.get_start_time(t2) if not b: logger.debug(("start_to_start_min_time_lag NOT respected: ", t1, t2, off)) logger.debug( ( "start(", t1, ") + ", off, " = ", solution.get_start_time(t1) + off, " should be <= start(", t2, ") = ", solution.get_start_time(t2), ) ) return False for t1, t2, offset in problem.special_constraints.start_to_start_max_time_lag: b = solution.get_start_time(t2) <= solution.get_start_time(t1) + offset if not b: logger.debug( ("start_to_start_max_time_lag NOT respected: ", t1, t2, offset) ) logger.debug( ( "start(", t2, ") = ", solution.get_start_time(t2), " should be <= start(", t1, ") + ", offset, " = ", solution.get_start_time(t1) + offset, ) ) return False for t1, t2 in start_together: if not relax_the_start_at_end: b = solution.get_start_time(t1) == solution.get_start_time(t2) if not b: logger.debug(("start_together NOT respected: ", t1, t2)) logger.debug( ( "start(", t1, ") = ", solution.get_start_time(t1), " should be == start(", t2, ") = ", solution.get_start_time(t2), ) ) return False for t1, t2 in start_at_end: if relax_the_start_at_end: b = solution.get_start_time(t2) >= solution.get_end_time(t1) else: b = solution.get_start_time(t2) == solution.get_end_time(t1) if not b: logger.debug(("start_at_end NOT respected: ", t1, t2)) if relax_the_start_at_end: logger.debug( ( "start(", t2, ") = ", solution.get_start_time(t2), " should be >= end(", t1, ") = ", solution.get_end_time(t1), ) ) else: logger.debug( ( "start(", t2, ") = ", solution.get_start_time(t2), " should be == end(", t1, ") = ", solution.get_end_time(t1), ) ) return False for t1, t2, off in start_at_end_plus_offset: b = solution.get_start_time(t2) >= solution.get_end_time(t1) + off if not b: logger.debug(("start_at_end_plus_offset NOT respected: ", t1, t2, off)) logger.debug( ( solution.get_start_time(t2), " >= ", solution.get_end_time(t1), "+", off, ) ) return False for t1, t2 in disjunctive: if ( intersect( [solution.get_start_time(t1), solution.get_end_time(t1)], [solution.get_start_time(t2), solution.get_end_time(t2)], ) is not None ): logger.debug(("disjunctive_tasks NOT respected: ", t1, t2)) logger.debug( ( "task ", t1, " [", solution.get_start_time(t1), ", ", solution.get_end_time(t1), "] overlaps with task ", t2, " [", solution.get_start_time(t2), ", ", solution.get_end_time(t2), "]", ) ) return False for t in problem.special_constraints.start_times_window: if problem.special_constraints.start_times_window[t][0] is not None: if ( solution.get_start_time(t) # type: ignore < problem.special_constraints.start_times_window[t][0] ): logger.debug( ( "start time 0, ", t, solution.get_start_time(t), problem.special_constraints.start_times_window[t][0], ) ) return False if problem.special_constraints.start_times_window[t][1] is not None: if ( solution.get_start_time(t) # type: ignore > problem.special_constraints.start_times_window[t][1] ): logger.debug( ( "start time 1, ", t, solution.get_start_time(t), problem.special_constraints.start_times_window[t][1], ) ) return False for t in problem.special_constraints.end_times_window: if problem.special_constraints.end_times_window[t][0] is not None: if ( solution.get_end_time(t) # type: ignore < problem.special_constraints.end_times_window[t][0] ): logger.debug( ( "end time 0, ", t, solution.get_end_time(t), problem.special_constraints.end_times_window[t][0], ) ) return False if problem.special_constraints.end_times_window[t][1] is not None: if ( solution.get_end_time(t) # type: ignore > problem.special_constraints.end_times_window[t][1] ): logger.debug( ( "end time 1, ", t, solution.get_end_time(t), problem.special_constraints.end_times_window[t][1], ) ) return False if problem.special_constraints.pair_mode_constraint is not None: b = check_pair_mode_constraint( solution=solution, pair_mode_constraint=problem.special_constraints.pair_mode_constraint, ) if not b: logger.debug("pair_mode_constraint NOT respected") return False return True
[docs] def check_pair_mode_constraint( solution: RcpspSolution, pair_mode_constraint: PairModeConstraint ): if pair_mode_constraint.allowed_mode_assignment is not None: for ac1, ac2 in pair_mode_constraint.allowed_mode_assignment: mode_ac1 = solution.get_mode(ac1) mode_ac2 = solution.get_mode(ac2) if (mode_ac1, mode_ac2) not in pair_mode_constraint.allowed_mode_assignment[ ac1, ac2 ]: return False return True if pair_mode_constraint.same_score_mode is not None: for ac1, ac2 in pair_mode_constraint.same_score_mode: score_ac1 = pair_mode_constraint.score_mode[ac1, solution.get_mode(ac1)] score_ac2 = pair_mode_constraint.score_mode[ac2, solution.get_mode(ac2)] if score_ac1 != score_ac2: return False return True
[docs] def compute_details_mode_constraint( solution: RcpspSolution, pair_mode_constraint: PairModeConstraint ): list_constraints_not_respected: list[ tuple[str, Hashable, Hashable, Optional[int], Optional[int], int] ] = [] if pair_mode_constraint.allowed_mode_assignment is not None: for ac1, ac2 in pair_mode_constraint.allowed_mode_assignment: mode_ac1 = solution.get_mode(ac1) mode_ac2 = solution.get_mode(ac2) if (mode_ac1, mode_ac2) not in pair_mode_constraint.allowed_mode_assignment[ ac1, ac2 ]: list_constraints_not_respected.append( ("pair_mode_assignment", ac1, ac2, mode_ac1, mode_ac2, 100) ) return list_constraints_not_respected if pair_mode_constraint.same_score_mode is not None: for ac1, ac2 in pair_mode_constraint.same_score_mode: score_ac1 = pair_mode_constraint.score_mode[ac1, solution.get_mode(ac1)] score_ac2 = pair_mode_constraint.score_mode[ac2, solution.get_mode(ac2)] if score_ac1 != score_ac2: list_constraints_not_respected.append( ("pair_mode_score", ac1, ac2, score_ac1, score_ac2, 100) ) return list_constraints_not_respected