Source code for discrete_optimization.rcpsp.solution

#  Copyright (c) 2023 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import (
    annotations,  # make annotations be considered as string by default
)

from collections.abc import Hashable, Iterable
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Optional

import numpy as np
from numpy import typing as npt

from discrete_optimization.generic_tasks_tools.multimode import MultimodeSolution
from discrete_optimization.generic_tasks_tools.scheduling import SchedulingSolution
from discrete_optimization.generic_tools.do_problem import RobustProblem

if TYPE_CHECKING:  # avoid circular imports due to annotations
    from discrete_optimization.rcpsp.problem import RcpspProblem

Task = Hashable


class TaskDetails:
    """Start and end times of a task (used to describe already completed tasks)."""

    def __init__(self, start: int, end: int):
        # Plain value holder: both times are stored as given, without validation.
        self.start, self.end = start, end
class RcpspSolution(SchedulingSolution[Task], MultimodeSolution[Task]):
    """Solution to RcpspProblem problems.

    Attributes:
        problem: RCPSP problem for which this is a solution
        rcpsp_permutation: Tasks permutation, i.e. list of tasks indices among non-dummy tasks
            (i.e. excluding source and sink tasks). See below how it is related to the schedule.
            If given and schedule not given, used to reconstruct the schedule.
            If not given, deduced from schedule.
            If not given and schedule not given, it is set to `problem.fixed_permutation`
            (useful for alternating genetic algorithms).
        rcpsp_schedule: Tasks schedule (task -> "start_time" or "end_time" -> time).
            If given, used to construct `standardised_permutation`.
            If given and `rcpsp_permutation` not given, used to construct `rcpsp_permutation`.
            If given and `rcpsp_permutation` given, no consistency check.
            If not given, deduced from `rcpsp_permutation` if possible.
            If not possible, `rcpsp_schedule_feasible` set to False
            and `rcpsp_schedule` set to a partially filled schedule.
            In case of `Aggreg_RcpspModel`, kept empty.
        rcpsp_modes: Mode used for each task. Same order as `problem.tasks_list_non_dummy`.
            If not given we use `problem.fixed_modes` if existing, else 1 for each task.
        rcpsp_schedule_feasible: True if a schedule can be deduced from permutation.
            False if it leads to incoherency preventing a schedule generation.
            Recomputed when schedule is (re)computed from permutation.
        standardised_permutation: Permutation deduced uniquely from schedule.
            If given, not recomputed.
            Can be different from `rcpsp_permutation` as different permutations can lead to same schedule.
        fast: boolean indicating if we use the fast functions to generate schedule from permutation.

    ## More about conversion permutation <-> schedule:

    ### Schedule -> permutation

    Take the list of tasks indices among non-dummy tasks (i.e. excluding source and sink tasks),
    sorted by their starting times.

    ### Permutation -> schedule: serial-SGS (Schedule Generation Scheme)

    - Initialization:
        - the source task is set at time 0
        - the resource availability is set to the initial calendar given by the problem
    - Loop: the schedule is filled task by task, in the order given by the permutation, and taking into account
        - the precedence constraints: a task starts after the end of the tasks before it in the precedence graph
        - the resource constraints: during the task, the needed resource must be available

      When a new task has been inserted, the availability of each resource is updated accordingly,
      i.e. we remove the necessary resources for the task (according to the chosen mode set by `rcpsp_modes`)
      for the task duration, and after if the resource is non-renewable.
    - When hitting an impossibility (e.g. inserting a task after one of its successor,
      or when the resources are not enough available until the problem horizon)
      the permutation is set infeasible (so `rcpsp_schedule_feasible` is set to `False`).

    See "Compute a dummy solution for RCPSP" in the tutorial
    "notebooks/RCPSP tutorials/RCPSP-1 Introduction.ipynb" for more details.

    Example:
        This example demonstrates how a permutation of non-dummy tasks is transformed into a schedule,
        adhering to precedence and resource constraints.

        >>> from discrete_optimization.rcpsp.problem import RcpspProblem
        >>> from discrete_optimization.rcpsp.solution import RcpspSolution

        >>> # Define resources
        >>> resources = {"R1": 5, "R2": 2}
        >>> non_renewable_resources = []  # Empty if all resources replenish when a task ends

        >>> # Define mode details
        >>> # Format: { task_id: { mode_id: { "duration": int, "resource_name": amount } } }
        >>> mode_details = {
        ...     "source": {1: {"duration": 0, "R1": 0, "R2": 0}},  # Source (Dummy)
        ...     "task-1": {1: {"duration": 4, "R1": 2, "R2": 2}},
        ...     "task-2": {1: {"duration": 3, "R1": 3, "R2": 0}},
        ...     "task-3": {1: {"duration": 5, "R1": 2, "R2": 1}},
        ...     "task-4": {1: {"duration": 2, "R1": 1, "R2": 1}},
        ...     "sink": {1: {"duration": 0, "R1": 0, "R2": 0}},  # Sink (Dummy)
        ... }

        >>> # Define precedence graph
        >>> successors = {
        ...     "source": ["task-1", "task-2"],  # First tasks: 1 and 2
        ...     "task-1": ["task-3"],  # Task 1 must finish before 3
        ...     "task-2": ["task-4"],  # Task 2 must finish before 4
        ...     "task-3": ["sink"],  # Task 3 leads to Sink
        ...     "task-4": ["sink"],  # Task 4 leads to Sink
        ...     "sink": [],  # Sink has no successors
        ... }

        >>> # Initialize the Problem
        >>> problem = RcpspProblem(
        ...     resources=resources,
        ...     non_renewable_resources=non_renewable_resources,
        ...     mode_details=mode_details,
        ...     successors=successors,
        ...     horizon=20,  # Maximum time allowed for the project
        ...     source_task="source",
        ...     sink_task="sink"
        ... )

        >>> # Create solution
        >>> # Convert list of task ids into a licit permutation (using indices among non-dummy tasks)
        >>> rcpsp_permutation = problem.convert_task_ids_to_permutation(["task-2", "task-1", "task-4", "task-3"])
        >>> print(rcpsp_permutation)
        [1, 0, 3, 2]
        >>> solution = RcpspSolution(
        ...     problem=problem,
        ...     rcpsp_permutation=rcpsp_permutation,
        ...     rcpsp_modes=[1, 1, 1, 1]  # same mode for each task
        ... )

        >>> # The Serial SGS calculates the start/end times accordingly:
        >>> print(solution.rcpsp_schedule["task-2"])  # Task 2
        {'start_time': 0, 'end_time': 3}
        >>> print(solution.rcpsp_schedule["task-1"])  # Task 1 follows Task 2 in permutation
        {'start_time': 0, 'end_time': 4}
        >>> print(solution.rcpsp_schedule["task-4"])  # Task 4 can start after Task 2, but only when resources are available (thus after task 1)
        {'start_time': 4, 'end_time': 6}
        >>> print(solution.rcpsp_schedule["task-3"])  # Task 3 can start after Task 1 (and resources are available)
        {'start_time': 4, 'end_time': 9}

    """

    problem: RcpspProblem

    def __init__(
        self,
        problem: RcpspProblem,
        rcpsp_permutation: Optional[list[int]] = None,
        rcpsp_schedule: Optional[dict[Hashable, dict[str, int]]] = None,
        rcpsp_modes: Optional[list[int]] = None,
        rcpsp_schedule_feasible: bool = True,
        standardised_permutation: Optional[list[int]] = None,
        fast: bool = True,
    ):
        """Build a solution from a permutation and/or a schedule.

        Args:
            problem: problem this solution refers to.
            rcpsp_permutation: permutation of non-dummy task indices; deduced from
                `rcpsp_schedule` or taken from `problem.fixed_permutation` when None.
            rcpsp_schedule: task -> {"start_time", "end_time"}; generated from the
                permutation with the serial SGS when None.
            rcpsp_modes: mode of each non-dummy task; defaults to
                `problem.fixed_modes` if set, else 1 for every task.
            rcpsp_schedule_feasible: initial feasibility flag (overwritten whenever the
                schedule is generated from the permutation).
            standardised_permutation: permutation deduced from the schedule; computed
                when None.
            fast: whether to use the fast SGS function of the problem.

        Raises:
            ValueError: if neither permutation nor schedule is given and the problem
                has no `fixed_permutation`.

        """
        super().__init__(problem=problem)
        self.rcpsp_schedule_feasible = rcpsp_schedule_feasible
        self.fast = fast
        self.rcpsp_modes: list[int]
        self.rcpsp_schedule: dict[Hashable, dict[str, int]]
        self.rcpsp_permutation: list[int]
        self.standardised_permutation: list[int]

        # init rcpsp_modes
        if rcpsp_modes is None:
            if self.problem.fixed_modes is not None:
                self.rcpsp_modes = self.problem.fixed_modes
            else:
                self.rcpsp_modes = [1 for i in range(self.problem.n_jobs_non_dummy)]
        else:
            self.rcpsp_modes = rcpsp_modes

        # init rcpsp_permutation
        if rcpsp_permutation is None:
            if rcpsp_schedule is None:
                if self.problem.fixed_permutation is not None:
                    self.rcpsp_permutation = self.problem.fixed_permutation
                else:
                    raise ValueError(
                        "rcpsp_permutation and rcpsp_schedule can be None together "
                        "only if problem.fixed_permutation is not None"
                    )
            else:
                # The schedule must be stored first: the permutation is read from it.
                self.rcpsp_schedule = rcpsp_schedule
                standardised_permutation = self.generate_permutation_from_schedule()
                self.rcpsp_permutation = deepcopy(standardised_permutation)
        else:
            self.rcpsp_permutation = rcpsp_permutation

        # init rcpsp_schedule
        if rcpsp_schedule is None:
            self.generate_schedule_from_permutation_serial_sgs(do_fast=self.fast)
        else:
            self.rcpsp_schedule = rcpsp_schedule

        # init standardised_permutation
        if standardised_permutation is None:
            self.standardised_permutation = self.generate_permutation_from_schedule()
        else:
            self.standardised_permutation = standardised_permutation

        # schedule already computed (prevent issues with __setattr__ hack)
        self._schedule_to_recompute = False

    def change_problem(self, new_problem: RcpspProblem) -> None:  # type: ignore
        """Attach the solution to `new_problem` and recompute schedule-derived data."""
        super().change_problem(new_problem=new_problem)
        # recompute schedule and standardised permutation with respect to the new problem
        self.generate_schedule_from_permutation_serial_sgs(do_fast=self.fast)
        self.standardised_permutation = self.generate_permutation_from_schedule()

    def __setattr__(self, key: str, value: Any) -> None:
        """Set an attribute, flagging the schedule as stale when the permutation changes."""
        # `super()` (with parentheses) follows the MRO; the bare `super` type only
        # worked by accident because it resolves to `object.__setattr__`.
        super().__setattr__(key, value)
        if key == "rcpsp_permutation":
            self._schedule_to_recompute = True

    def copy(self) -> "RcpspSolution":
        """Return a copy whose permutations, modes and schedule are deep-copied.

        The problem itself is shared with the original solution.
        """
        return RcpspSolution(
            problem=self.problem,
            rcpsp_permutation=deepcopy(self.rcpsp_permutation),
            rcpsp_modes=deepcopy(self.rcpsp_modes),
            rcpsp_schedule=deepcopy(self.rcpsp_schedule),
            rcpsp_schedule_feasible=self.rcpsp_schedule_feasible,
            # deep-copied like the other containers so that the copy never shares
            # a mutable list with the original
            standardised_permutation=deepcopy(self.standardised_permutation),
            fast=self.fast,
        )

    def lazy_copy(self) -> "RcpspSolution":
        """Return a new solution object sharing all its containers with this one."""
        return RcpspSolution(
            problem=self.problem,
            rcpsp_permutation=self.rcpsp_permutation,
            rcpsp_modes=self.rcpsp_modes,
            rcpsp_schedule=self.rcpsp_schedule,
            rcpsp_schedule_feasible=self.rcpsp_schedule_feasible,
            standardised_permutation=self.standardised_permutation,
            fast=self.fast,
        )

    def __str__(self) -> str:
        if self.rcpsp_schedule is None:
            sched_str = "None"
        else:
            sched_str = str(self.rcpsp_schedule)
        val = "RCPSP solution (rcpsp_schedule): " + sched_str
        return val

    def generate_permutation_from_schedule(self) -> list[int]:
        """Return the non-dummy task indices sorted by start time in the schedule."""
        sorted_task = [
            self.problem.index_task_non_dummy[i]
            for i in sorted(
                self.rcpsp_schedule, key=lambda x: self.rcpsp_schedule[x]["start_time"]
            )
            if i in self.problem.index_task_non_dummy
        ]
        return sorted_task

    def compute_mean_resource_reserve(self, fast: bool = True) -> float:
        """Compute the mean resource reserve of the schedule.

        With `fast=False` the module-level `compute_mean_resource_reserve` is used.
        Otherwise `problem.compute_mean_resource` is called, and 0.0 is returned
        when the schedule is infeasible or a mode index exceeds
        `problem.max_number_of_mode`.
        """
        if not fast:
            return compute_mean_resource_reserve(
                solution=self, rcpsp_problem=self.problem
            )
        else:
            if not self.rcpsp_schedule_feasible:
                return 0.0
            last_activity = self.problem.sink_task
            makespan = self.rcpsp_schedule[last_activity]["end_time"]
            if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
                # non existing modes
                return 0.0
            else:
                return self.problem.compute_mean_resource(
                    horizon=makespan,
                    # modes are shifted by one before being handed to the problem function
                    modes_array=np.array(
                        self.problem.build_mode_array(self.rcpsp_modes)
                    )
                    - 1,
                    start_array=np.array(
                        [
                            self.rcpsp_schedule[t]["start_time"]
                            for t in self.problem.tasks_list
                        ]
                    ),
                    end_array=np.array(
                        [
                            self.rcpsp_schedule[t]["end_time"]
                            for t in self.problem.tasks_list
                        ]
                    ),
                )

    def generate_schedule_from_permutation_serial_sgs(
        self, do_fast: bool = True
    ) -> None:
        """(Re)compute `rcpsp_schedule` and `rcpsp_schedule_feasible` from the permutation.

        For a `RobustProblem` the schedule is left empty and flagged feasible.
        With `do_fast`, `problem.func_sgs` is used; otherwise the module-level
        pure-python serial SGS is used.
        """
        self._schedule_to_recompute = False
        if isinstance(self.problem, RobustProblem):
            self.rcpsp_schedule = {}
            self.rcpsp_schedule_feasible = True
        else:
            if do_fast:
                schedule: dict[int, tuple[int, int]]
                if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
                    # non existing modes
                    schedule, unfeasible = {}, True
                else:
                    schedule, unfeasible = self.problem.func_sgs(
                        permutation_task=permutation_do_to_permutation_sgs_fast(
                            self.problem, self.rcpsp_permutation
                        ),
                        modes_array=np.array(
                            self.problem.build_mode_array(self.rcpsp_modes)
                        )
                        - 1,
                    )
                self.rcpsp_schedule_feasible = not unfeasible
                # func_sgs is keyed by task index: translate back to task ids
                self.rcpsp_schedule = {}
                for k in schedule:
                    self.rcpsp_schedule[self.problem.tasks_list[k]] = {
                        "start_time": schedule[k][0],
                        "end_time": schedule[k][1],
                    }
                if self.problem.sink_task not in self.rcpsp_schedule:
                    # sink not reached: give it a huge sentinel time
                    self.rcpsp_schedule[self.problem.sink_task] = {
                        "start_time": 99999999,
                        "end_time": 99999999,
                    }
            else:
                (
                    self.rcpsp_schedule,
                    self.rcpsp_schedule_feasible,
                ) = generate_schedule_from_permutation_serial_sgs(
                    solution=self, rcpsp_problem=self.problem
                )

    def generate_schedule_from_permutation_serial_sgs_2(
        self,
        current_t: int = 0,
        completed_tasks: Optional[dict[Hashable, TaskDetails]] = None,
        scheduled_tasks_start_times: Optional[dict[Hashable, int]] = None,
        do_fast: bool = True,
    ) -> None:
        """(Re)compute the schedule from the permutation, given a partial schedule.

        Args:
            current_t: current time, passed to the underlying SGS function.
            completed_tasks: already completed tasks with their start/end times
                (empty if None).
            scheduled_tasks_start_times: start times of already scheduled tasks
                (empty if None).
            do_fast: use `problem.func_sgs_2` (only when the problem has no special
                constraints); otherwise a module-level pure-python SGS is used.

        """
        if completed_tasks is None:
            completed_tasks = {}
        if scheduled_tasks_start_times is None:
            scheduled_tasks_start_times = {}
        if do_fast and not self.problem.do_special_constraints:
            schedule: dict[int, tuple[int, int]]
            if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
                # non existing modes
                schedule, unfeasible = {}, True
            else:
                task_ids = [
                    self.problem.tasks_list[i] for i in range(self.problem.n_jobs)
                ]
                schedule, unfeasible = self.problem.func_sgs_2(
                    current_time=current_t,
                    completed_task_indicator=np.array(
                        [1 if task in completed_tasks else 0 for task in task_ids]
                    ),
                    completed_task_times=np.array(
                        [
                            completed_tasks[task].end if task in completed_tasks else 0
                            for task in task_ids
                        ]
                    ),
                    # -1 marks tasks without an imposed start time
                    scheduled_task=np.array(
                        [
                            scheduled_tasks_start_times[task]
                            if task in scheduled_tasks_start_times
                            else -1
                            for task in task_ids
                        ]
                    ),
                    permutation_task=permutation_do_to_permutation_sgs_fast(
                        self.problem, self.rcpsp_permutation
                    ),
                    modes_array=np.array(
                        self.problem.build_mode_array(self.rcpsp_modes)
                    )
                    - 1,
                )
            self.rcpsp_schedule = {}
            for k in schedule:
                self.rcpsp_schedule[self.problem.tasks_list[k]] = {
                    "start_time": schedule[k][0],
                    "end_time": schedule[k][1],
                }
            if self.problem.sink_task not in self.rcpsp_schedule:
                # sink not reached: give it a huge sentinel time
                self.rcpsp_schedule[self.problem.sink_task] = {
                    "start_time": 999999999,
                    "end_time": 999999999,
                }
            self.rcpsp_schedule_feasible = not unfeasible
            self._schedule_to_recompute = False
        else:
            if self.problem.do_special_constraints:
                (
                    self.rcpsp_schedule,
                    self.rcpsp_schedule_feasible,
                ) = generate_schedule_from_permutation_serial_sgs_partial_schedule_specialized_constraints(
                    solution=self,
                    current_t=current_t,
                    completed_tasks=completed_tasks,
                    scheduled_tasks_start_times=scheduled_tasks_start_times,
                    rcpsp_problem=self.problem,
                )
            else:
                (
                    self.rcpsp_schedule,
                    self.rcpsp_schedule_feasible,
                ) = generate_schedule_from_permutation_serial_sgs_partial_schedule(
                    solution=self,
                    current_t=current_t,
                    completed_tasks=completed_tasks,
                    scheduled_tasks_start_times=scheduled_tasks_start_times,
                    rcpsp_problem=self.problem,
                )
            self._schedule_to_recompute = False

    def get_max_end_time(self) -> int:
        """Return the end time of the sink task."""
        return self.rcpsp_schedule[self.problem.sink_task]["end_time"]

    def get_start_time(self, task: Hashable) -> int:
        """Return the start time of `task` in the schedule."""
        return self.rcpsp_schedule[task]["start_time"]

    def get_end_time(self, task: Hashable) -> int:
        """Return the end time of `task` in the schedule."""
        return self.rcpsp_schedule[task]["end_time"]

    def get_start_times_list(self, task: Hashable) -> list[int]:
        """Return the start time of `task` wrapped in a one-element list."""
        return [self.get_start_time(task)]

    def get_end_times_list(self, task: Hashable) -> list[int]:
        """Return the end time of `task` wrapped in a one-element list."""
        return [self.get_end_time(task)]

    def get_active_time(self, task: Hashable) -> list[int]:
        """Return the time steps from start (included) to end (excluded) of `task`."""
        return list(range(self.get_start_time(task), self.get_end_time(task)))

    def get_mode(self, task: Hashable) -> int:
        """Return the mode used by `task`."""
        if task in (self.problem.source_task, self.problem.sink_task):
            # should have only 1 mode (no particular meaning)
            return next(iter(self.problem.mode_details[task]))
        return self.rcpsp_modes[self.problem.index_task_non_dummy[task]]

    def __hash__(self) -> int:
        return hash((tuple(self.rcpsp_permutation), tuple(self.rcpsp_modes)))

    def __eq__(self, other: object) -> bool:
        # Two solutions are equal when permutation and modes match;
        # the schedule is not compared.
        return (
            isinstance(other, RcpspSolution)
            and self.rcpsp_permutation == other.rcpsp_permutation
            and self.rcpsp_modes == other.rcpsp_modes
        )
