# Copyright (c) 2023 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import (
annotations, # make annotations be considered as string by default
)
from collections.abc import Hashable, Iterable
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Optional
import numpy as np
from numpy import typing as npt
from discrete_optimization.generic_tasks_tools.multimode import MultimodeSolution
from discrete_optimization.generic_tasks_tools.scheduling import SchedulingSolution
from discrete_optimization.generic_tools.do_problem import RobustProblem
if TYPE_CHECKING: # avoid circular imports due to annotations
from discrete_optimization.rcpsp.problem import RcpspProblem
Task = Hashable
[docs]
class TaskDetails:
def __init__(self, start: int, end: int):
self.start = start
self.end = end
[docs]
class RcpspSolution(SchedulingSolution[Task], MultimodeSolution[Task]):
"""Solution to RcpspProblem problems.
Attributes:
problem: RCPSP problem for which this is a solution
rcpsp_permutation: Tasks permutation, i.e. list of tasks indices among non-dummy tasks (i.e. excluding source and sink tasks).
See below how it is related to the schedule.
If given and schedule not given, used to reconstruct the schedule.
if not given, deduced from schedule.
If not given and schedule not given, it is set to `problem.fixed_permutation` (useful for alternating genetic algorithms)
rcpsp_schedule: Tasks schedule. ( task -> "start_time" or "end_time" -> time)
If given used to construct `standardised_permutation`.
If given and `rcpsp_permutation` not given, used to construct `rcpsp_permutation`.
If given and `rcpsp_permutation` given, no consistency check.
If not given, deduced from `rcpsp_permutation` if possible.
If not possible, `rcpsp_schedule_feasible` set to False and
`rcpsp_schedule` set to a partially filled schedule.
In case of `Aggreg_RcpspModel`, kept empty.
rcpsp_modes: Mode used for each task. Same order as `problem.tasks_list_non_dummy`.
If not given we use `problem.fixed_modes` if existing, else 1 for each task.
rcpsp_schedule_feasible: True if a schedule can be deduced from permutation.
False if it leads to incoherency preventing a schedule generation.
Recomputed when schedule is (re)computed from permutation.
standardised_permutation: Permutation deduced uniquely from schedule. If given, not recomputed.
Can be different from `rcpsp_permutation` as different permutations can lead to same schedule.
fast: boolean indicating if we use the fast functions to generate schedule from permutation.
## More about conversion permutation <-> schedule:
### Schedule -> permutation
Take the list of tasks indices among non-dummy tasks (i.e. excluding source and sink tasks),
sorted by their starting times.
### Permutation -> schedule: serial-SGS (Schedule Generation Scheme)
- Initialization:
- the source task is set at time 0
- the resource availability is set to the initial calendar given by the problem
- Loop: the schedule is filled task by task, in the order given by the permutation, and taking into account
- the precedence constraints: a task starts after the end of the tasks before it in the precedence graph
- the resource constraints: during the task, the needed resource must be available
When a new task has been inserted, the availability of each resource is updated accordingly, i.e.
we remove the necessary resources for the task (according to the chosen mode set by `rcpsp_modes`) for the task duration,
and after if the resource is non-renewable.
- When hitting an impossibility (e.g. inserting a task after one of its successor, or when the resources are not enough available until the problem horizon)
the permutation is set infeasible (so `rcpsp_schedule_feasible` is set to `False`).
See "Compute a dummy solution for RCPSP" in the tutorial "notebooks/RCPSP tutorials/RCPSP-1 Introduction.ipynb" for more details.
Example:
This example demonstrates how a permutation of non-dummy tasks is
transformed into a schedule, adhering to precedence and resource constraints.
>>> from discrete_optimization.rcpsp.problem import RcpspProblem
>>> from discrete_optimization.rcpsp.solution import RcpspSolution
>>> # Define resources
>>> resources = {"R1": 5, "R2": 2}
>>> non_renewable_resources = [] # Empty if all resources replenish when a task ends
>>> # Define mode details
>>> # Format: { task_id: { mode_id: { "duration": int, "resource_name": amount } } }
>>> mode_details = {
... "source": {1: {"duration": 0, "R1": 0, "R2": 0}}, # Source (Dummy)
... "task-1": {1: {"duration": 4, "R1": 2, "R2": 2}},
... "task-2": {1: {"duration": 3, "R1": 3, "R2": 0}},
... "task-3": {1: {"duration": 5, "R1": 2, "R2": 1}},
... "task-4": {1: {"duration": 2, "R1": 1, "R2": 1}},
... "sink": {1: {"duration": 0, "R1": 0, "R2": 0}}, # Sink (Dummy)
... }
>>> # Define precedence graph
>>> successors = {
... "source": ["task-1", "task-2"], # First tasks: 1 and 2
... "task-1": ["task-3"], # Task 1 must finish before 3
... "task-2": ["task-4"], # Task 2 must finish before 4
... "task-3": ["sink"], # Task 3 leads to Sink
... "task-4": ["sink"], # Task 4 leads to Sink
... "sink": [], # Sink has no successors
... }
>>> # Initialize the Problem
>>> problem = RcpspProblem(
... resources=resources,
... non_renewable_resources=non_renewable_resources,
... mode_details=mode_details,
... successors=successors,
... horizon=20, # Maximum time allowed for the project
... source_task="source",
... sink_task="sink"
... )
>>> # Create solution
>>> # Convert list of task ids into a licit permutation (using indices among non-dummy tasks)
>>> rcpsp_permutation = problem.convert_task_ids_to_permutation(["task-2", "task-1", "task-4", "task-3"])
>>> print(rcpsp_permutation)
[1, 0, 3, 2]
>>> solution = RcpspSolution(
... problem=problem,
... rcpsp_permutation=rcpsp_permutation,
... rcpsp_modes=[1, 1, 1, 1] # same mode for each task
... )
>>> # The Serial SGS calculates the start/end times according:
>>> print(solution.rcpsp_schedule["task-2"]) # Task 2
{'start_time': 0, 'end_time': 3}
>>> print(solution.rcpsp_schedule["task-1"]) # Task 1 follows Task 2 in permutation
{'start_time': 0, 'end_time': 4}
>>> print(solution.rcpsp_schedule["task-4"]) # Task 4 can start after Task 2, but only when ressources are available (thus after task 1)
{'start_time': 4, 'end_time': 6}
>>> print(solution.rcpsp_schedule["task-3"]) # Task 3 can start after Task 1 (and ressources are available)
{'start_time': 4, 'end_time': 9}
"""
problem: RcpspProblem
def __init__(
self,
problem: RcpspProblem,
rcpsp_permutation: Optional[list[int]] = None,
rcpsp_schedule: Optional[dict[Hashable, dict[str, int]]] = None,
rcpsp_modes: Optional[list[int]] = None,
rcpsp_schedule_feasible: bool = True,
standardised_permutation: Optional[list[int]] = None,
fast: bool = True,
):
"""
Args:
problem:
rcpsp_permutation:
rcpsp_schedule:
rcpsp_modes:
rcpsp_schedule_feasible:
standardised_permutation:
fast:
"""
super().__init__(problem=problem)
self.rcpsp_schedule_feasible = rcpsp_schedule_feasible
self.fast = fast
self.rcpsp_modes: list[int]
self.rcpsp_schedule: dict[Hashable, dict[str, int]]
self.rcpsp_permutation: list[int]
self.standardised_permutation: list[int]
# init rcpsp_modes
if rcpsp_modes is None:
if self.problem.fixed_modes is not None:
self.rcpsp_modes = self.problem.fixed_modes
else:
self.rcpsp_modes = [1 for i in range(self.problem.n_jobs_non_dummy)]
else:
self.rcpsp_modes = rcpsp_modes
# init rcpsp_permutation
if rcpsp_permutation is None:
if rcpsp_schedule is None:
if self.problem.fixed_permutation is not None:
self.rcpsp_permutation = self.problem.fixed_permutation
else:
raise ValueError(
"rcpsp_permutation and rcpsp_schedule can be None together "
"only if problem.fixed_permutation is not None"
)
else:
self.rcpsp_schedule = rcpsp_schedule
standardised_permutation = self.generate_permutation_from_schedule()
self.rcpsp_permutation = deepcopy(standardised_permutation)
else:
self.rcpsp_permutation = rcpsp_permutation
# init rcpsp_schedule
if rcpsp_schedule is None:
self.generate_schedule_from_permutation_serial_sgs(do_fast=self.fast)
else:
self.rcpsp_schedule = rcpsp_schedule
# init standardised_permutation
if standardised_permutation is None:
self.standardised_permutation = self.generate_permutation_from_schedule()
else:
self.standardised_permutation = standardised_permutation
# schedule already computed (prevent issues with __setattr__ hack)
self._schedule_to_recompute = False
[docs]
def change_problem(self, new_problem: RcpspProblem) -> None: # type: ignore
super().change_problem(new_problem=new_problem)
# recompute schedule and standardised permutation with respect to the new problem
self.generate_schedule_from_permutation_serial_sgs(do_fast=self.fast)
self.standardised_permutation = self.generate_permutation_from_schedule()
def __setattr__(self, key: str, value: Any) -> None:
super.__setattr__(self, key, value)
if key == "rcpsp_permutation":
self._schedule_to_recompute = True
[docs]
def copy(self) -> "RcpspSolution":
return RcpspSolution(
problem=self.problem,
rcpsp_permutation=deepcopy(self.rcpsp_permutation),
rcpsp_modes=deepcopy(self.rcpsp_modes),
rcpsp_schedule=deepcopy(self.rcpsp_schedule),
rcpsp_schedule_feasible=self.rcpsp_schedule_feasible,
standardised_permutation=self.standardised_permutation,
fast=self.fast,
)
[docs]
def lazy_copy(self) -> "RcpspSolution":
return RcpspSolution(
problem=self.problem,
rcpsp_permutation=self.rcpsp_permutation,
rcpsp_modes=self.rcpsp_modes,
rcpsp_schedule=self.rcpsp_schedule,
rcpsp_schedule_feasible=self.rcpsp_schedule_feasible,
standardised_permutation=self.standardised_permutation,
fast=self.fast,
)
def __str__(self) -> str:
if self.rcpsp_schedule is None:
sched_str = "None"
else:
sched_str = str(self.rcpsp_schedule)
val = "RCPSP solution (rcpsp_schedule): " + sched_str
return val
[docs]
def generate_permutation_from_schedule(self) -> list[int]:
sorted_task = [
self.problem.index_task_non_dummy[i]
for i in sorted(
self.rcpsp_schedule, key=lambda x: self.rcpsp_schedule[x]["start_time"]
)
if i in self.problem.index_task_non_dummy
]
return sorted_task
[docs]
def compute_mean_resource_reserve(self, fast: bool = True) -> float:
if not fast:
return compute_mean_resource_reserve(
solution=self, rcpsp_problem=self.problem
)
else:
if not self.rcpsp_schedule_feasible:
return 0.0
last_activity = self.problem.sink_task
makespan = self.rcpsp_schedule[last_activity]["end_time"]
if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
# non existing modes
return 0.0
else:
return self.problem.compute_mean_resource(
horizon=makespan,
modes_array=np.array(
self.problem.build_mode_array(self.rcpsp_modes)
)
- 1, # permutation_task=array(task)->task index
start_array=np.array(
[
self.rcpsp_schedule[t]["start_time"]
for t in self.problem.tasks_list
]
),
end_array=np.array(
[
self.rcpsp_schedule[t]["end_time"]
for t in self.problem.tasks_list
]
),
)
[docs]
def generate_schedule_from_permutation_serial_sgs(
self, do_fast: bool = True
) -> None:
self._schedule_to_recompute = False
if isinstance(self.problem, RobustProblem):
self.rcpsp_schedule = {}
self.rcpsp_schedule_feasible = True
else:
if do_fast:
schedule: dict[int, tuple[int, int]]
if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
# non existing modes
schedule, unfeasible = {}, True
else:
schedule, unfeasible = self.problem.func_sgs(
permutation_task=permutation_do_to_permutation_sgs_fast(
self.problem, self.rcpsp_permutation
),
modes_array=np.array(
self.problem.build_mode_array(self.rcpsp_modes)
)
- 1,
)
self.rcpsp_schedule_feasible = not unfeasible
self.rcpsp_schedule = {}
for k in schedule:
self.rcpsp_schedule[self.problem.tasks_list[k]] = {
"start_time": schedule[k][0],
"end_time": schedule[k][1],
}
if self.problem.sink_task not in self.rcpsp_schedule:
self.rcpsp_schedule[self.problem.sink_task] = {
"start_time": 99999999,
"end_time": 99999999,
}
else:
(
self.rcpsp_schedule,
self.rcpsp_schedule_feasible,
) = generate_schedule_from_permutation_serial_sgs(
solution=self, rcpsp_problem=self.problem
)
[docs]
def generate_schedule_from_permutation_serial_sgs_2(
self,
current_t: int = 0,
completed_tasks: Optional[dict[Hashable, TaskDetails]] = None,
scheduled_tasks_start_times: Optional[dict[Hashable, int]] = None,
do_fast: bool = True,
) -> None:
if completed_tasks is None:
completed_tasks = {}
if scheduled_tasks_start_times is None:
scheduled_tasks_start_times = {}
if do_fast and not self.problem.do_special_constraints:
schedule: dict[int, tuple[int, int]]
if max(self.rcpsp_modes) > self.problem.max_number_of_mode:
# non existing modes
schedule, unfeasible = {}, True
else:
schedule, unfeasible = self.problem.func_sgs_2(
current_time=current_t,
completed_task_indicator=np.array(
[
1 if self.problem.tasks_list[i] in completed_tasks else 0
for i in range(self.problem.n_jobs)
]
),
completed_task_times=np.array(
[
completed_tasks[self.problem.tasks_list[i]].end
if self.problem.tasks_list[i] in completed_tasks
else 0
for i in range(self.problem.n_jobs)
]
),
scheduled_task=np.array(
[
scheduled_tasks_start_times[self.problem.tasks_list[i]]
if self.problem.tasks_list[i] in scheduled_tasks_start_times
else -1
for i in range(self.problem.n_jobs)
]
),
permutation_task=permutation_do_to_permutation_sgs_fast(
self.problem, self.rcpsp_permutation
),
modes_array=np.array(
self.problem.build_mode_array(self.rcpsp_modes)
)
- 1,
)
self.rcpsp_schedule = {}
for k in schedule:
self.rcpsp_schedule[self.problem.tasks_list[k]] = {
"start_time": schedule[k][0],
"end_time": schedule[k][1],
}
if self.problem.sink_task not in self.rcpsp_schedule:
self.rcpsp_schedule[self.problem.sink_task] = {
"start_time": 999999999,
"end_time": 999999999,
}
self.rcpsp_schedule_feasible = not unfeasible
self._schedule_to_recompute = False
else:
if self.problem.do_special_constraints:
(
self.rcpsp_schedule,
self.rcpsp_schedule_feasible,
) = generate_schedule_from_permutation_serial_sgs_partial_schedule_specialized_constraints(
solution=self,
current_t=current_t,
completed_tasks=completed_tasks,
scheduled_tasks_start_times=scheduled_tasks_start_times,
rcpsp_problem=self.problem,
)
else:
(
self.rcpsp_schedule,
self.rcpsp_schedule_feasible,
) = generate_schedule_from_permutation_serial_sgs_partial_schedule(
solution=self,
current_t=current_t,
completed_tasks=completed_tasks,
scheduled_tasks_start_times=scheduled_tasks_start_times,
rcpsp_problem=self.problem,
)
self._schedule_to_recompute = False
[docs]
def get_max_end_time(self) -> int:
return self.rcpsp_schedule[self.problem.sink_task]["end_time"]
[docs]
def get_start_time(self, task: Hashable) -> int:
return self.rcpsp_schedule[task]["start_time"]
[docs]
def get_end_time(self, task: Hashable) -> int:
return self.rcpsp_schedule[task]["end_time"]
[docs]
def get_start_times_list(self, task: Hashable) -> list[int]:
return [self.get_start_time(task)]
[docs]
def get_end_times_list(self, task: Hashable) -> list[int]:
return [self.get_end_time(task)]
[docs]
def get_active_time(self, task: Hashable) -> list[int]:
return list(range(self.get_start_time(task), self.get_end_time(task)))
[docs]
def get_mode(self, task: Hashable) -> int:
if task in (self.problem.source_task, self.problem.sink_task):
# should have only 1 mode (no particular meaning)
return next(iter(self.problem.mode_details[task]))
return self.rcpsp_modes[self.problem.index_task_non_dummy[task]]
def __hash__(self) -> int:
return hash((tuple(self.rcpsp_permutation), tuple(self.rcpsp_modes)))
def __eq__(self, other: object) -> bool:
return (
isinstance(other, RcpspSolution)
and self.rcpsp_permutation == other.rcpsp_permutation
and self.rcpsp_modes == other.rcpsp_modes
)
[docs]
def permutation_do_to_permutation_sgs_fast(
rcpsp_problem: RcpspProblem, permutation_do: Iterable[int]
) -> npt.NDArray[np.int_]:
perm_extended = [
rcpsp_problem.index_task[rcpsp_problem.tasks_list_non_dummy[x]]
for x in permutation_do
]
perm_extended.insert(0, rcpsp_problem.index_task[rcpsp_problem.source_task])
perm_extended.append(rcpsp_problem.index_task[rcpsp_problem.sink_task])
return np.array(perm_extended, dtype=np.int_)
[docs]
def generate_schedule_from_permutation_serial_sgs(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
minimum_starting_time[act] = 0
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
for k in modes_dict:
if modes_dict[k] not in rcpsp_problem.mode_details[k]:
modes_dict[k] = 1
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
for id_successor in perm_extended:
respected = True
for pred in rcpsp_problem.successors:
if (
id_successor in rcpsp_problem.successors[pred]
and pred in perm_extended
):
respected = False
break
if respected:
act_id = id_successor
break
# for act_id in perm_extended:
current_min_time = minimum_starting_time[act_id]
valid = False
while not valid:
valid = True
for t in range(
current_min_time,
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"],
):
for res in rcpsp_problem.resources_list:
if (
rcpsp_problem.mode_details[act_id][modes_dict[act_id]].get(
res, 0
)
== 0
):
continue
if t < new_horizon:
if (
resource_avail_in_time[res][t]
< rcpsp_problem.mode_details[act_id][modes_dict[act_id]][
res
]
):
valid = False
else:
unfeasible_non_renewable_resources = True
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
if (
rcpsp_problem.mode_details[act_id][modes_dict[act_id]].get(
res, 0
)
== 0
):
continue
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]][res]
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= (
rcpsp_problem.mode_details[act_id][modes_dict[act_id]][
res
]
)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs_special_constraints(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
minimum_starting_time[act] = 0
if rcpsp_problem.do_special_constraints:
if act in rcpsp_problem.special_constraints.start_times_window:
minimum_starting_time[act] = (
rcpsp_problem.special_constraints.start_times_window[act][0] # type: ignore
if rcpsp_problem.special_constraints.start_times_window[act][0]
is not None
else 0
)
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
def ressource_consumption(
res: str, task: Hashable, duration: int, mode: int
) -> int:
dur = rcpsp_problem.mode_details[task][mode]["duration"]
if duration > dur:
return 0
return rcpsp_problem.mode_details[task][mode].get(res, 0)
for k in modes_dict:
if modes_dict[k] not in rcpsp_problem.mode_details[k]:
modes_dict[k] = 1
def look_for_task(perm: list[Hashable], ignore_sc: bool = False) -> list[Hashable]:
act_ids = []
for task_id in perm:
respected = True
# Check all kind of precedence constraints....
for pred in rcpsp_problem.predecessors.get(task_id, {}):
if pred in perm_extended:
respected = False
break
if not ignore_sc:
for (
pred
) in rcpsp_problem.special_constraints.dict_start_at_end_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
for pred in rcpsp_problem.special_constraints.dict_start_at_end_offset_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
for pred in rcpsp_problem.special_constraints.dict_start_after_nunit_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
task_to_start_too = set()
if respected:
task_to_start_too = (
rcpsp_problem.special_constraints.dict_start_together.get(
task_id, set()
)
)
if not ignore_sc:
if len(task_to_start_too) > 0:
if not all(
s not in perm_extended
for t in task_to_start_too
for s in rcpsp_problem.predecessors[t]
):
respected = False
if respected:
act_ids = [task_id] + list(task_to_start_too)
break
return act_ids
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
act_ids = look_for_task(
[
k
for k in rcpsp_problem.special_constraints.dict_start_at_end_reverse
if k in perm_extended
]
)
act_ids = []
if len(act_ids) == 0:
act_ids = look_for_task(perm_extended)
if len(act_ids) == 0:
act_ids = look_for_task(perm_extended, ignore_sc=True)
current_min_time = max([minimum_starting_time[act_id] for act_id in act_ids])
max_duration = max(
[
rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
for act_id in act_ids
]
)
valid = False
while not valid:
valid = True
for t in range(current_min_time, current_min_time + max_duration):
for res in rcpsp_problem.resources_list:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
if r == 0:
continue
if t < new_horizon:
if resource_avail_in_time[res][t] < r:
valid = False
break
else:
unfeasible_non_renewable_resources = True
if not valid:
break
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = current_min_time + max_duration
for t in range(current_min_time, current_min_time + max_duration):
for res in resource_avail_in_time:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
resource_avail_in_time[res][t] -= r
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= r
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
for act_id in act_ids:
activity_end_times[act_id] = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
for s in rcpsp_problem.special_constraints.dict_start_at_end.get(
act_id, {}
):
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
for s in rcpsp_problem.special_constraints.dict_start_after_nunit.get(
act_id, {}
):
minimum_starting_time[s] = max(
minimum_starting_time[s],
current_min_time
+ rcpsp_problem.special_constraints.dict_start_after_nunit[
act_id
][s],
)
for s in rcpsp_problem.special_constraints.dict_start_at_end_offset.get(
act_id, {}
):
minimum_starting_time[s] = max(
minimum_starting_time[s],
activity_end_times[act_id]
+ rcpsp_problem.special_constraints.dict_start_at_end_offset[
act_id
][s],
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs_partial_schedule(
solution: RcpspSolution,
rcpsp_problem: RcpspProblem,
current_t: int,
completed_tasks: dict[Hashable, TaskDetails],
scheduled_tasks_start_times: dict[Hashable, int],
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
if act in list(scheduled_tasks_start_times.keys()):
minimum_starting_time[act] = scheduled_tasks_start_times[act]
else:
minimum_starting_time[act] = current_t
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
# Update current resource usage by the scheduled task (ongoing task, in practice)
for act_id in scheduled_tasks_start_times:
current_min_time = scheduled_tasks_start_times[act_id]
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[act_id][
modes_dict[act_id]
].get(res, 0)
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
perm_extended = [x for x in perm_extended if x not in list(completed_tasks)]
# fix modes in case specified mode not in mode details for the activites
for ac in modes_dict:
if modes_dict[ac] not in rcpsp_problem.mode_details[ac]:
modes_dict[ac] = 1
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
# get first activity in perm with precedences respected
for id_successor in perm_extended:
respected = True
for pred in rcpsp_problem.successors.keys():
if (
id_successor in rcpsp_problem.successors[pred]
and pred in perm_extended
):
respected = False
break
if respected:
act_id = id_successor
break
current_min_time = minimum_starting_time[act_id]
valid = False
while not valid:
valid = True
for t in range(
current_min_time,
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"],
):
for res in resource_avail_in_time:
if t < new_horizon:
if resource_avail_in_time[res][t] < rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0):
valid = False
else:
unfeasible_non_renewable_resources = True
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0)
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t + 1, new_horizon):
resource_avail_in_time[res][tt] -= (
rcpsp_problem.mode_details[act_id][
modes_dict[act_id]
].get(res, 0)
)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
for act_id in completed_tasks:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = completed_tasks[act_id].start
rcpsp_schedule[act_id]["end_time"] = completed_tasks[act_id].end
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def generate_schedule_from_permutation_serial_sgs_partial_schedule_specialized_constraints(
solution: RcpspSolution,
rcpsp_problem: RcpspProblem,
current_t: int,
completed_tasks: dict[Hashable, TaskDetails],
scheduled_tasks_start_times: dict[Hashable, int],
) -> tuple[dict[Hashable, dict[str, int]], bool]:
activity_end_times = {}
unfeasible_non_renewable_resources = False
new_horizon = rcpsp_problem.horizon
resource_avail_in_time = {}
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][ # type: ignore
: new_horizon + 1
]
else:
resource_avail_in_time[res] = np.full(
new_horizon, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
def ressource_consumption(
res: str, task: Hashable, duration: int, mode: int
) -> int:
dur = rcpsp_problem.mode_details[task][mode]["duration"]
if duration > dur:
return 0
return rcpsp_problem.mode_details[task][mode].get(res, 0)
minimum_starting_time = {}
for act in rcpsp_problem.tasks_list:
if act in list(scheduled_tasks_start_times.keys()):
minimum_starting_time[act] = scheduled_tasks_start_times[act]
else:
minimum_starting_time[act] = current_t
if rcpsp_problem.do_special_constraints:
if act in rcpsp_problem.special_constraints.start_times_window:
minimum_starting_time[act] = (
max( # type: ignore
rcpsp_problem.special_constraints.start_times_window[act][0],
minimum_starting_time[act],
)
if rcpsp_problem.special_constraints.start_times_window[act][0]
is not None
else minimum_starting_time[act]
)
perm_extended = [
rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
]
perm_extended.insert(0, rcpsp_problem.source_task)
perm_extended.append(rcpsp_problem.sink_task)
modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
# Update current resource usage by the scheduled task (ongoing task, in practice)
for act_id in scheduled_tasks_start_times:
current_min_time = scheduled_tasks_start_times[act_id]
end_t = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
for t in range(current_min_time, end_t):
for res in resource_avail_in_time:
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[act_id][
modes_dict[act_id]
].get(res, 0)
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= rcpsp_problem.mode_details[
act_id
][modes_dict[act_id]].get(res, 0)
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
activity_end_times[act_id] = end_t
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
perm_extended = [x for x in perm_extended if x not in list(completed_tasks)]
for ac in modes_dict:
if modes_dict[ac] not in rcpsp_problem.mode_details[ac]:
modes_dict[ac] = 1
while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
act_ids = []
for task_id in perm_extended:
respected = True
# Check all kind of precedence constraints....
for pred in rcpsp_problem.predecessors.get(task_id, {}):
if pred in perm_extended:
respected = False
break
for pred in rcpsp_problem.special_constraints.dict_start_at_end_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
for (
pred
) in rcpsp_problem.special_constraints.dict_start_at_end_offset_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
for (
pred
) in rcpsp_problem.special_constraints.dict_start_after_nunit_reverse.get(
task_id, {}
):
if pred in perm_extended:
respected = False
break
task_to_start_too: list[Hashable] = []
if respected:
task_to_start_too = [
k
for k in rcpsp_problem.special_constraints.dict_start_together.get(
task_id, set()
)
if k in perm_extended
]
if len(task_to_start_too) > 0:
if not all(
s not in perm_extended
for t in task_to_start_too
for s in rcpsp_problem.predecessors[t]
):
respected = False
if respected:
act_ids = [task_id] + task_to_start_too
break
if len(act_ids) == 0:
for task_id in perm_extended:
respected = True
# Check all kind of precedence constraints....
for pred in rcpsp_problem.predecessors.get(task_id, {}):
if pred in perm_extended:
respected = False
break
if respected:
act_ids = [task_id]
current_min_time = max([minimum_starting_time[act_id] for act_id in act_ids])
max_duration = max(
[
rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
for act_id in act_ids
]
)
valid = False
while not valid:
valid = True
for t in range(current_min_time, current_min_time + max_duration):
for res in rcpsp_problem.resources_list:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
if r == 0:
continue
if t < new_horizon:
if resource_avail_in_time[res][t] < r:
valid = False
break
else:
unfeasible_non_renewable_resources = True
if not valid:
break
if not valid:
current_min_time += 1
if not unfeasible_non_renewable_resources:
end_t = current_min_time + max_duration
for t in range(current_min_time, current_min_time + max_duration):
for res in resource_avail_in_time:
r = sum(
[
ressource_consumption(
res=res,
task=task,
duration=t - current_min_time,
mode=modes_dict[task],
)
for task in act_ids
]
)
resource_avail_in_time[res][t] -= r
if res in rcpsp_problem.non_renewable_resources and t == end_t - 1:
for tt in range(end_t, new_horizon):
resource_avail_in_time[res][tt] -= r
if resource_avail_in_time[res][tt] < 0:
unfeasible_non_renewable_resources = True
for act_id in act_ids:
activity_end_times[act_id] = (
current_min_time
+ rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
perm_extended.remove(act_id)
for s in rcpsp_problem.successors[act_id]:
minimum_starting_time[s] = max(
minimum_starting_time[s], activity_end_times[act_id]
)
for s in rcpsp_problem.special_constraints.dict_start_at_end.get(
act_id, {}
):
minimum_starting_time[s] = activity_end_times[act_id]
for s in rcpsp_problem.special_constraints.dict_start_after_nunit.get(
act_id, {}
):
minimum_starting_time[s] = (
current_min_time
+ rcpsp_problem.special_constraints.dict_start_after_nunit[
act_id
][s]
)
for s in rcpsp_problem.special_constraints.dict_start_at_end_offset.get(
act_id, {}
):
minimum_starting_time[s] = (
activity_end_times[act_id]
+ rcpsp_problem.special_constraints.dict_start_at_end_offset[
act_id
][s]
)
rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
for act_id in activity_end_times:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = (
activity_end_times[act_id]
- rcpsp_problem.mode_details[act_id][modes_dict[act_id]]["duration"]
)
rcpsp_schedule[act_id]["end_time"] = activity_end_times[act_id]
for act_id in completed_tasks:
rcpsp_schedule[act_id] = {}
rcpsp_schedule[act_id]["start_time"] = completed_tasks[act_id].start
rcpsp_schedule[act_id]["end_time"] = completed_tasks[act_id].end
if unfeasible_non_renewable_resources:
rcpsp_schedule_feasible = False
last_act_id = rcpsp_problem.sink_task
if last_act_id not in rcpsp_schedule:
rcpsp_schedule[last_act_id] = {}
rcpsp_schedule[last_act_id]["start_time"] = 99999999
rcpsp_schedule[last_act_id]["end_time"] = 9999999
else:
rcpsp_schedule_feasible = True
return rcpsp_schedule, rcpsp_schedule_feasible
[docs]
def compute_mean_resource_reserve(
solution: RcpspSolution, rcpsp_problem: RcpspProblem
) -> float:
if not solution.rcpsp_schedule_feasible:
return 0.0
last_activity = rcpsp_problem.sink_task
makespan = solution.rcpsp_schedule[last_activity]["end_time"]
resource_avail_in_time = {}
modes = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
for res in rcpsp_problem.resources_list:
if rcpsp_problem.is_varying_resource():
resource_avail_in_time[res] = rcpsp_problem.resources[res][: makespan + 1] # type: ignore
else:
resource_avail_in_time[res] = np.full(
makespan, rcpsp_problem.resources[res], dtype=np.int_
).tolist()
for act_id in rcpsp_problem.tasks_list:
start_time = solution.rcpsp_schedule[act_id]["start_time"]
end_time = solution.rcpsp_schedule[act_id]["end_time"]
mode = modes[act_id]
for t in range(start_time, end_time):
for res in resource_avail_in_time:
if rcpsp_problem.mode_details[act_id][mode].get(res, 0) == 0:
continue
resource_avail_in_time[res][t] -= rcpsp_problem.mode_details[act_id][
mode
][res]
if res in rcpsp_problem.non_renewable_resources and t == end_time:
for tt in range(end_time, makespan):
resource_avail_in_time[res][tt] -= rcpsp_problem.mode_details[
act_id
][mode][res]
mean_avail = {}
for res in resource_avail_in_time:
mean_avail[res] = np.mean(resource_avail_in_time[res])
mean_resource_reserve = np.mean(
[
mean_avail[res] / rcpsp_problem.get_max_resource_capacity(res)
for res in rcpsp_problem.resources_list
]
)
return float(mean_resource_reserve)
[docs]
class PartialSolution:
def __init__(
self,
task_mode: Optional[dict[int, int]] = None,
start_times: Optional[dict[int, int]] = None,
end_times: Optional[dict[int, int]] = None,
partial_permutation: Optional[list[int]] = None,
list_partial_order: Optional[list[list[int]]] = None,
start_together: Optional[list[tuple[int, int]]] = None,
start_at_end: Optional[list[tuple[int, int]]] = None,
start_at_end_plus_offset: Optional[list[tuple[int, int, int]]] = None,
start_after_nunit: Optional[list[tuple[int, int, int]]] = None,
disjunctive_tasks: Optional[list[tuple[int, int]]] = None,
start_times_window: Optional[dict[Hashable, tuple[int, int]]] = None,
end_times_window: Optional[dict[Hashable, tuple[int, int]]] = None,
):
self.task_mode = task_mode
self.start_times = start_times
self.end_times = end_times
self.partial_permutation = partial_permutation
self.list_partial_order = list_partial_order
self.start_together = start_together
self.start_at_end = start_at_end
self.start_after_nunit = start_after_nunit
self.start_at_end_plus_offset = start_at_end_plus_offset
self.disjunctive_tasks = disjunctive_tasks
self.start_times_window = start_times_window
self.end_times_window = end_times_window
# one element in self.list_partial_order is a list [l1, l2, l3]
# indicating that l1 should be started before l1, and l2 before l3 for example