# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import dataclasses
from typing import Hashable
import numpy as np
from discrete_optimization.flex_scheduling.problem import (
RESOURCE_KEY,
FlexProblem,
GroupType,
ScheduleSolution,
ScheduleSolutionPreemptive,
TaskGroupAbstraction,
TasksGroups,
)
[docs]
def init_capacity_resource(flex_problem: FlexProblem):
nb_resources = flex_problem.nb_resources
max_len_calendar = max(
len(res.calendar_availability) for res in flex_problem.resources
)
capacity = np.zeros((nb_resources, max_len_calendar), dtype=int)
for r_idx, res in enumerate(flex_problem.resources):
arr = np.array(res.calendar_availability, dtype=int)
capacity[r_idx, : len(arr)] = arr
return capacity
[docs]
def resource_consumption(flex_problem: FlexProblem, solution: ScheduleSolution):
# Build an array resource_consumptions[i, r]
resource_consumptions = np.zeros(
(flex_problem.nb_tasks, flex_problem.nb_resources), dtype=int
)
for i in range(flex_problem.nb_tasks):
mode_i = solution.modes[i]
consumption_dict = flex_problem.tasks[i].modes[mode_i].resource_consumption
for r_idx, r_name in enumerate(flex_problem.resources):
resource_consumptions[i, r_idx] = consumption_dict.get(r_name.id, 0)
return resource_consumptions
[docs]
def resource_consumption_modes(flex_problem: FlexProblem):
# Build an array resource_consumptions[i, r]
max_nb_modes = max(
len(flex_problem.tasks[i].modes) for i in range(flex_problem.nb_tasks)
)
resource_consumptions = np.zeros(
(flex_problem.nb_tasks, max_nb_modes, flex_problem.nb_resources), dtype=int
)
for i in range(flex_problem.nb_tasks):
modes = sorted(list(flex_problem.tasks[i].modes.keys()))
for j, mode_j in enumerate(modes):
consumption_dict = flex_problem.tasks[i].modes[mode_j].resource_consumption
for r_idx, r_name in enumerate(flex_problem.resources):
resource_consumptions[i, mode_j, r_idx] = consumption_dict.get(
r_name.id, 0
)
return resource_consumptions
[docs]
def build_task_to_set_of_groups(flex_problem: FlexProblem):
task_to_group_set = {}
for group in flex_problem.tasks_group:
group_id = group.id
for task in group.tasks_group:
if task not in task_to_group_set:
task_to_group_set[task] = set()
task_to_group_set[task].add(group_id)
return task_to_group_set
[docs]
def post_process_schedule(
flex_problem: FlexProblem,
solution: ScheduleSolution,
keep_min_time: bool = False,
keep_strict_order_task: bool = False,
):
# resource_to_index = {r: i for i, r in enumerate(flex_problem.resources)}
capacity = init_capacity_resource(flex_problem)
resource_consumptions = resource_consumption(
flex_problem=flex_problem, solution=solution
)
task_to_group_set = build_task_to_set_of_groups(flex_problem)
tasks_order = np.argsort(solution.schedule[:, 0])
new_schedule = {}
current_group = {}
successors_task = flex_problem.constraints.successors
predecessors_task = {}
if successors_task is not None:
for i in successors_task:
for t in successors_task[i]:
if t not in predecessors_task:
predecessors_task[t] = set()
predecessors_task[t].add(i)
successors_group = flex_problem.constraints.successors_group_tasks
predecessors_group = {}
if successors_group is not None:
for i in successors_group:
for t in successors_group[i]:
if t not in predecessors_group:
predecessors_group[t] = set()
predecessors_group[t].add(i)
successor_with_res_release_at_start_of_successor = (
flex_problem.constraints.successor_with_res_release_at_start_of_successor
)
successor_generic_with_res_release_at_start_of_successor_generic = flex_problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
scheduled_task = set()
scheduled_group = set()
cur_end_time = 0
index_task = 0
for i_task in tasks_order:
id_task = flex_problem.index_to_task_id[i_task]
print("Scheduling...", id_task)
min_time = flex_problem.tasks[i_task].min_starting_date
if min_time is None:
min_time = 0
cons = resource_consumptions[i_task, :]
if id_task in predecessors_task:
min_time = max(
min_time,
max([new_schedule[j][-1][1] for j in predecessors_task[id_task]]),
) # Max time of predecessors
if id_task in task_to_group_set:
for g in task_to_group_set[id_task]:
if g in predecessors_group:
for pred_g in predecessors_group[g]:
min_time = max(
min_time,
max(
[
new_schedule[jj][-1][1]
for jj in flex_problem.tasks_group[
flex_problem.group_id_to_index[pred_g]
].tasks_group
]
),
)
duration = flex_problem.tasks[i_task].modes[solution.modes[i_task]].duration
duration_done = 0
part = 0
cur_time = min_time
if keep_min_time:
cur_time = max(min_time, int(solution.schedule[i_task, 0]))
if keep_strict_order_task:
if index_task > 0:
st_prev = int(solution.schedule[tasks_order[index_task - 1], 0])
id_task_prev = flex_problem.index_to_task_id[
tasks_order[index_task - 1]
]
actual_st = int(new_schedule[id_task_prev][0][0])
cur_time = max(
min_time, actual_st + (int(solution.schedule[i_task, 0]) - st_prev)
)
new_schedule[id_task] = []
if duration_done == duration:
new_schedule[id_task].append((min_time, min_time))
while duration_done < duration:
next_start = next(
i
for i in range(cur_time, capacity.shape[1])
if np.min(capacity[:, i] - cons) >= 0
)
next_end = next(
(
j
for j in range(next_start, next_start + duration - duration_done)
if np.min(capacity[:, j] - cons) < 0
),
None,
)
if next_end is None:
next_end = next_start + duration - duration_done
new_schedule[id_task].append((next_start, next_end))
for k in range(next_start, next_end):
capacity[:, k] -= cons
duration_done += next_end - next_start
cur_time = next_end + 1
cur_end_time = new_schedule[id_task][0][1]
scheduled_task.add(id_task)
if id_task in task_to_group_set:
for g in task_to_group_set[id_task]:
group = flex_problem.tasks_group[flex_problem.group_id_to_index[g]]
if all(t_id in scheduled_task for t_id in group.tasks_group):
scheduled_group.add(g)
index_task += 1
return new_schedule
[docs]
def build_graph_dependencies(flex_problem: FlexProblem):
import networkx as nx
fsp = flex_problem
graph = nx.DiGraph()
graph.add_nodes_from(
[flex_problem.tasks[i].id for i in range(flex_problem.nb_tasks)]
)
successors_task = fsp.constraints.successors
if successors_task is not None:
for i in successors_task:
for t in successors_task[i]:
graph.add_edge(i, t, label="classic")
successors_group_tasks = fsp.constraints.successors_group_tasks
if successors_group_tasks is not None:
for g in successors_group_tasks:
gr = flex_problem.tasks_group[flex_problem.group_id_to_index[g]]
for oth_g in successors_group_tasks[g]:
ogr = flex_problem.tasks_group[flex_problem.group_id_to_index[oth_g]]
for i_gr in gr.tasks_group:
for i_ogr in ogr.tasks_group:
graph.add_edge(i_gr, i_ogr, label="via_group")
for (
t1,
t2,
res,
) in flex_problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic:
if t1.is_a_task:
id_1 = [t1.task_id]
else:
group = flex_problem.tasks_group[
flex_problem.group_id_to_index[t1.group_id]
]
id_1 = list(group.tasks_group)
if t2.is_a_task:
id_2 = [t2.task_id]
else:
group = flex_problem.tasks_group[
flex_problem.group_id_to_index[t2.group_id]
]
id_2 = list(group.tasks_group)
for id1 in id_1:
for id2 in id_2:
graph.add_edge(id1, id2, label="constr")
return graph
[docs]
def get_tasks_in_events(
problem: FlexProblem,
event: tuple[TaskGroupAbstraction, TaskGroupAbstraction, dict[RESOURCE_KEY, int]],
):
source = set()
sink = set()
if event[0].is_a_task:
source.add(event[0].task_id)
if event[1].is_a_task:
sink.add(event[1].task_id)
if not event[0].is_a_task:
source.update(
problem.tasks_group[
problem.group_id_to_index[event[0].group_id]
].tasks_group
)
if not event[1].is_a_task:
sink.update(
problem.tasks_group[
problem.group_id_to_index[event[1].group_id]
].tasks_group
)
return source, sink
[docs]
@dataclasses.dataclass
class StateOfEvent:
event: tuple[TaskGroupAbstraction, TaskGroupAbstraction, dict[RESOURCE_KEY, int]]
source: set[Hashable]
sink: set[Hashable]
active: bool = False
start_event_triggered: int = None
end_event_triggered: int = None
[docs]
def update_state_of_event(
self, full_schedule: dict[Hashable, list[tuple[int, int]]]
):
if all(t in full_schedule for t in self.source):
self.start_event_triggered = max(
[full_schedule[t][-1][1] for t in self.source]
)
self.active = True
if any(t in full_schedule for t in self.sink):
self.end_event_triggered = min(
[full_schedule[t][0][0] for t in self.sink if t in full_schedule]
)
[docs]
@dataclasses.dataclass
class StateOfGroup:
event: TasksGroups
active: bool = False
start_event_triggered: int = None
end_event_triggered: int = None
[docs]
def update_state_of_event(
self, full_schedule: dict[Hashable, list[tuple[int, int]]]
):
if all(t in full_schedule for t in self.event.tasks_group):
self.start_event_triggered = min(
[full_schedule[t][0][0] for t in self.event.tasks_group]
)
self.end_event_triggered = max(
[full_schedule[t][-1][1] for t in self.event.tasks_group]
)
self.active = True
if any(t in full_schedule for t in self.event.tasks_group):
self.start_event_triggered = min(
[
full_schedule[t][0][0]
for t in self.event.tasks_group
if t in full_schedule
]
)
self.active = True
[docs]
class PostprocessTool:
def __init__(self, flex_problem: FlexProblem, solution: ScheduleSolution):
self.problem = flex_problem
self.solution = solution
self.capacity = init_capacity_resource(flex_problem) # array (i_resource, time)
self.resource_consumption = resource_consumption(
flex_problem=flex_problem, solution=solution
) # array (i_task, i_resource)
self.graph = build_graph_dependencies(flex_problem=flex_problem)
self.tasks_id_to_group = build_task_to_set_of_groups(self.problem)
self.predecessors = {
t: list(self.graph.predecessors(t)) for t in self.graph.nodes
}
self.nr_events = self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
self.states_events = []
for i in range(len(self.nr_events)):
source, sink = get_tasks_in_events(
problem=self.problem, event=self.nr_events[i]
)
self.states_events.append(
StateOfEvent(event=self.nr_events[i], source=source, sink=sink)
)
self.state_groups = []
for g in self.problem.tasks_group:
if g.type_of_group == GroupType.GROUP_TASK_NON_RELEASED_RESOURCE:
self.state_groups.append(StateOfGroup(g))
[docs]
def post_process_left(
self,
flex_problem: FlexProblem,
solution: ScheduleSolution,
keep_min_time: bool = False,
keep_strict_order_task: bool = False,
):
# resource_to_index = {r: i for i, r in enumerate(flex_problem.resources)}
capacity = self.capacity
resource_consumptions = self.resource_consumption
tasks_order = list(np.argsort(solution.schedule[:, 0]))
tasks_order = sorted(
range(solution.schedule.shape[0]),
key=lambda x: (
solution.schedule[x, 0],
-1
if any(
self.problem.index_to_task_id[x] in event.sink
for event in self.states_events
)
else 0,
),
)
new_schedule = {}
scheduled_task = set()
prev_id_task = None
currently_failed = set()
while len(new_schedule) < self.problem.nb_tasks:
i_task = next(
(
j
for j in tasks_order
if self.problem.index_to_task_id[j] not in new_schedule
and j not in currently_failed
and all(
t in new_schedule
for t in self.predecessors[self.problem.index_to_task_id[j]]
)
# and
)
)
id_task = flex_problem.index_to_task_id[i_task]
print("Scheduling...", id_task)
min_time = flex_problem.tasks[i_task].min_starting_date
if min_time is None:
min_time = 0
cons = resource_consumptions[i_task, :]
if id_task in self.predecessors and len(self.predecessors[id_task]) > 0:
min_time = max(
min_time,
max([new_schedule[j][-1][1] for j in self.predecessors[id_task]]),
) # Max time of predecessors
if id_task in self.tasks_id_to_group:
for g in self.tasks_id_to_group[id_task]:
group = self.problem.tasks_group[self.problem.group_id_to_index[g]]
if group.no_overlap:
if any(jj in new_schedule for jj in group.tasks_group):
# avoid overlap..
print("prev", min_time)
min_time = max(
min_time,
max(
[
new_schedule[jj][-1][1]
for jj in group.tasks_group
if jj in new_schedule
]
),
)
print("new", min_time)
duration = flex_problem.tasks[i_task].modes[solution.modes[i_task]].duration
duration_done = 0
cur_time = min_time
if keep_min_time:
cur_time = max(min_time, int(solution.schedule[i_task, 0]))
if keep_strict_order_task:
if prev_id_task is not None:
cur_time = max(min_time, new_schedule[prev_id_task][0][0])
new_schedule[id_task] = []
if duration_done == duration:
new_schedule[id_task].append((min_time, min_time))
def get_capacity_index_task(index_task: int, time: int):
conso = self.resource_consumption[index_task, :]
nz = np.nonzero(conso)[0]
capa = np.copy(self.capacity[:, time])
id_task_ = self.problem.index_to_task_id[index_task]
for event in self.states_events:
if id_task_ not in event.sink:
if event.active:
if (
event.end_event_triggered is None
or event.end_event_triggered > time
):
# print("End event trigerred Release", event.end_event_triggered)
for r in event.event[2]:
id_r = self.problem.resource_id_to_index[r]
if id_r in nz:
capa[id_r] -= event.event[2][r]
if capa[id_r] < cons[id_r]:
pass
# print("Current task", id_task_)
# print("sink", event.sink)
# print("Current task schedule",
# solution.schedule[index_task])
# for s in event.sink:
# print(solution.schedule[self.problem.task_id_to_index[s]])
for event in self.state_groups:
if id_task_ not in event.event.tasks_group:
if event.active:
if (
event.end_event_triggered is None
or event.end_event_triggered > time
):
# print("End event trigerred Group", event.end_event_triggered)
if event.event.res_not_released is not None:
for r in event.event.res_not_released:
id_r = self.problem.resource_id_to_index[r]
if id_r in nz:
capa[id_r] -= event.event.res_not_released[
r
]
if id_task_ in event.event.tasks_group:
if event.event.no_overlap:
pass # Do something here.
return capa
failed = False
while duration_done < duration:
next_start = next(
(
i
for i in range(cur_time, capacity.shape[1])
if np.min(
get_capacity_index_task(index_task=i_task, time=i) - cons
)
>= 0
),
None,
)
if next_start is None:
tasks_order = tasks_order
currently_failed.add(i_task)
if id_task in new_schedule:
for ns, ne in new_schedule[id_task]:
for k in range(ns, ne):
capacity[:, k] += cons
new_schedule.pop(id_task)
failed = True
break
next_end = next(
(
j
for j in range(
next_start, next_start + duration - duration_done
)
if np.min(
get_capacity_index_task(index_task=i_task, time=j) - cons
)
< 0
),
None,
)
if next_end is None:
next_end = next_start + duration - duration_done
new_schedule[id_task].append((next_start, next_end))
for k in range(next_start, next_end):
capacity[:, k] -= cons
duration_done += next_end - next_start
cur_time = next_end + 1
if not failed:
currently_failed = set()
scheduled_task.add(id_task)
for e in self.state_groups:
e.update_state_of_event(new_schedule)
for e in self.states_events:
e.update_state_of_event(new_schedule)
prev_id_task = id_task
return new_schedule
[docs]
def compute_equivalent_preemptive_solution(
flex_problem: FlexProblem, solution: ScheduleSolution
):
"""Dont change the actual schedule but make the solution preemptive"""
schedule = []
capacity = init_capacity_resource(flex_problem)
resource_consumptions = resource_consumption(
flex_problem=flex_problem, solution=solution
)
for i in range(flex_problem.nb_tasks):
st, end = solution.schedule[i, :]
st = int(st)
end = int(end)
if st == end:
schedule.append([(st, end)])
continue
active_time = []
active_slots = []
start_slot = None
for time in range(st, end):
if np.min(capacity[:, time] - resource_consumptions[i, :]) >= 0:
# All resource here.
active_time.append(time)
if start_slot is None:
start_slot = time
else:
if start_slot is not None:
active_slots.append((start_slot, active_time[-1] + 1))
start_slot = None
if start_slot is not None:
active_slots.append((start_slot, end))
sum_ = sum([x[1] - x[0] for x in active_slots])
print(sum_, flex_problem.tasks[i].modes[1].duration)
schedule.append(active_slots)
return ScheduleSolutionPreemptive(
problem=flex_problem, schedule=schedule, modes=solution.modes
)
[docs]
def check_solution(solution: ScheduleSolutionPreemptive, problem: FlexProblem):
"""To rework, do some basic check (was used to debug the postpro methods.."""
resource_consumption = np.zeros((problem.nb_resources, problem.horizon))
for i in range(len(solution.schedule)):
mode = solution.modes[i]
data = problem.tasks[i].modes[mode]
duration = data.duration
sum_ = sum([x[1] - x[0] for x in solution.schedule[i]])
if sum_ != duration:
print("Problem duration...", sum_, duration)
res_conso = data.resource_consumption
for ns, ne in solution.schedule[i]:
if ne > ns:
for r in res_conso:
id_r = problem.resource_id_to_index[r]
resource_consumption[id_r, ns:ne] += res_conso[r]
if (
np.min(
problem.resources[id_r].calendar_availability[ns:ne]
- resource_consumption[id_r, ns:ne]
)
< 0
):
print(
resource_consumption[id_r, ns:ne],
problem.resources[id_r].calendar_availability[ns:ne],
)
print("PROBLEM !")
for group in problem.tasks_group:
if group.type_of_group == GroupType.GROUP_TASK_NON_RELEASED_RESOURCE:
tasks = [problem.task_id_to_index[t] for t in group.tasks_group]
min_st = int(min([solution.schedule[i][0][0] for i in tasks]))
max_st = int(max([solution.schedule[i][-1][1] for i in tasks]))
for r in group.res_not_released:
id_r = problem.resource_id_to_index[r]
ind = [
i
for i in range(min_st, max_st)
if problem.resources[id_r].calendar_availability[i] > 0
]
for j in ind:
resource_consumption[id_r, j] += group.res_not_released[r]
if (
resource_consumption[id_r, j]
- problem.resources[id_r].calendar_availability[j]
> 0
):
print("PROBLEM ")
print(
f"PROBLEM with group consuming at time {j},",
resource_consumption[id_r, j],
problem.resources[id_r].calendar_availability[j],
)