# Copyright (c) 2023 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import (
annotations, # make annotations be considered as string by default
)
from collections.abc import Hashable, Iterable
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Optional
import numpy as np
from numpy import typing as npt
from discrete_optimization.generic_tasks_tools.cumulative_resource import (
CumulativeResourceSolution,
)
from discrete_optimization.generic_tasks_tools.multimode_scheduling import (
MultimodeSchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import RobustProblem
if TYPE_CHECKING: # avoid circular imports due to annotations
from discrete_optimization.rcpsp.problem import RcpspProblem
Task = Hashable
Resource = str
[docs]
class TaskDetails:
def __init__(self, start: int, end: int):
self.start = start
self.end = end
[docs]
class RcpspSolution(
CumulativeResourceSolution[Task, Resource], MultimodeSchedulingSolution[Task]
):
"""Solution to RcpspProblem problems.
Attributes:
problem: RCPSP problem for which this is a solution
rcpsp_permutation: Tasks permutation, i.e. list of tasks indices among non-dummy tasks (i.e. excluding source and sink tasks).
See below how it is related to the schedule.
If given and schedule not given, used to reconstruct the schedule.
if not given, deduced from schedule.
If not given and schedule not given, it is set to `problem.fixed_permutation` (useful for alternating genetic algorithms)
rcpsp_schedule: Tasks schedule. ( task -> "start_time" or "end_time" -> time)
If given used to construct `standardised_permutation`.
If given and `rcpsp_permutation` not given, used to construct `rcpsp_permutation`.
If given and `rcpsp_permutation` given, no consistency check.
If not given, deduced from `rcpsp_permutation` if possible.
If not possible, `rcpsp_schedule_feasible` set to False and
`rcpsp_schedule` set to a partially filled schedule.
In case of `Aggreg_RcpspModel`, kept empty.
rcpsp_modes: Mode used for each task. Same order as `problem.tasks_list_non_dummy`.
If not given we use `problem.fixed_modes` if existing, else 1 for each task.
rcpsp_schedule_feasible: True if a schedule can be deduced from permutation.
False if it leads to incoherency preventing a schedule generation.
Recomputed when schedule is (re)computed from permutation.
standardised_permutation: Permutation deduced uniquely from schedule. If given, not recomputed.
Can be different from `rcpsp_permutation` as different permutations can lead to same schedule.
fast: boolean indicating if we use the fast functions to generate schedule from permutation.
## More about conversion permutation <-> schedule:
### Schedule -> permutation
Take the list of tasks indices among non-dummy tasks (i.e. excluding source and sink tasks),
sorted by their starting times.
### Permutation -> schedule: serial-SGS (Schedule Generation Scheme)
- Initialization:
- the source task is set at time 0
- the resource availability is set to the initial calendar given by the problem
- Loop: the schedule is filled task by task, in the order given by the permutation, and taking into account
- the precedence constraints: a task starts after the end of the tasks before it in the precedence graph
- the resource constraints: during the task, the needed resource must be available
When a new task has been inserted, the availability of each resource is updated accordingly, i.e.
we remove the necessary resources for the task (according to the chosen mode set by `rcpsp_modes`) for the task duration,
and after if the resource is non-renewable.
- When hitting an impossibility (e.g. inserting a task after one of its successor, or when the resources are not enough available until the problem horizon)
the permutation is set infeasible (so `rcpsp_schedule_feasible` is set to `False`).
See "Compute a dummy solution for RCPSP" in the tutorial "notebooks/RCPSP tutorials/RCPSP-1 Introduction.ipynb" for more details.
Example:
This example demonstrates how a permutation of non-dummy tasks is
transformed into a schedule, adhering to precedence and resource constraints.
>>> from discrete_optimization.rcpsp.problem import RcpspProblem
>>> from discrete_optimization.rcpsp.solution import RcpspSolution
>>> # Define resources
>>> resources = {"R1": 5, "R2": 2}
>>> non_renewable_resources = [] # Empty if all resources replenish when a task ends
>>> # Define mode details
>>> # Format: { task_id: { mode_id: { "duration": int, "resource_name": amount } } }
>>> mode_details = {
... "source": {1: {"duration": 0, "R1": 0, "R2": 0}}, # Source (Dummy)
... "task-1": {1: {"duration": 4, "R1": 2, "R2": 2}},
... "task-2": {1: {"duration": 3, "R1": 3, "R2": 0}},
... "task-3": {1: {"duration": 5, "R1": 2, "R2": 1}},
... "task-4": {1: {"duration": 2, "R1": 1, "R2": 1}},
... "sink": {1: {"duration": 0, "R1": 0, "R2": 0}}, # Sink (Dummy)
... }
>>> # Define precedence graph
>>> successors = {
... "source": ["task-1", "task-2"], # First tasks: 1 and 2
... "task-1": ["task-3"], # Task 1 must finish before 3
... "task-2": ["task-4"], # Task 2 must finish before 4
... "task-3": ["sink"], # Task 3 leads to Sink
... "task-4": ["sink"], # Task 4 leads to Sink
... "sink": [], # Sink has no successors
... }
>>> # Initialize the Problem
>>> problem = RcpspProblem(
... resources=resources,
... non_renewable_resources=non_renewable_resources,
... mode_details=mode_details,
... successors=successors,
... horizon=20, # Maximum time allowed for the project
... source_task="source",
... sink_task="sink"
... )
>>> # Create solution
>>> # Convert list of task ids into a licit permutation (using indices among non-dummy tasks)
>>> rcpsp_permutation = problem.convert_task_ids_to_permutation(["task-2", "task-1", "task-4", "task-3"])
>>> print(rcpsp_permutation)
[1, 0, 3, 2]
>>> solution = RcpspSolution(
... problem=problem,
... rcpsp_permutation=rcpsp_permutation,
... rcpsp_modes=[1, 1, 1, 1] # same mode for each task
... )
>>> # The Serial SGS calculates the start/end times according:
>>> print(solution.rcpsp_schedule["task-2"]) # Task 2
{'start_time': 0, 'end_time': 3}
>>> print(solution.rcpsp_schedule["task-1"]) # Task 1 follows Task 2 in permutation
{'start_time': 0, 'end_time': 4}
>>> print(solution.rcpsp_schedule["task-4"]) # Task 4 can start after Task 2, but only when ressources are available (thus after task 1)
{'start_time': 4, 'end_time': 6}
>>> print(solution.rcpsp_schedule["task-3"]) # Task 3 can start after Task 1 (and ressources are available)
{'start_time': 4, 'end_time': 9}
"""
problem: RcpspProblem
def __init__(
self,
problem: RcpspProblem,
rcpsp_permutation: Optional[list[int]] = None,
rcpsp_schedule: Optional[dict[Hashable, dict[str, int]]] = None,
rcpsp_modes: Optional[list[int]] = None,
rcpsp_schedule_feasible: bool = True,
standardised_permutation: Optional[list[int]] = None,
fast: bool = True,
):
"""
Args:
problem:
rcpsp_permutation:
rcpsp_schedule:
rcpsp_modes:
rcpsp_schedule_feasible:
standardised_permutation:
fast:
"""
super().__init__(problem=problem)
self.rcpsp_schedule_feasible = rcpsp_schedule_feasible
self.fast = fast
self.rcpsp_modes: list[int]
self.rcpsp_schedule: dict[Hashable, dict[str, int]]
self.rcpsp_permutation: list[int]
self.standardised_permutation: list[int]
# init rcpsp_modes
if rcpsp_modes is None:
if self.problem.fixed_modes is not None:
self.rcpsp_modes = self.problem.fixed_modes
else:
self.rcpsp_modes = [1 for i in range(self.problem.n_jobs_non_dummy)]
else:
self.rcpsp_modes = rcpsp_modes
# init rcpsp_permutation
if rcpsp_permutation is None:
if rcpsp_schedule is None:
if self.problem.fixed_permutation is not None:
self.rcpsp_permutation = self.problem.fixed_permutation
else:
raise ValueError(
"rcpsp_permutation and rcpsp_schedule can be None together "
"only if problem.fixed_permutation is not None"
)
else:
self.rcpsp_schedule = rcpsp_schedule
standardised_permutation = self.generate_permutation_from_schedule()
self.rcpsp_permutation = deepcopy(standardised_permutation)
else:
self.rcpsp_permutation = rcpsp_permutation
# init rcpsp_schedule
if rcpsp_schedule is None:
self.generate_schedule_from_permutation_serial_sgs(do_fast=self.fast)
else:
self.rcpsp_schedule = rcpsp_schedule
# init standardised_permutation
if standardised_permutation is None:
self.standardised_permutation = self.generate_permutation_from_schedule()
else:
self.standardised_permutation = standardised_permutation
# schedule already computed (prevent issues with __setattr__ hack)
self._schedule_to_recompute = False
[docs]
def change_problem(self, new_problem: RcpspProblem) -> None: # type: ignore
super().change_problem(new_problem=new_problem)
# recompute schedule and standardised permutation with respect to the new problem
self.generate_schedule_from_permutation_serial_sgs(do_fast=self.fast)
self.standardised_permutation = self.generate_permutation_from_schedule()
def __setattr__(self, key: str, value: Any) -> None:
super.__setattr__(self, key, value)
if key == "rcpsp_permutation":
self._schedule_to_recompute = True
[docs]
def copy(self) -> "RcpspSolution":
return RcpspSolution(
problem=self.problem,
rcpsp_permutation=deepcopy(self.rcpsp_permutation),
rcpsp_modes=deepcopy(self.rcpsp_modes),
rcpsp_schedule=deepcopy(self.rcpsp_schedule),
rcpsp_schedule_feasible=self.rcpsp_schedule_feasible,
standardised_permutation=self.standardised_permutation,
fast=self.fast,
)
[docs]
def lazy_copy(self) -> "RcpspSolution":
return RcpspSolution(
problem=self.problem,
rcpsp_permutation=self.rcpsp_permutation,
rcpsp_modes=self.rcpsp_modes,
rcpsp_schedule=self.rcpsp_schedule,
rcpsp_schedule_feasible=self.rcpsp_schedule_feasible,
standardised_permutation=self.standardised_permutation,
fast=self.fast,
)
def __str__(self) -> str:
if self.rcpsp_schedule is None:
sched_str = "None"
else:
sched_str = str(self.rcpsp_schedule)
val = "RCPSP solution (rcpsp_schedule): " + sched_str
return val
[docs]
def generate_permutation_from_schedule(self) -> list[int]:
sorted_task = [
self.problem.index_task_non_dummy[i]
for i in sorted(
self.rcpsp_schedule, key=lambda x: self.rcpsp_schedule[x]["start_time"]
)
if i in self.problem.index_task_non_dummy
]
return sorted_task
[docs]
def compute_mean_resource_reserve(self, fast: bool = True) -> float:
if not fast:
return compute_mean_resource_reserve(
solution=self, rcpsp_problem=self.problem
)
else:
if not self.rcpsp_schedule_feasible:
return 0.0
last_activity = self.problem.sink_task
makespan = self.rcpsp_schedule[last_activity]["end_time"]
if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
# non existing modes
return 0.0
else:
return self.problem.compute_mean_resource(
horizon=makespan,
modes_array=np.array(
self.problem.build_mode_array(self.rcpsp_modes)
)
- 1, # permutation_task=array(task)->task index
start_array=np.array(
[
self.rcpsp_schedule[t]["start_time"]
for t in self.problem.tasks_list
]
),
end_array=np.array(
[
self.rcpsp_schedule[t]["end_time"]
for t in self.problem.tasks_list
]
),
)
[docs]
def generate_schedule_from_permutation_serial_sgs(
self, do_fast: bool = True
) -> None:
self._schedule_to_recompute = False
if isinstance(self.problem, RobustProblem):
self.rcpsp_schedule = {}
self.rcpsp_schedule_feasible = True
else:
if do_fast:
schedule: dict[int, tuple[int, int]]
if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
# non existing modes
schedule, unfeasible = {}, True
else:
schedule, unfeasible = self.problem.func_sgs(
permutation_task=permutation_do_to_permutation_sgs_fast(
self.problem, self.rcpsp_permutation
),
modes_array=np.array(
self.problem.build_mode_array(self.rcpsp_modes)
)
- 1,
)
self.rcpsp_schedule_feasible = not unfeasible
self.rcpsp_schedule = {}
for k in schedule:
self.rcpsp_schedule[self.problem.tasks_list[k]] = {
"start_time": schedule[k][0],
"end_time": schedule[k][1],
}
if self.problem.sink_task not in self.rcpsp_schedule:
self.rcpsp_schedule[self.problem.sink_task] = {
"start_time": 99999999,
"end_time": 99999999,
}
else:
(
self.rcpsp_schedule,
self.rcpsp_schedule_feasible,
) = generate_schedule_from_permutation_serial_sgs(
solution=self, rcpsp_problem=self.problem
)
[docs]
def generate_schedule_from_permutation_serial_sgs_2(
self,
current_t: int = 0,
completed_tasks: Optional[dict[Hashable, TaskDetails]] = None,
scheduled_tasks_start_times: Optional[dict[Hashable, int]] = None,
do_fast: bool = True,
) -> None:
if completed_tasks is None:
completed_tasks = {}
if scheduled_tasks_start_times is None:
scheduled_tasks_start_times = {}
if do_fast and not self.problem.do_special_constraints:
schedule: dict[int, tuple[int, int]]
if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
# non existing modes
schedule, unfeasible = {}, True
else:
schedule, unfeasible = self.problem.func_sgs_2(
current_time=current_t,
completed_task_indicator=np.array(
[
1 if self.problem.tasks_list[i] in completed_tasks else 0
for i in range(self.problem.n_jobs)
]
),
completed_task_times=np.array(
[
completed_tasks[self.problem.tasks_list[i]].end
if self.problem.tasks_list[i] in completed_tasks
else 0
for i in range(self.problem.n_jobs)
]
),
scheduled_task=np.array(
[
scheduled_tasks_start_times[self.problem.tasks_list[i]]
if self.problem.tasks_list[i] in scheduled_tasks_start_times
else -1
for i in range(self.problem.n_jobs)
]
),
permutation_task=permutation_do_to_permutation_sgs_fast(
self.problem, self.rcpsp_permutation
),
modes_array=np.array(
self.problem.build_mode_array(self.rcpsp_modes)
)
- 1,
)
self.rcpsp_schedule = {}
for k in schedule:
self.rcpsp_schedule[self.problem.tasks_list[k]] = {
"start_time": schedule[k][0],
"end_time": schedule[k][1],
}
if self.problem.sink_task not in self.rcpsp_schedule:
self.rcpsp_schedule[self.problem.sink_task] = {
"start_time": 999999999,
"end_time": 999999999,
}
self.rcpsp_schedule_feasible = not unfeasible
self._schedule_to_recompute = False
else:
if self.problem.do_special_constraints:
(
self.rcpsp_schedule,
self.rcpsp_schedule_feasible,
) = generate_schedule_from_permutation_serial_sgs_partial_schedule_specialized_constraints(
solution=self,
current_t=current_t,
completed_tasks=completed_tasks,
scheduled_tasks_start_times=scheduled_tasks_start_times,
rcpsp_problem=self.problem,
)
else:
(
self.rcpsp_schedule,
self.rcpsp_schedule_feasible,
) = generate_schedule_from_permutation_serial_sgs_partial_schedule(
solution=self,
current_t=current_t,
completed_tasks=completed_tasks,
scheduled_tasks_start_times=scheduled_tasks_start_times,
rcpsp_problem=self.problem,
)
self._schedule_to_recompute = False
[docs]
def get_max_end_time(self) -> int:
return self.rcpsp_schedule[self.problem.sink_task]["end_time"]
[docs]
def get_start_time(self, task: Hashable) -> int:
return self.rcpsp_schedule[task]["start_time"]
[docs]
def get_end_time(self, task: Hashable) -> int:
return self.rcpsp_schedule[task]["end_time"]
[docs]
def get_start_times_list(self, task: Hashable) -> list[int]:
return [self.get_start_time(task)]
[docs]
def get_end_times_list(self, task: Hashable) -> list[int]:
return [self.get_end_time(task)]
[docs]
def get_active_time(self, task: Hashable) -> list[int]:
return list(range(self.get_start_time(task), self.get_end_time(task)))
[docs]
def get_mode(self, task: Hashable) -> int:
if task in (self.problem.source_task, self.problem.sink_task):
# should have only 1 mode (no particular meaning)
return next(iter(self.problem.mode_details[task]))
return self.rcpsp_modes[self.problem.index_task_non_dummy[task]]
def __hash__(self) -> int:
return hash((tuple(self.rcpsp_permutation), tuple(self.rcpsp_modes)))
def __eq__(self, other: object) -> bool:
return (
isinstance(other, RcpspSolution)
and self.rcpsp_permutation == other.rcpsp_permutation
and self.rcpsp_modes == other.rcpsp_modes
)
[docs]
def permutation_do_to_permutation_sgs_fast(
rcpsp_problem: RcpspProblem, permutation_do: Iterable[int]
) -> npt.NDArray[np.int_]:
perm_extended = [
rcpsp_problem.index_task[rcpsp_problem.tasks_list_non_dummy[x]]
for x in permutation_do
]
perm_extended.insert(0, rcpsp_problem.index_task[rcpsp_problem.source_task])
perm_extended.append(rcpsp_problem.index_task[rcpsp_problem.sink_task])
return np.array(perm_extended, dtype=np.int_)
[docs]
def compute_est_with_time_lags(
rcpsp_problem: RcpspProblem,
scheduled_tasks: dict[Hashable, int],
unscheduled_tasks: set[Hashable],
modes_dict: dict[Hashable, int],
) -> dict[Hashable, int]:
"""
Compute Earliest Start Times for unscheduled tasks given partial schedule.
Uses constraint propagation to handle:
- Successor constraints
- Minimum time lags (start_to_start_min_time_lag)
- Maximum time lags (start_to_start_max_time_lag)
- Time windows (start_times_window)
Args:
rcpsp_problem: RCPSP problem instance
scheduled_tasks: Dict mapping scheduled tasks to their start times
unscheduled_tasks: Set of tasks not yet scheduled
modes_dict: Dict mapping tasks to their execution modes
Returns:
Dict mapping unscheduled tasks to their earliest start times
"""
est = {}
# Initialize EST for unscheduled tasks
for task in unscheduled_tasks:
est[task] = 0
# Apply time windows
if (
rcpsp_problem.do_special_constraints
and task in rcpsp_problem.special_constraints.start_times_window
):
window = rcpsp_problem.special_constraints.start_times_window[task]
if window[0] is not None:
est[task] = window[0]
# Propagate constraints from scheduled tasks
for task in scheduled_tasks:
start_time = scheduled_tasks[task]
end_time = (
start_time + rcpsp_problem.mode_details[task][modes_dict[task]]["duration"]
)
# Standard successors: j must start after i ends
for succ in rcpsp_problem.successors.get(task, []):
if succ in unscheduled_tasks:
est[succ] = max(est[succ], end_time)
# Minimum time lags: start(i) + d_min <= start(j)
if rcpsp_problem.do_special_constraints:
for (
succ,
d_min,
) in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag.get(
task, {}
).items():
if succ in unscheduled_tasks:
est[succ] = max(est[succ], start_time + d_min)
# Propagate constraints between unscheduled tasks (fixed-point iteration)
changed = True
max_iterations = 100 # Prevent infinite loops in case of cycles
iteration = 0
while changed and iteration < max_iterations:
changed = False
iteration += 1
for task_i in unscheduled_tasks:
old_est = est[task_i]
# Successors within unscheduled tasks
for pred in rcpsp_problem.predecessors.get(task_i, {}):
if pred in unscheduled_tasks:
pred_duration = rcpsp_problem.mode_details[pred][modes_dict[pred]][
"duration"
]
est[task_i] = max(est[task_i], est[pred] + pred_duration)
# Minimum time lags within unscheduled tasks
if rcpsp_problem.do_special_constraints:
for (
pred,
d_min,
) in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag_reverse.get(
task_i, {}
).items():
if pred in unscheduled_tasks:
est[task_i] = max(est[task_i], est[pred] + d_min)
if est[task_i] != old_est:
changed = True
return est
[docs]
def compute_lst_with_time_lags(
rcpsp_problem: RcpspProblem,
scheduled_tasks: dict[Hashable, int],
unscheduled_tasks: set[Hashable],
modes_dict: dict[Hashable, int],
horizon: int,
) -> dict[Hashable, int]:
"""
Compute Latest Start Times for unscheduled tasks.
Uses backward constraint propagation for maximum time lags.
Args:
rcpsp_problem: RCPSP problem instance
scheduled_tasks: Dict mapping scheduled tasks to their start times
unscheduled_tasks: Set of tasks not yet scheduled
modes_dict: Dict mapping tasks to their execution modes
horizon: Planning horizon (upper bound on start times)
Returns:
Dict mapping unscheduled tasks to their latest start times
"""
lst = {}
# Initialize LST for unscheduled tasks
for task in unscheduled_tasks:
lst[task] = horizon
# Apply time windows
if (
rcpsp_problem.do_special_constraints
and task in rcpsp_problem.special_constraints.start_times_window
):
window = rcpsp_problem.special_constraints.start_times_window[task]
if window[1] is not None:
lst[task] = min(lst[task], window[1])
# Propagate constraints from scheduled tasks
for task in scheduled_tasks:
start_time = scheduled_tasks[task]
# Maximum time lags: start(j) <= start(i) + offset where offset >= 0
if rcpsp_problem.do_special_constraints:
for (
succ,
offset,
) in rcpsp_problem.special_constraints.dict_start_to_start_max_time_lag.get(
task, {}
).items():
if succ in unscheduled_tasks:
lst[succ] = min(lst[succ], start_time + offset)
# Reverse minimum time lags: if start(i) + d_min <= start(j) is scheduled,
# then start(i) <= start(j) - d_min, so LST[i] = min(LST[i], start(j) - d_min)
for (
pred,
d_min,
) in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag_reverse.get(
task, {}
).items():
if pred in unscheduled_tasks:
lst[pred] = min(lst[pred], start_time - d_min)
# Propagate constraints between unscheduled tasks (fixed-point iteration)
changed = True
max_iterations = 100 # Prevent infinite loops
iteration = 0
while changed and iteration < max_iterations:
changed = False
iteration += 1
for task_j in unscheduled_tasks:
old_lst = lst[task_j]
# Maximum time lags within unscheduled tasks
if rcpsp_problem.do_special_constraints:
for (
pred,
offset,
) in rcpsp_problem.special_constraints.dict_start_to_start_max_time_lag_reverse.get(
task_j, {}
).items():
if pred in unscheduled_tasks:
lst[pred] = min(lst[pred], lst[task_j] - offset)
# Reverse of minimum time lags
for (
succ,
d_min,
) in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag.get(
task_j, {}
).items():
if succ in unscheduled_tasks:
lst[task_j] = min(lst[task_j], lst[succ] - d_min)
if lst[task_j] != old_lst:
changed = True
return lst
[docs]
def is_task_schedulable(
task: Hashable,
start_time: int,
rcpsp_problem: RcpspProblem,
scheduled_tasks: dict[Hashable, int],
unscheduled_tasks: set[Hashable],
modes_dict: dict[Hashable, int],
est: dict[Hashable, int],
lst: dict[Hashable, int],
resource_avail: dict[str, list[int]],
) -> bool:
"""
Check if scheduling task at start_time is feasible (unblocking filter).
Returns True if:
1. Time window is valid: EST[task] <= start_time <= LST[task]
2. Resources are available
3. Scheduling won't make other tasks infeasible
Args:
task: Task to check
start_time: Proposed start time for the task
rcpsp_problem: RCPSP problem instance
scheduled_tasks: Currently scheduled tasks
unscheduled_tasks: Remaining unscheduled tasks
modes_dict: Task execution modes
est: Earliest start times for unscheduled tasks
lst: Latest start times for unscheduled tasks
resource_avail: Current resource availability by time
Returns:
True if task can be scheduled at start_time, False otherwise
"""
duration = rcpsp_problem.mode_details[task][modes_dict[task]]["duration"]
# Check 1: Time window
if not (est[task] <= start_time <= lst[task]):
return False
# Check 2: Resource availability
for t in range(start_time, start_time + duration):
if t >= len(resource_avail[list(resource_avail.keys())[0]]):
return False
for res in rcpsp_problem.resources_list:
demand = rcpsp_problem.mode_details[task][modes_dict[task]].get(res, 0)
if demand > 0 and resource_avail[res][t] < demand:
return False
# Check 3: Unblocking filter - won't make other tasks infeasible
# Temporarily schedule the task and recompute EST/LST
temp_scheduled = scheduled_tasks.copy()
temp_scheduled[task] = start_time
temp_unscheduled = unscheduled_tasks - {task}
if len(temp_unscheduled) == 0:
return True # No remaining tasks to check
new_est = compute_est_with_time_lags(
rcpsp_problem, temp_scheduled, temp_unscheduled, modes_dict
)
new_lst = compute_lst_with_time_lags(
rcpsp_problem,
temp_scheduled,
temp_unscheduled,
modes_dict,
rcpsp_problem.horizon,
)
# Check if all remaining tasks have feasible time windows
for other_task in temp_unscheduled:
other_duration = rcpsp_problem.mode_details[other_task][modes_dict[other_task]][
"duration"
]
if new_est[other_task] + other_duration > new_lst[other_task]:
return False # This task would become infeasible
return True
[docs]
def generate_schedule_from_permutation_iterative_sgs_unblocking(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> tuple[dict[Hashable, dict[str, int]], bool]:
"""
Iterative Serial Generation Scheme with Unblocking Filters for RCPSP/max.
Handles generalized precedence constraints (min/max time lags).
Uses EST/LST propagation and unblocking filters to determine schedulability.
Reference:
- Bartusch et al. (1988), Scheduling project networks with resource constraints
- Neumann et al. (2003), Project scheduling with time windows
Args:
solution: RCPSP solution with permutation and modes
rcpsp_problem: RCPSP problem instance
Returns:
Tuple of (schedule dict, feasibility bool)
"""
# Initialize
scheduled_tasks: dict[Hashable, int] = {}
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
# Resource availability tracking
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = list(
rcpsp_problem.resources[res][: rcpsp_problem.horizon + 1] # type: ignore
)
else:
resource_avail_in_time[res] = [
rcpsp_problem.resources[res] for _ in range(rcpsp_problem.horizon)
]
# Build permutation with dummy tasks
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
unscheduled_tasks = set(perm_extended)
unfeasible = False
# Iterative scheduling loop
iteration = 0
max_iterations = len(perm_extended) * 10 # Prevent infinite loops
while unscheduled_tasks and not unfeasible and iteration < max_iterations:
iteration += 1
# Compute EST/LST for all unscheduled tasks
est = compute_est_with_time_lags(
rcpsp_problem, scheduled_tasks, unscheduled_tasks, modes_dict
)
lst = compute_lst_with_time_lags(
rcpsp_problem,
scheduled_tasks,
unscheduled_tasks,
modes_dict,
rcpsp_problem.horizon,
)
# Check for infeasibility
for task in unscheduled_tasks:
duration = rcpsp_problem.mode_details[task][modes_dict[task]]["duration"]
if est[task] + duration > lst[task]:
unfeasible = True
break
if unfeasible:
break
# Try to schedule tasks following permutation order
scheduled_this_iteration = False
for task in perm_extended:
if task not in unscheduled_tasks:
continue
# Find earliest feasible start time for this task
duration = rcpsp_problem.mode_details[task][modes_dict[task]]["duration"]
for start_time_candidate in range(
est[task], min(lst[task] + 1, rcpsp_problem.horizon)
):
if is_task_schedulable(
task,
start_time_candidate,
rcpsp_problem,
scheduled_tasks,
unscheduled_tasks,
modes_dict,
est,
lst,
resource_avail_in_time,
):
# Schedule the task
scheduled_tasks[task] = start_time_candidate
unscheduled_tasks.remove(task)
# Update resource availability
for t in range(
start_time_candidate, start_time_candidate + duration
):
if t < len(
resource_avail_in_time[
list(resource_avail_in_time.keys())[0]
]
):
for res in rcpsp_problem.resources_list:
demand = rcpsp_problem.mode_details[task][
modes_dict[task]
].get(res, 0)
resource_avail_in_time[res][t] -= demand
scheduled_this_iteration = True
break # Move to next task in permutation
if scheduled_this_iteration:
break # Restart from beginning of permutation
if not scheduled_this_iteration and unscheduled_tasks:
# No task could be scheduled in this iteration
unfeasible = True
# Build schedule output
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for task, start_time in scheduled_tasks.items():
duration = rcpsp_problem.mode_details[task][modes_dict[task]]["duration"]
rcpsp_schedule[task] = {
"start_time": start_time,
"end_time": start_time + duration,
}
if unfeasible or unscheduled_tasks:
rcpsp_schedule_feasible = False
# Add dummy end task if missing
if rcpsp_problem.sink_task not in rcpsp_schedule:
rcpsp_schedule[rcpsp_problem.sink_task] = {
"start_time": 99999999,
"end_time": 99999999,
}
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
minimum_starting_time[act] = 0
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
for k in modes_dict:
if modes_dict[k] not in rcpsp_problem.mode_details[k]:
modes_dict[k] = 1
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
for id_successor in perm_extended:
respected = True
for pred in rcpsp_problem.successors:
if (
id_successor in rcpsp_problem.successors[pred]
and pred in perm_extended
):
respected = False
break
if respected:
act_id = id_successor
break
# for act_id in perm_extended:
current_min_time = minimum_starting_time[act_id]
valid = False
while not valid:
valid = True
for t in range(
current_min_time,
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"],
):
for res in rcpsp_problem.resources_list:
if (
rcpsp_problem.mode_details[act_id][modes_dict[act_id]].get(
res, 0
)
== 0
):
continue
if t < new_horizon:
if (
resource_avail_in_time[res][t]
< rcpsp_problem.mode_details[act_id][modes_dict[act_id]][
res
]
):
valid = False
else:
unfeasible_non_renewable_resources = True
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
if (
rcpsp_problem.mode_details[act_id][modes_dict[act_id]].get(
res, 0
)
== 0
):
continue
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]][res]
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= (
rcpsp_problem.mode_details[act_id][modes_dict[act_id]][
res
]
)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs_special_constraints(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
minimum_starting_time[act] = 0
if rcpsp_problem.do_special_constraints:
if act in rcpsp_problem.special_constraints.start_times_window:
minimum_starting_time[act] = (
rcpsp_problem.special_constraints.start_times_window[act][0] # type: ignore
if rcpsp_problem.special_constraints.start_times_window[act][0]
is not None
else 0
)
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
def ressource_consumption(
res: str, task: Hashable, duration: int, mode: int
) -> int:
dur = rcpsp_problem.mode_details[task][mode]["duration"]
if duration > dur:
return 0
return rcpsp_problem.mode_details[task][mode].get(res, 0)
for k in modes_dict:
if modes_dict[k] not in rcpsp_problem.mode_details[k]:
modes_dict[k] = 1
def look_for_task(perm: list[Hashable], ignore_sc: bool = False) -> list[Hashable]:
act_ids = []
for task_id in perm:
respected = True
# Check all kind of precedence constraints....
for pred in rcpsp_problem.predecessors.get(task_id, {}):
if pred in perm_extended:
respected = False
break
if not ignore_sc:
for (
pred
) in rcpsp_problem.special_constraints.dict_start_at_end_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
for pred in rcpsp_problem.special_constraints.dict_start_at_end_offset_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
# Check start_to_start_min_time_lag constraints
# Only block if offset >= 0 (precedence-like behavior)
for pred in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag_reverse.get(
task_id, {}
):
offset = rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag_reverse[
task_id
][pred]
if offset >= 0 and pred in perm_extended:
respected = False
break
task_to_start_too = set()
if respected:
task_to_start_too = (
rcpsp_problem.special_constraints.dict_start_together.get(
task_id, set()
)
)
if not ignore_sc:
if len(task_to_start_too) > 0:
if not all(
s not in perm_extended
for t in task_to_start_too
for s in rcpsp_problem.predecessors[t]
):
respected = False
if respected:
act_ids = [task_id] + list(task_to_start_too)
break
return act_ids
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
act_ids = look_for_task(
[
k
for k in rcpsp_problem.special_constraints.dict_start_at_end_reverse
if k in perm_extended
]
)
act_ids = []
if len(act_ids) == 0:
act_ids = look_for_task(perm_extended)
if len(act_ids) == 0:
act_ids = look_for_task(perm_extended, ignore_sc=True)
current_min_time = max([minimum_starting_time[act_id] for act_id in act_ids])
max_duration = max(
[
rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
for act_id in act_ids
]
)
valid = False
while not valid:
valid = True
for t in range(current_min_time, current_min_time + max_duration):
for res in rcpsp_problem.resources_list:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
if r == 0:
continue
if t < new_horizon:
if resource_avail_in_time[res][t] < r:
valid = False
break
else:
unfeasible_non_renewable_resources = True
if not valid:
break
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = current_min_time + max_duration
for t in range(current_min_time, current_min_time + max_duration):
for res in resource_avail_in_time:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
resource_avail_in_time[res][t] -= r
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= r
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
for act_id in act_ids:
activity_end_times[act_id] = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
for s in rcpsp_problem.special_constraints.dict_start_at_end.get(
act_id, {}
):
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
for s in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag.get(
act_id, {}
):
minimum_starting_time[s] = max(
minimum_starting_time[s],
current_min_time
+ rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag[
act_id
][s],
)
for s in rcpsp_problem.special_constraints.dict_start_at_end_offset.get(
act_id, {}
):
minimum_starting_time[s] = max(
minimum_starting_time[s],
activity_end_times[act_id]
+ rcpsp_problem.special_constraints.dict_start_at_end_offset[
act_id
][s],
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs_partial_schedule(
solution: RcpspSolution,
rcpsp_problem: RcpspProblem,
current_t: int,
completed_tasks: dict[Hashable, TaskDetails],
scheduled_tasks_start_times: dict[Hashable, int],
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
if act in list(scheduled_tasks_start_times.keys()):
minimum_starting_time[act] = scheduled_tasks_start_times[act]
else:
minimum_starting_time[act] = current_t
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
# Update current resource usage by the scheduled task (ongoing task, in practice)
for act_id in scheduled_tasks_start_times:
current_min_time = scheduled_tasks_start_times[act_id]
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[act_id][
modes_dict[act_id]
].get(res, 0)
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
perm_extended = [x for x in perm_extended if x not in list(completed_tasks)]
# fix modes in case specified mode not in mode details for the activites
for ac in modes_dict:
if modes_dict[ac] not in rcpsp_problem.mode_details[ac]:
modes_dict[ac] = 1
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
# get first activity in perm with precedences respected
for id_successor in perm_extended:
respected = True
for pred in rcpsp_problem.successors.keys():
if (
id_successor in rcpsp_problem.successors[pred]
and pred in perm_extended
):
respected = False
break
if respected:
act_id = id_successor
break
current_min_time = minimum_starting_time[act_id]
valid = False
while not valid:
valid = True
for t in range(
current_min_time,
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"],
):
for res in resource_avail_in_time:
if t < new_horizon:
if resource_avail_in_time[res][t] < rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0):
valid = False
else:
unfeasible_non_renewable_resources = True
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0)
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t + 1, new_horizon):
resource_avail_in_time[res][tt] -= (
rcpsp_problem.mode_details[act_id][
modes_dict[act_id]
].get(res, 0)
)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
for act_id in completed_tasks:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = completed_tasks[act_id].start
rcpsp_schedule[act_id]["end_time"] = completed_tasks[act_id].end
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs_partial_schedule_specialized_constraints(
solution: RcpspSolution,
rcpsp_problem: RcpspProblem,
current_t: int,
completed_tasks: dict[Hashable, TaskDetails],
scheduled_tasks_start_times: dict[Hashable, int],
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
def ressource_consumption(
res: str, task: Hashable, duration: int, mode: int
) -> int:
dur = rcpsp_problem.mode_details[task][mode]["duration"]
if duration > dur:
return 0
return rcpsp_problem.mode_details[task][mode].get(res, 0)
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
if act in list(scheduled_tasks_start_times.keys()):
minimum_starting_time[act] = scheduled_tasks_start_times[act]
else:
minimum_starting_time[act] = current_t
if rcpsp_problem.do_special_constraints:
if act in rcpsp_problem.special_constraints.start_times_window:
minimum_starting_time[act] = (
max( # type: ignore
rcpsp_problem.special_constraints.start_times_window[act][0],
minimum_starting_time[act],
)
if rcpsp_problem.special_constraints.start_times_window[act][0]
is not None
else minimum_starting_time[act]
)
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
# Update current resource usage by the scheduled task (ongoing task, in practice)
for act_id in scheduled_tasks_start_times:
current_min_time = scheduled_tasks_start_times[act_id]
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[act_id][
modes_dict[act_id]
].get(res, 0)
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
perm_extended = [x for x in perm_extended if x not in list(completed_tasks)]
for ac in modes_dict:
if modes_dict[ac] not in rcpsp_problem.mode_details[ac]:
modes_dict[ac] = 1
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
act_ids = []
for task_id in perm_extended:
respected = True
# Check all kind of precedence constraints....
for pred in rcpsp_problem.predecessors.get(task_id, {}):
if pred in perm_extended:
respected = False
break
for pred in rcpsp_problem.special_constraints.dict_start_at_end_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
for (
pred
) in rcpsp_problem.special_constraints.dict_start_at_end_offset_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
# Check start_to_start_min_time_lag constraints
for pred in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag_reverse.get(
task_id, {}
):
offset = rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag_reverse[
task_id
][pred]
if offset >= 0 and pred in perm_extended:
respected = False
break
task_to_start_too: list[Hashable] = []
if respected:
task_to_start_too = [
k
for k in rcpsp_problem.special_constraints.dict_start_together.get(
task_id, set()
)
if k in perm_extended
]
if len(task_to_start_too) > 0:
if not all(
s not in perm_extended
for t in task_to_start_too
for s in rcpsp_problem.predecessors[t]
):
respected = False
if respected:
act_ids = [task_id] + task_to_start_too
break
if len(act_ids) == 0:
for task_id in perm_extended:
respected = True
# Check all kind of precedence constraints....
for pred in rcpsp_problem.predecessors.get(task_id, {}):
if pred in perm_extended:
respected = False
break
if respected:
act_ids = [task_id]
current_min_time = max([minimum_starting_time[act_id] for act_id in act_ids])
max_duration = max(
[
rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
for act_id in act_ids
]
)
valid = False
while not valid:
valid = True
for t in range(current_min_time, current_min_time + max_duration):
for res in rcpsp_problem.resources_list:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
if r == 0:
continue
if t < new_horizon:
if resource_avail_in_time[res][t] < r:
valid = False
break
else:
unfeasible_non_renewable_resources = True
if not valid:
break
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = current_min_time + max_duration
for t in range(current_min_time, current_min_time + max_duration):
for res in resource_avail_in_time:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
resource_avail_in_time[res][t] -= r
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= r
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
for act_id in act_ids:
activity_end_times[act_id] = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
for s in rcpsp_problem.special_constraints.dict_start_at_end.get(
act_id, {}
):
minimum_starting_time[s] = activity_end_times[act_id]
for s in rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag.get(
act_id, {}
):
minimum_starting_time[s] = (
current_min_time
+ rcpsp_problem.special_constraints.dict_start_to_start_min_time_lag[
act_id
][s]
)
for s in rcpsp_problem.special_constraints.dict_start_at_end_offset.get(
act_id, {}
):
minimum_starting_time[s] = (
activity_end_times[act_id]
+ rcpsp_problem.special_constraints.dict_start_at_end_offset[
act_id
][s]
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
for act_id in completed_tasks:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = completed_tasks[act_id].start
rcpsp_schedule[act_id]["end_time"] = completed_tasks[act_id].end
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def compute_mean_resource_reserve(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> float:
if not solution.rcpsp_schedule_feasible:
return 0.0
last_activity = rcpsp_problem.sink_task
makespan = solution.rcpsp_schedule[last_activity]["end_time"]
resource_avail_in_time = {}
modes = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][: makespan + 1] # type: ignore
else:
resource_avail_in_time[res] = np.full(
makespan, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
for act_id in rcpsp_problem.tasks_list:
start_time = solution.rcpsp_schedule[act_id]["start_time"]
end_time = solution.rcpsp_schedule[act_id]["end_time"]
mode = modes[act_id]
for t in range(start_time, end_time):
for res in resource_avail_in_time:
if rcpsp_problem.mode_details[act_id][mode].get(res, 0) == 0:
continue
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[act_id][
mode
][res]
if res in rcpsp_problem.non_renewable_resources and t == end_time:
for tt in range(end_time, makespan):
resource_avail_in_time[res][tt] -= rcpsp_problem.mode_details[
act_id
][mode][res]
mean_avail = {}
for res in resource_avail_in_time:
mean_avail[res] = np.mean(resource_avail_in_time[res])
mean_resource_reserve = np.mean(
[
mean_avail[res] / rcpsp_problem.get_max_resource_capacity(res)
for res in rcpsp_problem.resources_list
]
)
return float(mean_resource_reserve)
[docs]
class PartialSolution:
def __init__(
self,
task_mode: Optional[dict[int, int]] = None,
start_times: Optional[dict[int, int]] = None,
end_times: Optional[dict[int, int]] = None,
partial_permutation: Optional[list[int]] = None,
list_partial_order: Optional[list[list[int]]] = None,
start_together: Optional[list[tuple[int, int]]] = None,
start_at_end: Optional[list[tuple[int, int]]] = None,
start_at_end_plus_offset: Optional[list[tuple[int, int, int]]] = None,
start_to_start_min_time_lag: Optional[list[tuple[int, int, int]]] = None,
start_to_start_max_time_lag: Optional[list[tuple[int, int, int]]] = None,
disjunctive_tasks: Optional[list[tuple[int, int]]] = None,
start_times_window: Optional[dict[Hashable, tuple[int, int]]] = None,
end_times_window: Optional[dict[Hashable, tuple[int, int]]] = None,
):
self.task_mode = task_mode
self.start_times = start_times
self.end_times = end_times
self.partial_permutation = partial_permutation
self.list_partial_order = list_partial_order
self.start_together = start_together
self.start_at_end = start_at_end
self.start_at_end_plus_offset = start_at_end_plus_offset
self.start_to_start_min_time_lag = start_to_start_min_time_lag
self.start_to_start_max_time_lag = start_to_start_max_time_lag
self.disjunctive_tasks = disjunctive_tasks
self.start_times_window = start_times_window
self.end_times_window = end_times_window
# one element in self.list_partial_order is a list [l1, l2, l3]
# indicating that l1 should be started before l1, and l2 before l3 for example