def permutation_do_to_permutation_sgs_fast(
    rcpsp_problem: "RcpspProblem", permutation_do: Iterable[int]
) -> npt.NDArray[np.int_]:
    """Convert a permutation of non-dummy task indices into a full task-index order.

    Each entry of `permutation_do` indexes `rcpsp_problem.tasks_list_non_dummy`;
    it is mapped to the index of the same task in `rcpsp_problem.index_task`.
    The source task index is put first and the sink task index last.

    Args:
        rcpsp_problem: problem providing `index_task`, `tasks_list_non_dummy`,
            `source_task` and `sink_task`.
        permutation_do: indices among non-dummy tasks.

    Returns:
        Integer array of task indices, source first and sink last.

    """
    index_of = rcpsp_problem.index_task
    non_dummy_tasks = rcpsp_problem.tasks_list_non_dummy
    full_order = [index_of[non_dummy_tasks[pos]] for pos in permutation_do]
    full_order = (
        [index_of[rcpsp_problem.source_task]]
        + full_order
        + [index_of[rcpsp_problem.sink_task]]
    )
    return np.array(full_order, dtype=np.int_)
def generate_schedule_from_permutation_serial_sgs(
    solution: "RcpspSolution", rcpsp_problem: "RcpspProblem"
) -> tuple[dict[Hashable, dict[str, int]], bool]:
    """Build a schedule from the solution permutation with a pure-python serial SGS.

    Tasks are taken in permutation order (source first, sink last); at each step
    the first remaining task whose predecessors are all already placed is
    scheduled at the earliest time satisfying precedence and resource
    availability. Modes unknown to a task fall back to mode 1.

    Args:
        solution: provides `rcpsp_permutation` and `rcpsp_modes`.
        rcpsp_problem: provides tasks, modes, successors, resources and horizon.

    Returns:
        The schedule (task -> {"start_time", "end_time"}) and a feasibility flag.
        When infeasible, the schedule is partial and the sink task, if not
        scheduled, gets a large sentinel start and end time.

    """
    # Sentinel time given to the sink when it could not be scheduled.
    unscheduled_time = 99999999
    activity_end_times = {}
    unfeasible_non_renewable_resources = False
    new_horizon = rcpsp_problem.horizon

    # Remaining availability of each resource at each time step.
    resource_avail_in_time = {}
    for res in rcpsp_problem.resources_list:
        if rcpsp_problem.is_varying_resource():
            # Copy the calendar: if it is a numpy array, a bare slice would be a
            # view and scheduling would alter the problem data.
            resource_avail_in_time[res] = list(
                rcpsp_problem.resources[res][: new_horizon + 1]  # type: ignore
            )
        else:
            resource_avail_in_time[res] = np.full(
                new_horizon, rcpsp_problem.resources[res], dtype=np.int_
            ).tolist()

    minimum_starting_time = {act: 0 for act in rcpsp_problem.tasks_list}

    perm_extended = [
        rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
    ]
    perm_extended.insert(0, rcpsp_problem.source_task)
    perm_extended.append(rcpsp_problem.sink_task)

    modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
    # fall back to mode 1 when the requested mode does not exist for the task
    for k in modes_dict:
        if modes_dict[k] not in rcpsp_problem.mode_details[k]:
            modes_dict[k] = 1

    while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
        # first task of the permutation with no predecessor left to schedule
        for id_successor in perm_extended:
            respected = True
            for pred in rcpsp_problem.successors:
                if (
                    id_successor in rcpsp_problem.successors[pred]
                    and pred in perm_extended
                ):
                    respected = False
                    break
            if respected:
                act_id = id_successor
                break

        mode_detail = rcpsp_problem.mode_details[act_id][modes_dict[act_id]]
        duration = mode_detail["duration"]
        current_min_time = minimum_starting_time[act_id]

        # shift the start time until resources are available for the whole duration
        valid = False
        while not valid:
            valid = True
            for t in range(current_min_time, current_min_time + duration):
                for res in rcpsp_problem.resources_list:
                    need = mode_detail.get(res, 0)
                    if need == 0:
                        continue
                    if t < new_horizon:
                        if resource_avail_in_time[res][t] < need:
                            valid = False
                    else:
                        # the task would run past the horizon
                        unfeasible_non_renewable_resources = True
            if not valid:
                current_min_time += 1

        if not unfeasible_non_renewable_resources:
            end_t = current_min_time + duration
            for t in range(current_min_time, end_t):
                for res in resource_avail_in_time:
                    need = mode_detail.get(res, 0)
                    if need == 0:
                        continue
                    resource_avail_in_time[res][t] -= need
                    if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
                        # non-renewable: consumption persists after the task ends
                        for tt in range(end_t, new_horizon):
                            resource_avail_in_time[res][tt] -= need
                            if resource_avail_in_time[res][tt] < 0:
                                unfeasible_non_renewable_resources = True
            activity_end_times[act_id] = end_t
            perm_extended.remove(act_id)
            for s in rcpsp_problem.successors[act_id]:
                minimum_starting_time[s] = max(
                    minimum_starting_time[s], activity_end_times[act_id]
                )

    rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
    for act_id in activity_end_times:
        rcpsp_schedule[act_id] = {}
        rcpsp_schedule[act_id]["start_time"] = (
            activity_end_times[act_id]
            - rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
        )
        rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]

    if unfeasible_non_renewable_resources:
        rcpsp_schedule_feasible = False
        last_act_id = rcpsp_problem.sink_task
        if last_act_id not in rcpsp_schedule:
            # same sentinel for start and end, so that end_time >= start_time
            rcpsp_schedule[last_act_id] = {
                "start_time": unscheduled_time,
                "end_time": unscheduled_time,
            }
    else:
        rcpsp_schedule_feasible = True
    return rcpsp_schedule, rcpsp_schedule_feasible
