# Copyright (c) 2023 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from collections.abc import Hashable
from copy import deepcopy
from typing import Optional
import numpy as np
from numpy import typing as npt
from sortedcontainers import SortedDict
from discrete_optimization.rcpsp import RcpspProblem
from discrete_optimization.rcpsp.solution import RcpspSolution
class SgsWithoutArray:
    """Serial schedule generation scheme (SGS) for RCPSP without dense arrays.

    The availability of every resource is tracked as a step function stored in
    a ``SortedDict``.  Two encodings are supported by the static helpers:

    * *absolute*: ``{time: amount available from this time until the next key}``
    * *delta*: ``{time: change of the available amount happening at this time}``

    Attributes:
        rcpsp_problem: the problem the instance was built from.
        resource_avail_in_time: per-resource numpy array of availabilities.
        dict_step_ressource: per-resource absolute step function derived from
            ``resource_avail_in_time``.
    """

    def __init__(self, rcpsp_problem: RcpspProblem):
        """Precompute the availability step function of every resource.

        Args:
            rcpsp_problem: problem providing ``resources_list``, ``resources``,
                ``horizon`` and ``is_varying_resource()``.
        """
        self.rcpsp_problem = rcpsp_problem
        horizon = self.rcpsp_problem.horizon
        varying = self.rcpsp_problem.is_varying_resource()
        self.resource_avail_in_time: dict[str, npt.NDArray[np.int_]] = {}
        for res in self.rcpsp_problem.resources_list:
            if varying:
                # Calendar resource: keep the first ``horizon + 1`` entries.
                self.resource_avail_in_time[res] = np.array(
                    self.rcpsp_problem.resources[res][: horizon + 1]  # type: ignore
                )
            else:
                # Constant capacity repeated ``horizon`` times.
                self.resource_avail_in_time[res] = np.full(
                    horizon,
                    self.rcpsp_problem.resources[res],
                    dtype=np.int_,
                )
        self.dict_step_ressource: dict[str, SortedDict] = {
            res: SgsWithoutArray.create_absolute_dict(avail)
            for res, avail in self.resource_avail_in_time.items()
        }

    @staticmethod
    def get_available_from_delta(sdict: SortedDict, time: int) -> int:
        """Return the amount available at ``time`` from a delta-encoded dict.

        The result is the sum of the deltas stored at every key ``<= time``
        (``0`` when no key qualifies).
        """
        # bisect_right counts the keys <= time, i.e. the deltas already applied.
        nb_applied = sdict.bisect_right(time)
        return sum(sdict.peekitem(j)[1] for j in range(nb_applied))

    @staticmethod
    def get_available_from_absolute(sdict: SortedDict, time: int) -> int:
        """Return the value stored at the greatest key ``<= time``.

        NOTE(review): when ``time`` is smaller than every key the computed
        index is ``-1`` and the value of the *last* key is returned; callers
        are expected to query times at or after the first key.
        """
        index = sdict.bisect_right(time)
        return sdict.peekitem(index - 1)[1]

    @staticmethod
    def add_event_delta(
        sdict: SortedDict, time_start: int, delta: int, time_end: int
    ) -> None:
        """Record, in a delta-encoded dict, a change of ``delta`` on an interval.

        ``delta`` is added at ``time_start`` and removed again at ``time_end``.
        The dict is modified in place.
        """
        if time_start in sdict:
            sdict[time_start] += delta
        else:
            sdict[time_start] = delta
        if time_end in sdict:
            sdict[time_end] -= delta
        else:
            sdict[time_end] = -delta

    @staticmethod
    def add_event_delta_in_absolute(
        sdict: SortedDict,
        time_start: int,
        delta: int,  # Negative usually
        time_end: int,
        liberate: bool = True,
    ) -> None:
        """Apply ``delta`` on ``[time_start, time_end)`` of an absolute dict.

        Existing breakpoints inside the interval are shifted by ``delta``; a
        breakpoint is created at ``time_start`` (previous level + ``delta``)
        and at ``time_end`` (level restored) when they do not exist yet.
        The dict is modified in place.

        Args:
            sdict: absolute step function to update.
            time_start: first time affected (inclusive).
            delta: amount added to the level on the interval.
            time_end: first time no longer affected (exclusive).
            liberate: currently unused; kept for interface compatibility.
        """
        if time_end <= time_start:
            # Empty interval: nothing is consumed.  Without this guard a new
            # breakpoint at ``time_start`` would never be compensated.
            return
        # Materialise the keys first: values are rewritten while looping.
        inside = list(sdict.irange(time_start, time_end, inclusive=(True, False)))
        for t in inside:
            sdict[t] += delta
        if time_start not in sdict:
            # The previous breakpoint lies before the interval (untouched above).
            previous = sdict.bisect_right(time_start) - 1
            sdict[time_start] = sdict.peekitem(previous)[1] + delta
        if time_end not in sdict:
            # The previous breakpoint lies inside the interval (already shifted),
            # so removing ``delta`` restores the original level.
            previous = sdict.bisect_right(time_end) - 1
            sdict[time_end] = sdict.peekitem(previous)[1] - delta

    @staticmethod
    def create_delta_dict(vector: npt.ArrayLike) -> SortedDict:
        """Encode ``vector`` as ``{0: first value, t: vector[t] - vector[t-1]}``.

        Only the indices ``t`` where the value changes are stored.
        """
        v = np.array(vector)
        steps = v[1:] - v[:-1]
        delta_dict = SortedDict({0: v[0]})
        for ind in np.nonzero(steps)[0]:
            delta_dict[ind + 1] = steps[ind]
        return delta_dict

    @staticmethod
    def create_absolute_dict(vector: npt.ArrayLike) -> SortedDict:
        """Encode ``vector`` as ``{0: first value, t: vector[t]}``.

        Only the indices ``t`` where the value differs from the previous one
        are stored.
        """
        v = np.array(vector)
        steps = v[1:] - v[:-1]
        absolute_dict = SortedDict({0: v[0]})
        for ind in np.nonzero(steps)[0]:
            absolute_dict[ind + 1] = v[ind + 1]
        return absolute_dict

    @staticmethod
    def _next_time_with_capacity(
        sdict: SortedDict, after: int, need: int
    ) -> Optional[int]:
        """Return the first key ``> after`` whose value is ``>= need``, else None."""
        return next(
            (
                k
                for k in sdict.irange(after, inclusive=(False, True))
                if sdict[k] >= need
            ),
            None,
        )

    def generate_schedule_from_permutation_serial_sgs(
        self, solution: RcpspSolution, rcpsp_problem: RcpspProblem
    ) -> tuple[dict[Hashable, dict[str, int]], bool, dict[str, SortedDict]]:
        """Build a schedule from the priority list and modes of ``solution``.

        Activities are taken in the order of ``solution.rcpsp_permutation``
        (source first, sink last), always picking the first one whose
        predecessors are all scheduled, and started at the earliest time where
        every needed resource is available for the whole duration.

        Args:
            solution: provides ``rcpsp_permutation`` and ``rcpsp_modes``.
            rcpsp_problem: provides tasks, modes, successors and resources.

        Returns:
            A tuple ``(schedule, feasible, availability)`` where ``schedule``
            maps each scheduled activity to its ``"start_time"`` and
            ``"end_time"``, ``feasible`` is False when the procedure stopped
            on a resource infeasibility, and ``availability`` is the updated
            copy of the per-resource absolute step functions.

        Raises:
            ValueError: if no remaining activity has all its predecessors
                scheduled (e.g. cyclic precedence relations).
        """
        activity_end_times = {}
        unfeasible_non_renewable_resources = False
        new_horizon = rcpsp_problem.horizon
        # Work on a copy so that the instance can be reused for other solutions.
        resource_avail_in_time = deepcopy(self.dict_step_ressource)
        minimum_starting_time = {act: 0 for act in rcpsp_problem.tasks_list}

        perm_extended = [
            rcpsp_problem.tasks_list_non_dummy[x] for x in solution.rcpsp_permutation
        ]
        perm_extended.insert(0, rcpsp_problem.source_task)
        perm_extended.append(rcpsp_problem.sink_task)

        modes_dict = rcpsp_problem.build_mode_dict(solution.rcpsp_modes)
        for k in modes_dict:
            # Unknown mode: fall back on mode 1.
            if modes_dict[k] not in rcpsp_problem.mode_details[k]:
                modes_dict[k] = 1

        # Predecessor map built once instead of rescanning ``successors`` for
        # every candidate activity.
        predecessors: dict[Hashable, set] = {}
        for pred, succs in rcpsp_problem.successors.items():
            for succ in succs:
                predecessors.setdefault(succ, set()).add(pred)
        unscheduled = set(perm_extended)

        while len(perm_extended) > 0 and not unfeasible_non_renewable_resources:
            # First activity of the list whose predecessors are all scheduled.
            for candidate in perm_extended:
                if unscheduled.isdisjoint(predecessors.get(candidate, ())):
                    act_id = candidate
                    break
            else:
                raise ValueError(
                    "No schedulable activity left: precedence constraints "
                    "cannot be satisfied."
                )

            mode_detail = rcpsp_problem.mode_details[act_id][modes_dict[act_id]]
            duration = mode_detail["duration"]
            current_min_time: Optional[int] = minimum_starting_time[act_id]

            valid = False
            while not valid:
                valid = True
                if current_min_time is None:
                    # No later time offers enough capacity.
                    unfeasible_non_renewable_resources = True
                    break
                start_time = current_min_time
                end_time = start_time + duration
                for res in rcpsp_problem.resources_list:
                    need = mode_detail.get(res, 0)
                    if need == 0:
                        continue
                    avail = resource_avail_in_time[res]
                    # Level at the start (covers the case where ``start_time``
                    # is not itself a breakpoint).
                    if (
                        self.get_available_from_absolute(
                            sdict=avail, time=start_time  # type: ignore
                        )
                        < need
                    ):
                        current_min_time = self._next_time_with_capacity(
                            avail, after=start_time, need=need
                        )
                        valid = False
                        break
                    # First breakpoint within the execution window that lacks
                    # capacity, if any.
                    blocking = next(
                        (
                            k
                            for k in avail.irange(
                                start_time, end_time, inclusive=(True, False)
                            )
                            if avail[k] < need
                        ),
                        None,
                    )
                    if blocking is not None:
                        current_min_time = self._next_time_with_capacity(
                            avail, after=blocking, need=need
                        )
                        valid = False
                        # Leave the resource loop too: the candidate start is
                        # rejected and ``current_min_time`` may now be None.
                        break

            if not unfeasible_non_renewable_resources and current_min_time is not None:
                end_t = current_min_time + duration
                for res in resource_avail_in_time:
                    need = mode_detail.get(res, 0)
                    if need == 0:
                        continue
                    if res not in rcpsp_problem.non_renewable_resources:
                        # Renewable: consumed only while the activity runs.
                        self.add_event_delta_in_absolute(
                            resource_avail_in_time[res],
                            time_start=current_min_time,
                            delta=-need,
                            time_end=end_t,
                        )
                    else:
                        # Non renewable: cumulated consumption is tracked at
                        # the ``new_horizon`` breakpoint.
                        self.add_event_delta_in_absolute(
                            resource_avail_in_time[res],
                            time_start=new_horizon,
                            delta=-need,
                            time_end=new_horizon + 2,
                        )
                        if resource_avail_in_time[res][new_horizon] < 0:
                            unfeasible_non_renewable_resources = True
                # NOTE(review): nesting reconstructed from a listing that had
                # lost its indentation; confirm that an activity overdrawing a
                # non-renewable resource must be left out of the schedule.
                if unfeasible_non_renewable_resources:
                    break
                activity_end_times[act_id] = end_t
                perm_extended.remove(act_id)
                unscheduled.discard(act_id)
                for s in rcpsp_problem.successors[act_id]:
                    minimum_starting_time[s] = max(minimum_starting_time[s], end_t)

        rcpsp_schedule: dict[Hashable, dict[str, int]] = {}
        for act_id, end in activity_end_times.items():
            act_duration = rcpsp_problem.mode_details[act_id][modes_dict[act_id]][
                "duration"
            ]
            rcpsp_schedule[act_id] = {
                "start_time": end - act_duration,
                "end_time": end,
            }

        if unfeasible_non_renewable_resources:
            rcpsp_schedule_feasible = False
            last_act_id = rcpsp_problem.sink_task
            if last_act_id not in rcpsp_schedule:
                # Sentinel times for the sink of an infeasible schedule.
                # NOTE(review): the start sentinel is larger than the end
                # sentinel; values kept as-is since callers may rely on them.
                rcpsp_schedule[last_act_id] = {}
                rcpsp_schedule[last_act_id]["start_time"] = 99999999
                rcpsp_schedule[last_act_id]["end_time"] = 9999999
        else:
            rcpsp_schedule_feasible = True
        return rcpsp_schedule, rcpsp_schedule_feasible, resource_avail_in_time