def generate_schedule_from_permutation_serial_sgs_special_constraints(
    solution: "RcpspSolution", rcpsp_problem: "RcpspProblem"
) -> tuple[dict[Hashable, dict[str, int]], bool]:
    """Build a schedule with a pure-python serial SGS handling special constraints.

    Compared with the plain serial SGS, tasks declared in
    `special_constraints.dict_start_together` are placed as a group at the same
    start time, and minimum start times are updated from the
    "start at end", "start at end + offset" and "start after n units" relations
    and from the start time windows.

    Args:
        solution: provides `rcpsp_permutation` and `rcpsp_modes`.
        rcpsp_problem: provides tasks, modes, precedence relations, resources,
            horizon and `special_constraints`.

    Returns:
        The schedule (task -> {"start_time", "end_time"}) and a feasibility flag.
        When infeasible, the schedule is partial and the sink task, if not
        scheduled, gets a large sentinel start and end time.

    """
    # Sentinel time given to the sink when it could not be scheduled.
    unscheduled_time = 99999999
    activity_end_times = {}
    unfeasible_non_renewable_resources = False
    new_horizon = rcpsp_problem.horizon

    # Remaining availability of each resource at each time step.
    resource_avail_in_time = {}
    for res in rcpsp_problem.resources_list:
        if rcpsp_problem.is_varying_resource():
            # Copy the calendar: if it is a numpy array, a bare slice would be a
            # view and scheduling would alter the problem data.
            resource_avail_in_time[res] = list(
                rcpsp_problem.resources[res][: new_horizon + 1]  # type: ignore
            )
        else:
            resource_avail_in_time[res] = np.full(
                new_horizon, rcpsp_problem.resources[res], dtype=np.int_
            ).tolist()

    minimum_starting_time = {}
    for act in rcpsp_problem.tasks_list:
        minimum_starting_time[act] = 0
        if rcpsp_problem.do_special_constraints:
            if act in rcpsp_problem.special_constraints.start_times_window:
                # lower bound of the start window, when defined
                minimum_starting_time[act] = (
                    rcpsp_problem.special_constraints.start_times_window[act][0]  # type: ignore
                    if rcpsp_problem.special_constraints.start_times_window[act][0]
                    is not None
                    else 0
                )

    perm_extended = [
        rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
    ]
    perm_extended.insert(0, rcpsp_problem.source_task)
    perm_extended.append(rcpsp_problem.sink_task)
    modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)

    def ressource_consumption(
        res: str, task: Hashable, duration: int, mode: int
    ) -> int:
        """Consumption of `res` by `task`, `duration` time units after its start."""
        dur = rcpsp_problem.mode_details[task][mode]["duration"]
        # NOTE(review): with `>` a task still counts as consuming at offset == dur,
        # i.e. one step after its end; `>=` may have been intended - confirm.
        if duration > dur:
            return 0
        return rcpsp_problem.mode_details[task][mode].get(res, 0)

    # fall back to mode 1 when the requested mode does not exist for the task
    for k in modes_dict:
        if modes_dict[k] not in rcpsp_problem.mode_details[k]:
            modes_dict[k] = 1

    def look_for_task(perm: list[Hashable], ignore_sc: bool = False) -> list[Hashable]:
        """Return the first schedulable task of `perm` with its start-together tasks.

        A task is schedulable when none of its predecessors remains in
        `perm_extended`; unless `ignore_sc`, the special precedence relations are
        checked as well. Returns an empty list when no task qualifies.
        """
        act_ids = []
        for task_id in perm:
            respected = True
            # Check all kind of precedence constraints....
            for pred in rcpsp_problem.predecessors.get(task_id, {}):
                if pred in perm_extended:
                    respected = False
                    break
            if not ignore_sc:
                for (
                    pred
                ) in rcpsp_problem.special_constraints.dict_start_at_end_reverse.get(
                    task_id, {}
                ):
                    if pred in perm_extended:
                        respected = False
                        break
                for pred in rcpsp_problem.special_constraints.dict_start_at_end_offset_reverse.get(
                    task_id, {}
                ):
                    if pred in perm_extended:
                        respected = False
                        break
                for pred in rcpsp_problem.special_constraints.dict_start_after_nunit_reverse.get(
                    task_id, {}
                ):
                    if pred in perm_extended:
                        respected = False
                        break
            task_to_start_too = set()
            if respected:
                task_to_start_too = (
                    rcpsp_problem.special_constraints.dict_start_together.get(
                        task_id, set()
                    )
                )
                if not ignore_sc:
                    if len(task_to_start_too) > 0:
                        # every task of the group must have its predecessors done
                        if not all(
                            s not in perm_extended
                            for t in task_to_start_too
                            for s in rcpsp_problem.predecessors[t]
                        ):
                            respected = False
            if respected:
                act_ids = [task_id] + list(task_to_start_too)
                break
        return act_ids

    while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
        # The original code first searched among the "start at end" tasks and then
        # discarded that result; the side-effect-free dead call has been dropped.
        act_ids = look_for_task(perm_extended)
        if len(act_ids) == 0:
            # last resort: ignore the special constraints
            act_ids = look_for_task(perm_extended, ignore_sc=True)
        current_min_time = max([minimum_starting_time[act_id] for act_id in act_ids])
        max_duration = max(
            [
                rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
                for act_id in act_ids
            ]
        )

        # shift the common start time until resources are available for the group
        valid = False
        while not valid:
            valid = True
            for t in range(current_min_time, current_min_time + max_duration):
                for res in rcpsp_problem.resources_list:
                    r = sum(
                        [
                            ressource_consumption(
                                res=res,
                                task=task,
                                duration=t - current_min_time,
                                mode=modes_dict[task],
                            )
                            for task in act_ids
                        ]
                    )
                    if r == 0:
                        continue
                    if t < new_horizon:
                        if resource_avail_in_time[res][t] < r:
                            valid = False
                            break
                    else:
                        # the group would run past the horizon
                        unfeasible_non_renewable_resources = True
                if not valid:
                    break
            if not valid:
                current_min_time += 1

        if not unfeasible_non_renewable_resources:
            end_t = current_min_time + max_duration
            for t in range(current_min_time, current_min_time + max_duration):
                for res in resource_avail_in_time:
                    r = sum(
                        [
                            ressource_consumption(
                                res=res,
                                task=task,
                                duration=t - current_min_time,
                                mode=modes_dict[task],
                            )
                            for task in act_ids
                        ]
                    )
                    resource_avail_in_time[res][t] -= r
                    if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
                        # non-renewable: consumption persists after the group ends
                        for tt in range(end_t, new_horizon):
                            resource_avail_in_time[res][tt] -= r
                            if resource_avail_in_time[res][tt] < 0:
                                unfeasible_non_renewable_resources = True
            for act_id in act_ids:
                activity_end_times[act_id] = (
                    current_min_time
                    + rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
                )
                perm_extended.remove(act_id)
                # propagate the new lower bounds on start times
                for s in rcpsp_problem.successors[act_id]:
                    minimum_starting_time[s] = max(
                        minimum_starting_time[s], activity_end_times[act_id]
                    )
                for s in rcpsp_problem.special_constraints.dict_start_at_end.get(
                    act_id, {}
                ):
                    minimum_starting_time[s] = max(
                        minimum_starting_time[s], activity_end_times[act_id]
                    )
                for s in rcpsp_problem.special_constraints.dict_start_after_nunit.get(
                    act_id, {}
                ):
                    minimum_starting_time[s] = max(
                        minimum_starting_time[s],
                        current_min_time
                        + rcpsp_problem.special_constraints.dict_start_after_nunit[
                            act_id
                        ][s],
                    )
                for s in rcpsp_problem.special_constraints.dict_start_at_end_offset.get(
                    act_id, {}
                ):
                    minimum_starting_time[s] = max(
                        minimum_starting_time[s],
                        activity_end_times[act_id]
                        + rcpsp_problem.special_constraints.dict_start_at_end_offset[
                            act_id
                        ][s],
                    )

    rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
    for act_id in activity_end_times:
        rcpsp_schedule[act_id] = {}
        rcpsp_schedule[act_id]["start_time"] = (
            activity_end_times[act_id]
            - rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
        )
        rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]

    if unfeasible_non_renewable_resources:
        rcpsp_schedule_feasible = False
        last_act_id = rcpsp_problem.sink_task
        if last_act_id not in rcpsp_schedule:
            # same sentinel for start and end, so that end_time >= start_time
            rcpsp_schedule[last_act_id] = {
                "start_time": unscheduled_time,
                "end_time": unscheduled_time,
            }
    else:
        rcpsp_schedule_feasible = True
    return rcpsp_schedule, rcpsp_schedule_feasible
def generate_schedule_from_permutation_serial_sgs_partial_schedule(
    solution: "RcpspSolution",
    rcpsp_problem: "RcpspProblem",
    current_t: int,
    completed_tasks: dict[Hashable, TaskDetails],
    scheduled_tasks_start_times: dict[Hashable, int],
) -> tuple[dict[Hashable, dict[str, int]], bool]:
    """Build a schedule with a pure-python serial SGS, starting from a partial schedule.

    Tasks in `scheduled_tasks_start_times` keep their imposed start time and
    their resource usage is applied first. Tasks in `completed_tasks` are not
    rescheduled and are reported with their given start/end times. The remaining
    tasks are placed in permutation order, no earlier than `current_t`.

    Args:
        solution: provides `rcpsp_permutation` and `rcpsp_modes`.
        rcpsp_problem: provides tasks, modes, successors, resources and horizon.
        current_t: minimum start time of the tasks that are not already scheduled.
        completed_tasks: completed tasks; values expose `start` and `end`.
        scheduled_tasks_start_times: imposed start times of already scheduled tasks.

    Returns:
        The schedule (task -> {"start_time", "end_time"}) and a feasibility flag.
        When infeasible, the schedule is partial and the sink task, if not
        scheduled, gets a large sentinel start and end time.

    """
    # Sentinel time given to the sink when it could not be scheduled.
    unscheduled_time = 99999999
    activity_end_times = {}
    unfeasible_non_renewable_resources = False
    new_horizon = rcpsp_problem.horizon

    # Remaining availability of each resource at each time step.
    resource_avail_in_time = {}
    for res in rcpsp_problem.resources_list:
        if rcpsp_problem.is_varying_resource():
            # Copy the calendar: if it is a numpy array, a bare slice would be a
            # view and scheduling would alter the problem data.
            resource_avail_in_time[res] = list(
                rcpsp_problem.resources[res][: new_horizon + 1]  # type: ignore
            )
        else:
            resource_avail_in_time[res] = np.full(
                new_horizon, rcpsp_problem.resources[res], dtype=np.int_
            ).tolist()

    minimum_starting_time = {
        act: scheduled_tasks_start_times.get(act, current_t)
        for act in rcpsp_problem.tasks_list
    }

    perm_extended = [
        rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
    ]
    perm_extended.insert(0, rcpsp_problem.source_task)
    perm_extended.append(rcpsp_problem.sink_task)

    modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
    # Fix modes in case specified mode not in mode details for the activities.
    # Done before any use of modes_dict, so that already scheduled tasks get the
    # same fallback instead of failing on an unknown mode.
    for ac in modes_dict:
        if modes_dict[ac] not in rcpsp_problem.mode_details[ac]:
            modes_dict[ac] = 1

    # Update current resource usage by the scheduled task (ongoing task, in practice)
    for act_id in scheduled_tasks_start_times:
        mode_detail = rcpsp_problem.mode_details[act_id][modes_dict[act_id]]
        current_min_time = scheduled_tasks_start_times[act_id]
        end_t = current_min_time + mode_detail["duration"]
        for t in range(current_min_time, end_t):
            for res in resource_avail_in_time:
                need = mode_detail.get(res, 0)
                resource_avail_in_time[res][t] -= need
                if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
                    for tt in range(end_t, new_horizon):
                        resource_avail_in_time[res][tt] -= need
                        if resource_avail_in_time[res][tt] < 0:
                            unfeasible_non_renewable_resources = True
        activity_end_times[act_id] = end_t
        perm_extended.remove(act_id)
        for s in rcpsp_problem.successors[act_id]:
            minimum_starting_time[s] = max(
                minimum_starting_time[s], activity_end_times[act_id]
            )

    # completed tasks are not rescheduled
    perm_extended = [x for x in perm_extended if x not in completed_tasks]

    while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
        # get first activity in perm with precedences respected
        for id_successor in perm_extended:
            respected = True
            for pred in rcpsp_problem.successors.keys():
                if (
                    id_successor in rcpsp_problem.successors[pred]
                    and pred in perm_extended
                ):
                    respected = False
                    break
            if respected:
                act_id = id_successor
                break

        mode_detail = rcpsp_problem.mode_details[act_id][modes_dict[act_id]]
        duration = mode_detail["duration"]
        current_min_time = minimum_starting_time[act_id]

        # shift the start time until resources are available for the whole duration
        valid = False
        while not valid:
            valid = True
            for t in range(current_min_time, current_min_time + duration):
                for res in resource_avail_in_time:
                    if t < new_horizon:
                        if resource_avail_in_time[res][t] < mode_detail.get(res, 0):
                            valid = False
                    else:
                        # the task would run past the horizon
                        unfeasible_non_renewable_resources = True
            if not valid:
                current_min_time += 1

        if not unfeasible_non_renewable_resources:
            end_t = current_min_time + duration
            for t in range(current_min_time, end_t):
                for res in resource_avail_in_time:
                    need = mode_detail.get(res, 0)
                    resource_avail_in_time[res][t] -= need
                    if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
                        # Non-renewable consumption persists from end_t on. Starting
                        # at end_t + 1 (as previously done) left the resource
                        # wrongly available at time end_t.
                        for tt in range(end_t, new_horizon):
                            resource_avail_in_time[res][tt] -= need
                            if resource_avail_in_time[res][tt] < 0:
                                unfeasible_non_renewable_resources = True
            activity_end_times[act_id] = end_t
            perm_extended.remove(act_id)
            for s in rcpsp_problem.successors[act_id]:
                minimum_starting_time[s] = max(
                    minimum_starting_time[s], activity_end_times[act_id]
                )

    rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
    for act_id in activity_end_times:
        rcpsp_schedule[act_id] = {}
        rcpsp_schedule[act_id]["start_time"] = (
            activity_end_times[act_id]
            - rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
        )
        rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
    for act_id in completed_tasks:
        rcpsp_schedule[act_id] = {}
        rcpsp_schedule[act_id]["start_time"] = completed_tasks[act_id].start
        rcpsp_schedule[act_id]["end_time"] = completed_tasks[act_id].end

    if unfeasible_non_renewable_resources:
        rcpsp_schedule_feasible = False
        last_act_id = rcpsp_problem.sink_task
        if last_act_id not in rcpsp_schedule:
            # same sentinel for start and end, so that end_time >= start_time
            rcpsp_schedule[last_act_id] = {
                "start_time": unscheduled_time,
                "end_time": unscheduled_time,
            }
    else:
        rcpsp_schedule_feasible = True
    return rcpsp_schedule, rcpsp_schedule_feasible
def generate_schedule_from_permutation_serial_sgs_partial_schedule_specialized_constraints(
    solution: RcpspSolution,
    rcpsp_problem: RcpspProblem,
    current_t: int,
    completed_tasks: dict[Hashable, TaskDetails],
    scheduled_tasks_start_times: dict[Hashable, int],
) -> tuple[dict[Hashable, dict[str, int]], bool]:
    """Complete a partially executed schedule, honouring the special constraints.

    Tasks are placed one group at a time, following the order of
    `solution.rcpsp_permutation`, at the earliest time step where the resource
    capacities allow it.

    Args:
        solution: provides `rcpsp_permutation` (indices into
            `rcpsp_problem.tasks_list_non_dummy`) and `rcpsp_modes`.
        rcpsp_problem: problem definition (resources, modes, precedences,
            special constraints, horizon).
        current_t: lower bound on the start time of every task that is not in
            `scheduled_tasks_start_times`.
        completed_tasks: tasks that are already finished. They are not
            rescheduled; their `start`/`end` are copied into the result.
        scheduled_tasks_start_times: tasks whose start time is already fixed.
            Their resource consumption is reserved before anything else is
            placed.

    Returns:
        A tuple `(schedule, feasible)` where `schedule` maps each task to
        `{"start_time": ..., "end_time": ...}`. When `feasible` is False the
        schedule may be partial, and a placeholder entry is added for the sink
        task if it could not be scheduled.

    Raises:
        ValueError: if tasks remain but none of them has all its predecessors
            already scheduled (e.g. cyclic precedences).

    """
    activity_end_times: dict[Hashable, int] = {}
    unfeasible_non_renewable_resources = False
    new_horizon = rcpsp_problem.horizon

    # Remaining capacity of each resource, per time step.
    varying_resources = rcpsp_problem.is_varying_resource()
    resource_avail_in_time = {}
    for res in rcpsp_problem.resources_list:
        if varying_resources:
            resource_avail_in_time[res] = rcpsp_problem.resources[res][  # type: ignore
                : new_horizon + 1
            ]
        else:
            resource_avail_in_time[res] = np.full(
                new_horizon, rcpsp_problem.resources[res], dtype=np.int_
            ).tolist()

    def ressource_consumption(
        res: str, task: Hashable, duration: int, mode: int
    ) -> int:
        # `duration` is the time elapsed since the task started.
        # NOTE(review): the test is `>` and not `>=`, so inside a group of
        # tasks started together a shorter task still counts as consuming at
        # elapsed == its own duration. Kept as is; confirm whether intended.
        details = rcpsp_problem.mode_details[task][mode]
        if duration > details["duration"]:
            return 0
        return details.get(res, 0)

    def group_consumption(res: str, elapsed: int, tasks: list[Hashable]) -> int:
        # Total consumption of `res` by `tasks`, `elapsed` steps after their
        # common start.
        return sum(
            ressource_consumption(
                res=res, task=task, duration=elapsed, mode=modes_dict[task]
            )
            for task in tasks
        )

    minimum_starting_time = {}
    for act in rcpsp_problem.tasks_list:
        if act in scheduled_tasks_start_times:
            minimum_starting_time[act] = scheduled_tasks_start_times[act]
        else:
            minimum_starting_time[act] = current_t
        if rcpsp_problem.do_special_constraints:
            windows = rcpsp_problem.special_constraints.start_times_window
            if act in windows:
                earliest = windows[act][0]
                if earliest is not None:
                    minimum_starting_time[act] = max(  # type: ignore
                        earliest, minimum_starting_time[act]
                    )

    perm_extended = [
        rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
    ]
    perm_extended.insert(0, rcpsp_problem.source_task)
    perm_extended.append(rcpsp_problem.sink_task)
    modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)

    # Update current resource usage by the scheduled task (ongoing task, in practice)
    for act_id, start_t in scheduled_tasks_start_times.items():
        details = rcpsp_problem.mode_details[act_id][modes_dict[act_id]]
        end_t = start_t + details["duration"]
        for t in range(start_t, end_t):
            for res in resource_avail_in_time:
                need = details.get(res, 0)
                resource_avail_in_time[res][t] -= need
                # A non-renewable resource stays consumed after the task ends.
                if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
                    for tt in range(end_t, new_horizon):
                        resource_avail_in_time[res][tt] -= need
                        if resource_avail_in_time[res][tt] < 0:
                            unfeasible_non_renewable_resources = True
        activity_end_times[act_id] = end_t
        perm_extended.remove(act_id)
        for s in rcpsp_problem.successors[act_id]:
            minimum_starting_time[s] = max(minimum_starting_time[s], end_t)

    perm_extended = [x for x in perm_extended if x not in completed_tasks]

    # Fall back to mode 1 for any task whose selected mode does not exist.
    for ac in modes_dict:
        if modes_dict[ac] not in rcpsp_problem.mode_details[ac]:
            modes_dict[ac] = 1

    while perm_extended and not unfeasible_non_renewable_resources:
        constraints = rcpsp_problem.special_constraints
        predecessor_maps = (
            rcpsp_problem.predecessors,
            constraints.dict_start_at_end_reverse,
            constraints.dict_start_at_end_offset_reverse,
            constraints.dict_start_after_nunit_reverse,
        )

        # First remaining task (in permutation order) whose predecessors, of
        # every kind, are all scheduled, together with the pending tasks that
        # must start at the same time.
        act_ids: list[Hashable] = []
        for task_id in perm_extended:
            if any(
                pred in perm_extended
                for mapping in predecessor_maps
                for pred in mapping.get(task_id, {})
            ):
                continue
            task_to_start_too: list[Hashable] = [
                k
                for k in constraints.dict_start_together.get(task_id, set())
                if k in perm_extended
            ]
            # The partners must themselves be free of pending predecessors.
            if any(
                pred in perm_extended
                for partner in task_to_start_too
                for pred in rcpsp_problem.predecessors[partner]
            ):
                continue
            act_ids = [task_id] + task_to_start_too
            break

        if not act_ids:
            # Fallback: only look at the classical precedence constraints.
            # NOTE(review): there is no `break`, so the LAST eligible task of
            # the permutation is retained, not the first. Kept as is; confirm
            # whether intended.
            for task_id in perm_extended:
                if not any(
                    pred in perm_extended
                    for pred in rcpsp_problem.predecessors.get(task_id, {})
                ):
                    act_ids = [task_id]

        if not act_ids:
            # Previously surfaced as "max() arg is an empty sequence".
            raise ValueError(
                "No schedulable task among the remaining ones "
                f"(pending predecessors for all of {perm_extended})."
            )

        current_min_time = max(minimum_starting_time[act_id] for act_id in act_ids)
        max_duration = max(
            rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
            for act_id in act_ids
        )

        # Shift the group later, one step at a time, until capacities allow it.
        valid = False
        while not valid:
            valid = True
            for t in range(current_min_time, current_min_time + max_duration):
                for res in rcpsp_problem.resources_list:
                    r = group_consumption(res, t - current_min_time, act_ids)
                    if r == 0:
                        continue
                    if t < new_horizon:
                        if resource_avail_in_time[res][t] < r:
                            valid = False
                            break
                    else:
                        # Resource needed beyond the horizon: give up.
                        unfeasible_non_renewable_resources = True
                if not valid:
                    break
            if not valid:
                current_min_time += 1

        if not unfeasible_non_renewable_resources:
            end_t = current_min_time + max_duration
            for t in range(current_min_time, end_t):
                for res in resource_avail_in_time:
                    r = group_consumption(res, t - current_min_time, act_ids)
                    if r == 0:
                        # Nothing to reserve. Skipping also avoids indexing the
                        # capacity list past its end for a zero-consumption
                        # task that runs beyond the horizon.
                        continue
                    resource_avail_in_time[res][t] -= r
                    if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
                        for tt in range(end_t, new_horizon):
                            resource_avail_in_time[res][tt] -= r
                            if resource_avail_in_time[res][tt] < 0:
                                unfeasible_non_renewable_resources = True
            for act_id in act_ids:
                task_end = (
                    current_min_time
                    + rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
                )
                activity_end_times[act_id] = task_end
                perm_extended.remove(act_id)
                # Classical precedence: successors cannot start before the end.
                for s in rcpsp_problem.successors[act_id]:
                    minimum_starting_time[s] = max(minimum_starting_time[s], task_end)
                # Special constraints overwrite (not max) the earliest start.
                for s in constraints.dict_start_at_end.get(act_id, {}):
                    minimum_starting_time[s] = task_end
                for s in constraints.dict_start_after_nunit.get(act_id, {}):
                    minimum_starting_time[s] = (
                        current_min_time
                        + constraints.dict_start_after_nunit[act_id][s]
                    )
                for s in constraints.dict_start_at_end_offset.get(act_id, {}):
                    minimum_starting_time[s] = (
                        task_end + constraints.dict_start_at_end_offset[act_id][s]
                    )

    rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
    for act_id, task_end in activity_end_times.items():
        duration = rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
        rcpsp_schedule[act_id] = {
            "start_time": task_end - duration,
            "end_time": task_end,
        }
    for act_id, task_details in completed_tasks.items():
        rcpsp_schedule[act_id] = {
            "start_time": task_details.start,
            "end_time": task_details.end,
        }

    if unfeasible_non_renewable_resources:
        rcpsp_schedule_feasible = False
        last_act_id = rcpsp_problem.sink_task
        if last_act_id not in rcpsp_schedule:
            # Placeholder so that the sink task always has an entry.
            # NOTE(review): the start sentinel is larger than the end sentinel;
            # kept unchanged since callers may rely on these exact values.
            rcpsp_schedule[last_act_id] = {
                "start_time": 99999999,
                "end_time": 9999999,
            }
    else:
        rcpsp_schedule_feasible = True
    return rcpsp_schedule, rcpsp_schedule_feasible
def compute_mean_resource_reserve(
    solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> float:
    """Compute the mean fraction of resource capacity left unused by a schedule.

    For every resource, the remaining capacity is averaged over the time steps
    up to the makespan (end time of the sink task) and divided by
    `rcpsp_problem.get_max_resource_capacity(res)`. The returned value is the
    mean of these ratios over `rcpsp_problem.resources_list`.

    Args:
        solution: solution providing `rcpsp_schedule`, `rcpsp_modes` and
            `rcpsp_schedule_feasible`.
        rcpsp_problem: problem the solution refers to.

    Returns:
        The mean resource reserve, or 0.0 if the schedule is flagged infeasible.

    """
    if not solution.rcpsp_schedule_feasible:
        return 0.0
    makespan = solution.rcpsp_schedule[rcpsp_problem.sink_task]["end_time"]
    modes = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)

    # Remaining capacity of each resource, per time step.
    varying_resources = rcpsp_problem.is_varying_resource()
    resource_avail_in_time = {}
    for res in rcpsp_problem.resources_list:
        if varying_resources:
            resource_avail_in_time[res] = rcpsp_problem.resources[res][: makespan + 1]  # type: ignore
        else:
            resource_avail_in_time[res] = np.full(
                makespan, rcpsp_problem.resources[res], dtype=np.int_
            ).tolist()

    for act_id in rcpsp_problem.tasks_list:
        start_time = solution.rcpsp_schedule[act_id]["start_time"]
        end_time = solution.rcpsp_schedule[act_id]["end_time"]
        details = rcpsp_problem.mode_details[act_id][modes[act_id]]
        for res, avail in resource_avail_in_time.items():
            need = details.get(res, 0)
            if need == 0:
                continue
            for t in range(start_time, end_time):
                avail[t] -= need
            # A non-renewable resource stays consumed after the task ends.
            # The original test `t == end_time` could never hold inside
            # range(start_time, end_time), so this was silently skipped.
            # As in the schedule generation code (`t == end_t - 1`), only
            # tasks with a positive duration propagate their consumption.
            if res in rcpsp_problem.non_renewable_resources and end_time > start_time:
                for tt in range(end_time, makespan):
                    avail[tt] -= need

    mean_avail = {res: np.mean(avail) for res, avail in resource_avail_in_time.items()}
    mean_resource_reserve = np.mean(
        [
            mean_avail[res] / rcpsp_problem.get_max_resource_capacity(res)
            for res in rcpsp_problem.resources_list
        ]
    )
    return float(mean_resource_reserve)
[docs] class PartialSolution: def __init__( self, task_mode: Optional[dict[int, int]] = None, start_times: Optional[dict[int, int]] = None, end_times: Optional[dict[int, int]] = None, partial_permutation: Optional[list[int]] = None, list_partial_order: Optional[list[list[int]]] = None, start_together: Optional[list[tuple[int, int]]] = None, start_at_end: Optional[list[tuple[int, int]]] = None, start_at_end_plus_offset: Optional[list[tuple[int, int, int]]] = None, start_after_nunit: Optional[list[tuple[int, int, int]]] = None, disjunctive_tasks: Optional[list[tuple[int, int]]] = None, start_times_window: Optional[dict[Hashable, tuple[int, int]]] = None, end_times_window: Optional[dict[Hashable, tuple[int, int]]] = None, ): self.task_mode = task_mode self.start_times = start_times self.end_times = end_times self.partial_permutation = partial_permutation self.list_partial_order = list_partial_order self.start_together = start_together self.start_at_end = start_at_end self.start_after_nunit = start_after_nunit self.start_at_end_plus_offset = start_at_end_plus_offset self.disjunctive_tasks = disjunctive_tasks self.start_times_window = start_times_window self.end_times_window = end_times_window
# one element in self.list_partial_order is a list [l1, l2, l3] # indicating that l1 should be started before l2, and l2 before l3 for example