# Copyright (c) 2025 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import itertools
import logging
from collections.abc import Hashable
from functools import reduce
from typing import Optional
import networkx as nx
import numpy as np
from discrete_optimization.generic_tasks_tools.allocation import (
AllocationProblem,
AllocationSolution,
)
from discrete_optimization.generic_tasks_tools.precedence import PrecedenceProblem
from discrete_optimization.generic_tasks_tools.scheduling import (
SchedulingProblem,
SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
Solution,
TypeObjective,
)
from discrete_optimization.rcpsp.problem import (
PairModeConstraint,
RcpspProblem,
RcpspSolution,
SpecialConstraintsDescription,
)
logger = logging.getLogger(__name__)
Task = Hashable
UnaryResource = Hashable
[docs]
class AllocSchedulingSolution(
SchedulingSolution[Task], AllocationSolution[Task, UnaryResource]
):
problem: AllocSchedulingProblem
def __init__(
self,
problem: AllocSchedulingProblem,
schedule: np.ndarray,
allocation: np.ndarray,
):
self.problem = problem
self.schedule = schedule
self.allocation = allocation
[docs]
def copy(self) -> "Solution":
return AllocSchedulingSolution(
problem=self.problem,
schedule=np.copy(self.schedule),
allocation=np.copy(self.allocation),
)
[docs]
def change_problem(self, new_problem: "Problem") -> None:
self.problem = new_problem
[docs]
def get_end_time(self, task: Task) -> int:
i_task = self.problem.tasks_to_index[task]
return int(self.schedule[i_task, 1])
[docs]
def get_start_time(self, task: Task) -> int:
i_task = self.problem.tasks_to_index[task]
return int(self.schedule[i_task, 0])
[docs]
def is_allocated(self, task: Task, unary_resource: UnaryResource) -> bool:
i_task = self.problem.tasks_to_index[task]
i_team = self.problem.teams_to_index[unary_resource]
return int(self.allocation[i_task]) == i_team
[docs]
class TasksDescription:
def __init__(self, duration_task: int, resource_consumption: dict[str, int] = None):
self.duration_task = duration_task
self.resource_consumption = resource_consumption
if self.resource_consumption is None:
self.resource_consumption = {}
[docs]
class AllocSchedulingProblem(
SchedulingProblem[Task],
AllocationProblem[Task, UnaryResource],
PrecedenceProblem[Task],
):
def __init__(
self,
team_names: list[UnaryResource],
calendar_team: dict[
UnaryResource, list[tuple[int, int]]
], # List of available slots per team.
horizon: int,
tasks_list: list[Task],
tasks_data: dict[Task, TasksDescription],
same_allocation: list[set[Task]],
precedence_constraints: dict[Task, set[Task]],
available_team_for_activity: dict[Task, set[UnaryResource]],
start_window: dict[Task, tuple[Optional[int], Optional[int]]],
end_window: dict[Task, tuple[Optional[int], Optional[int]]],
original_start: dict[Task, int],
original_end: dict[Task, int],
resources_list: list[str] = None,
resources_capacity: dict[str, int] = None,
horizon_start_shift: Optional[int] = 0,
objective_handling: ObjectiveHandling = ObjectiveHandling.AGGREGATE,
):
self.team_names = team_names
self.calendar_team = realign_calendars(calendar_team)
self.horizon = horizon
self.tasks_list = tasks_list
self.tasks_data = tasks_data
self.same_allocation = same_allocation
self.precedence_constraints = precedence_constraints
self.available_team_for_activity = available_team_for_activity
self.start_window = start_window
self.end_window = end_window
self.original_start = original_start
self.original_end = original_end
self.number_teams = len(self.team_names)
self.number_tasks = len(self.tasks_list)
self.teams_to_index = {self.team_names[i]: i for i in range(self.number_teams)}
self.index_to_team = {i: self.team_names[i] for i in range(self.number_teams)}
self.tasks_to_index = {self.tasks_list[i]: i for i in range(self.number_tasks)}
self.index_to_task = {
self.tasks_to_index[key]: key for key in self.tasks_to_index
}
self.rcpsp, self.ac_mode_to_team = transform_to_multimode_rcpsp(
self,
build_calendar=True,
add_window_time_constraint=True,
add_additional_constraint=True,
)
self.objective_handling = objective_handling
self.resources_list = resources_list
self.resources_capacity = resources_capacity
self.horizon_start_shift = horizon_start_shift
self.compatible_teams_per_activity = self.compatible_teams_all_activity()
@property
def unary_resources_list(self) -> list[UnaryResource]:
return self.team_names
[docs]
def get_precedence_constraints(self) -> dict[Task, set[Task]]:
return self.precedence_constraints
[docs]
def is_compatible_task_unary_resource(
self, task: Task, unary_resource: UnaryResource
) -> bool:
return unary_resource in self.compatible_teams_per_activity[task]
[docs]
def get_makespan_lower_bound(self) -> int:
return max(int(self.get_lb_end_window(t)) for t in self.tasks_list)
[docs]
def get_makespan_upper_bound(self) -> int:
return max(int(self.get_ub_end_window(t)) for t in self.tasks_list)
[docs]
def set_objective_handling(self, objective_handling: ObjectiveHandling):
self.objective_handling = objective_handling
[docs]
def update_available_team_for_activity(
self, available_team_for_activity: dict[Hashable, set[Hashable]]
):
"""Override the available team attribute and update the rcpsp accordingly"""
self.available_team_for_activity = available_team_for_activity
self.rcpsp, self.ac_mode_to_team = transform_to_multimode_rcpsp(
self,
build_calendar=True,
add_window_time_constraint=True,
add_additional_constraint=True,
)
[docs]
def compute_predecessors(self):
self.predecessors = {}
for t in self.precedence_constraints:
for succ in self.precedence_constraints[t]:
if succ not in self.predecessors:
self.predecessors[succ] = set()
self.predecessors[succ].add(t)
[docs]
def evaluate(self, variable: Solution) -> dict[str, float]:
return evaluate_solution(solution=variable, problem=self)
[docs]
def satisfy(self, variable: Solution) -> bool:
if isinstance(variable, RcpspSolution):
return self.rcpsp.satisfy(variable)
if isinstance(variable, AllocSchedulingSolution):
return full_satisfy(problem=self, solution=variable, partial_solution=False)
# Deprecated version was transforming the solution into an RCPSPSolution.
# return self.rcpsp.satisfy(transform_alloc_solution_to_rcpsp_solution(alloc_solution=variable,
# rcpsp_problem=self.rcpsp,
# ac_mode_to_team=self.ac_mode_to_team,
# alloc_scheduling_problem=self))
[docs]
def get_attribute_register(self) -> EncodingRegister:
pass
[docs]
def get_solution_type(self) -> type[Solution]:
return AllocSchedulingSolution
[docs]
def get_objective_register(self) -> ObjectiveRegister:
dict_objective = {
"nb_not_done": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100000
),
"nb_teams": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-10000.0
),
"makespan": ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=-1.0),
"workload_dispersion": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-1.0
),
"nb_violations": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100.0
),
}
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=self.objective_handling,
dict_objective_to_doc=dict_objective,
)
[docs]
def get_lb_start_window(self, task: Hashable) -> int:
if task not in self.start_window or self.start_window[task][0] is None:
return 0
else:
return self.start_window[task][0]
[docs]
def get_ub_start_window(self, task: Hashable) -> int:
if task not in self.start_window or self.start_window[task][1] is None:
return self.horizon - self.tasks_data[task].duration_task
else:
return self.start_window[task][1]
[docs]
def get_lb_end_window(self, task: Hashable) -> int:
if task not in self.end_window or self.end_window[task][0] is None:
return self.get_lb_start_window(task) + self.tasks_data[task].duration_task
else:
return self.end_window[task][0]
[docs]
def get_ub_end_window(self, task: Hashable) -> int:
if task not in self.end_window or self.end_window[task][1] is None:
return self.get_ub_start_window(task) + self.tasks_data[task].duration_task
else:
return self.end_window[task][1]
[docs]
def get_all_lb_ub(self) -> list[tuple[int, int, int, int]]:
"""
Return a list of
(lb_start, ub_start, lb_end, ub_end) for each task
(lower/upper bound on start, lower/upper bound on end)
"""
return [
(
int(self.get_lb_start_window(t)),
int(self.get_ub_start_window(t)),
int(self.get_lb_end_window(t)),
int(self.get_ub_end_window(t)),
)
for t in self.tasks_list
]
[docs]
def get_unavailable_teams_per_activity(self) -> dict[Hashable, set[Hashable]]:
all_teams = set(self.team_names)
return {
t: all_teams.difference(self.available_team_for_activity[t])
for t in self.available_team_for_activity
}
[docs]
def compatible_teams_all_activity(self) -> dict[Hashable, set[Hashable]]:
all_teams = set(self.team_names)
d = {t: all_teams for t in self.tasks_list}
for t in self.available_team_for_activity:
d[t] = self.available_team_for_activity[t]
return d
[docs]
def compatible_teams_index_all_activity(self) -> dict[int, set[int]]:
all_teams = set(self.index_to_team.keys())
d = {index_task: all_teams for index_task in self.index_to_task}
for t in self.available_team_for_activity:
d[self.tasks_to_index[t]] = {
self.teams_to_index[team]
for team in self.available_team_for_activity[t]
}
return d
[docs]
def compute_unavailability_calendar(self, team: Hashable) -> list[tuple[int, int]]:
# Compute the "complement" calendar of the availability calendar.
cur_time = 0
list_unavailable = []
for i in range(len(self.calendar_team[team])):
if self.calendar_team[team][i][0] > cur_time:
list_unavailable.append((cur_time, self.calendar_team[team][i][0]))
cur_time = self.calendar_team[team][i][1]
if cur_time < self.horizon:
list_unavailable.append((cur_time, self.horizon))
return list_unavailable
[docs]
def evaluate_solution(
solution: AllocSchedulingSolution, problem: AllocSchedulingProblem
) -> dict[str, float]:
dur_per_team = {}
teams_used = set()
nb_not_done = 0
for i in range(len(solution.allocation)):
team = solution.allocation[i]
if team is None or team == -1: # convention to be respected.
nb_not_done += 1
else:
teams_used.add(team)
if team not in dur_per_team:
dur_per_team[team] = 0
dur_per_team[team] += problem.tasks_data[problem.index_to_task[i]].duration_task
makespan = max(solution.schedule[:, 1])
sat = satisfy_detailed(problem=problem, solution=solution)
return {
"nb_teams": len(teams_used),
"nb_not_done": nb_not_done,
"makespan": makespan,
"workload_dispersion": max(dur_per_team.values()) - min(dur_per_team.values()),
"nb_violations": len(sat),
}
[docs]
def satisfy_all_done(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
):
nb_not_done = 0
for i in range(len(solution.allocation)):
team = solution.allocation[i]
if team is None or team == -1 or np.isnan(team): # convention to be respected.
nb_not_done += 1
return nb_not_done == 0
[docs]
def satisfy_precedence(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
) -> bool:
"""
Partial solution = True means we ignore variable set to None when we check the constraint.
"""
for task in problem.precedence_constraints:
index_task = problem.tasks_to_index[task]
end_task = solution.schedule[index_task, 1]
if partial_solution and np.isnan(end_task):
continue
elif np.isnan(end_task):
return False
for successor_task in problem.precedence_constraints[task]:
if partial_solution and np.isnan(
solution.schedule[problem.tasks_to_index[successor_task], 0]
):
continue
if solution.schedule[problem.tasks_to_index[successor_task], 0] < end_task:
logging.info("Precedence not respected")
return False
return True
[docs]
def satisfy_same_allocation(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
) -> bool:
"""
Partial solution = True means we ignore variable set to None when we check the constraint.
This one is a bit tricky to write when we allow partial solution.
"""
for set_same_alloc in problem.same_allocation:
if not partial_solution:
one_ac = next(iter(set_same_alloc))
val = solution.allocation[problem.tasks_to_index[one_ac]]
if any(
solution.allocation[problem.tasks_to_index[x]] != val
for x in set_same_alloc
):
logging.info("Same alloc not respected")
return False
else:
one_ac = next(
(
s
for s in set_same_alloc
if not np.isnan(solution.allocation[problem.tasks_to_index[s]])
),
None,
)
if one_ac is None:
continue
else:
val = solution.allocation[problem.tasks_to_index[one_ac]]
if any(
solution.allocation[problem.tasks_to_index[x]] != val
for x in set_same_alloc
if not np.isnan(solution.allocation[problem.tasks_to_index[x]])
):
return False
return True
[docs]
def satisfy_available_team(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
) -> bool:
"""
Partial solution = True means we ignore variable set to None when we check the constraint.
"""
for activity in problem.available_team_for_activity:
alloc = solution.allocation[problem.tasks_to_index[activity]]
if partial_solution and (alloc is None or np.isnan(alloc)):
continue
if np.isnan(alloc):
logging.info("Team available not respected")
return False
if alloc == -1:
return False
team_alloc = problem.index_to_team[alloc]
if team_alloc not in problem.available_team_for_activity[activity]:
logging.info("Team available not respected")
return False
return True
[docs]
def realign_calendars(calendars_dict: dict[Hashable, list[tuple[int, int]]]):
new_cals = {}
# logger.debug(calendars_dict)
for t in calendars_dict:
new_cal = []
if len(calendars_dict[t]) <= 1:
new_cals[t] = calendars_dict[t]
continue
else:
current_ = (calendars_dict[t][0][0], calendars_dict[t][0][1])
current_end = current_[1]
current_index = 1
while current_index < len(calendars_dict[t]):
if calendars_dict[t][current_index][0] == current_end:
current_ = (current_[0], calendars_dict[t][current_index][1])
current_end = calendars_dict[t][current_index][1]
else:
new_cal.append(current_)
current_ = calendars_dict[t][current_index]
current_end = current_[1]
current_index += 1
if len(new_cal) == 0 or new_cal[-1] != current_:
new_cal.append(current_)
new_cals[t] = new_cal
# logger.debug(new_cals)
return new_cals
[docs]
def intervals_do_not_overlap(
interval1: tuple[float, float], interval2: tuple[float, float]
):
if interval1[1] <= interval2[0] or interval2[1] <= interval1[0]:
return True
else:
return False
[docs]
def interval_inside(
interval1: tuple[float, float], interval_container: tuple[float, float]
):
if interval1[0] >= interval_container[0] and interval1[1] <= interval_container[1]:
return True
return False
[docs]
def satisfy_calendars(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
):
for i in range(len(solution.allocation)):
val = solution.allocation[i]
if np.isnan(val) or val not in problem.index_to_team:
# This case is tackled by some other function
continue
st, end = solution.schedule[i, :]
calendar = problem.calendar_team[problem.index_to_team[val]]
if not any(interval_inside((st, end), x) for x in calendar):
# logger.info(f"Calendar constraint not respected for task {i}")
# logger.info(f"here's (availability) calendar and the task to schedule {calendar, (st, end)}")
return False
return True
[docs]
def satisfy_time_window(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
):
for t in range(solution.schedule.shape[0]):
tsk = problem.tasks_list[t]
lb_s, ub_s = problem.start_window.get(tsk, (None, None))
lb_e, ub_e = problem.end_window.get(tsk, (None, None))
st, end = solution.schedule[t, :]
if lb_s is not None:
if st < lb_s:
##logger.info(f"Task {t} starting before {lb_s}")
return False
if ub_s is not None:
if st > ub_s:
##logger.info(f"Task {t} starting after {ub_s}")
return False
if lb_e is not None:
if end < lb_e:
##logger.info(f"Task {t} ending before {lb_e}")
return False
if ub_e is not None:
if end > ub_e:
##logger.info(f"Task {t} ending after {ub_e}")
return False
return True
[docs]
def full_satisfy(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
partial_solution: bool = False,
) -> bool:
is_satisfied = True
for func in [
satisfy_all_done,
satisfy_precedence,
satisfy_available_team,
satisfy_same_allocation,
satisfy_time_window,
satisfy_calendars,
satisfy_overlap_teams,
]:
if not func(
problem=problem, solution=solution, partial_solution=partial_solution
):
logger.warning(func, " not satisfied !!")
is_satisfied = False
return is_satisfied
[docs]
def satisfy_detailed(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
):
# TODO : detail computation of calendar constraint violated.
return reduce(
lambda x, y: x + y(problem, solution),
[
satisfy_detailed_all_done,
satisfy_detailed_precedence,
satisfy_detailed_same_allocation,
satisfy_detailed_available_team,
satisfy_time_window_detailed,
satisfy_overlap_teams_detailed,
],
[],
)
[docs]
def satisfy_detailed_all_done(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
):
violations = []
for i in range(len(solution.allocation)):
team = solution.allocation[i]
if team is None or team == -1 or np.isnan(team): # convention to be respected.
violations.append(({"task_index": i, "tag": "is_not_done"}))
return violations
[docs]
def satisfy_time_window_detailed(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
):
violations = []
for t in range(solution.schedule.shape[0]):
tsk = problem.tasks_list[t]
lb_s, ub_s = problem.start_window.get(tsk, (None, None))
lb_e, ub_e = problem.end_window.get(tsk, (None, None))
st, end = solution.schedule[t, :]
if solution.allocation[t] not in problem.index_to_team:
continue
if lb_s is not None:
if st < lb_s:
###logger.info(f"Task {t} starting before {lb_s}")
violations.append(
{
"task_index": t,
"start": st,
"expected": lb_s,
"tag": "early",
"violation": lb_s - st,
}
)
if ub_s is not None:
if st > ub_s:
###logger.info(f"Task {t} starting after {ub_s}")
violations.append(
{
"task_index": t,
"start": st,
"expected": ub_s,
"tag": "late",
"violation": st - ub_s,
}
)
if lb_e is not None:
if end < lb_e:
###logger.info(f"Task {t} ending before {lb_e}")
violations.append(
{
"task_index": t,
"end": end,
"expected": lb_e,
"tag": "early",
"violation": lb_e - end,
}
)
if ub_e is not None:
if end > ub_e:
###logger.info(f"Task {t} ending after {ub_e}")
violations.append(
{
"task_index": t,
"end": end,
"expected": ub_e,
"tag": "late",
"violation": end - ub_e,
}
)
return violations
[docs]
def satisfy_detailed_precedence(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, Hashable, Hashable, int]]:
list_violated_precedence_constraint = []
for task in problem.precedence_constraints:
index_task = problem.tasks_to_index[task]
end_task = solution.schedule[index_task, 1]
if solution.allocation[index_task] == -1 or np.isnan(
solution.allocation[index_task]
):
continue
for successor_task in problem.precedence_constraints[task]:
index_succ = problem.tasks_to_index[successor_task]
if solution.allocation[index_succ] == -1 or np.isnan(
solution.allocation[index_succ]
):
continue
if solution.schedule[problem.tasks_to_index[successor_task], 0] < end_task:
list_violated_precedence_constraint += [
(
"precedence",
task,
successor_task,
index_task,
problem.tasks_to_index[successor_task],
end_task
- solution.schedule[problem.tasks_to_index[successor_task], 0],
end_task,
solution.schedule[problem.tasks_to_index[successor_task], 0],
)
]
return list_violated_precedence_constraint
[docs]
def satisfy_detailed_same_allocation(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, set[Hashable]], set[int]]:
list_violated_same_allocation_constraint: list[
tuple[str, set[Hashable]], set[int]
] = []
for set_same_alloc in problem.same_allocation:
one_ac = next(iter(set_same_alloc))
val = solution.allocation[problem.tasks_to_index[one_ac]]
if np.isnan(val) or val == -1:
continue
if any(
(
solution.allocation[problem.tasks_to_index[x]] != val
and solution.allocation[problem.tasks_to_index[x]]
in problem.index_to_team
)
for x in set_same_alloc
):
list_violated_same_allocation_constraint += [
(
"same_allocation",
set_same_alloc,
{problem.tasks_to_index[i] for i in set_same_alloc},
)
]
return list_violated_same_allocation_constraint
[docs]
def satisfy_detailed_available_team(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, Hashable, Hashable, int, int]]:
list_violated_available_team = []
for activity in problem.available_team_for_activity:
team: int = int(solution.allocation[problem.tasks_to_index[activity]])
if team not in problem.index_to_team:
continue
team_alloc = problem.index_to_team[team]
if team_alloc not in problem.available_team_for_activity[activity]:
list_violated_available_team += [
(
"available-team",
activity,
team_alloc,
problem.tasks_to_index[activity],
problem.teams_to_index[team_alloc],
)
]
return list_violated_available_team
[docs]
def satisfy_overlap_teams(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution, **kwargs
) -> bool:
teams = set(solution.allocation)
for team in teams:
if team not in problem.index_to_team:
continue
ac_team = np.nonzero(solution.allocation == team)
schedule_ = solution.schedule[ac_team, :]
slots = sorted([tuple(x) for x in schedule_])
for j in range(1, len(slots)):
if slots[j][0] < slots[j - 1][1]:
return False
return True
[docs]
def satisfy_overlap_teams_detailed(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, int, int, int]]:
teams = set(solution.allocation)
list_violated_overlap = []
for team in teams:
if team not in problem.index_to_team:
continue
ac_team = np.nonzero(solution.allocation == team)
schedule_ = solution.schedule[ac_team[0], :]
slots = sorted([tuple(x) for x in schedule_])
for j in range(1, len(slots)):
if slots[j][0] < slots[j - 1][1]:
list_violated_overlap.append(
(
"no-overlap",
int(ac_team[0][j - 1]),
int(ac_team[0][j]),
int(team),
)
)
return list_violated_overlap
[docs]
def compute_stats_per_team(
problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> dict[int, float]:
teams = set(solution.allocation)
used_time_by_team = {}
for team in teams:
if team not in problem.index_to_team:
continue
slots = problem.compute_unavailability_calendar(problem.index_to_team[team])
logger.debug("slots", slots)
ac_team = np.nonzero(solution.allocation == team)
schedule_ = solution.schedule[ac_team[0], :]
used_time = np.sum(schedule_[:, 1] - schedule_[:, 0])
used_time += sum([x[1] - x[0] for x in slots])
used_time_by_team[team] = used_time
return used_time_by_team
[docs]
def build_calendar_array_from_availability_slot(
availability_slots: list[tuple[int, int]], horizon: int, value: int = 1
):
array = np.zeros(horizon, dtype=int)
for slot in availability_slots:
array[slot[0] : min(slot[1], horizon)] = value
return array
[docs]
def build_pair_mode_constraint(
problem: AllocSchedulingProblem, rcpsp: RcpspProblem, use_score: bool = False
):
modes_allowed_assignment: dict[
tuple[Hashable, Hashable], list[tuple[Hashable, Hashable]]
] = {}
if problem.same_allocation is not None:
for set_task in problem.same_allocation:
for p in itertools.combinations(set_task, 2):
list_modes = []
team_to_mode_p0 = {}
team_to_mode_p1 = {}
for mode in rcpsp.mode_details[p[0]]:
team = [
x
for x in rcpsp.mode_details[p[0]][mode]
if x != "duration" and rcpsp.mode_details[p[0]][mode][x] > 0
][0]
team_to_mode_p0[team] = mode
for mode in rcpsp.mode_details[p[1]]:
team = [
x
for x in rcpsp.mode_details[p[1]][mode]
if x != "duration" and rcpsp.mode_details[p[1]][mode][x] > 0
][0]
team_to_mode_p1[team] = mode
teams_0 = set(team_to_mode_p0.keys())
teams_1 = set(team_to_mode_p1.keys())
intersect = teams_0.intersection(teams_1)
for team in intersect:
list_modes += [(team_to_mode_p0[team], team_to_mode_p1[team])]
modes_allowed_assignment[(p[0], p[1])] = list_modes
task_mode_integer = {}
index_team = {
rcpsp.resources_list[i]: i + 1 for i in range(len(rcpsp.resources_list))
}
for task in rcpsp.mode_details:
for mode in rcpsp.mode_details[task]:
team = [
x
for x in rcpsp.mode_details[task][mode]
if x != "duration"
if rcpsp.mode_details[task][mode][x] > 0
]
if len(team) > 0:
team = team[0]
task_mode_integer[(task, mode)] = index_team[team]
else:
continue
return PairModeConstraint(
allowed_mode_assignment=modes_allowed_assignment if not use_score else None,
same_score_mode=set(modes_allowed_assignment.keys()) if use_score else None,
score_mode=task_mode_integer if use_score else None,
)
else:
return None
[docs]
def correct_schedule_avoid_overlap(
problem: AllocSchedulingProblem,
solution: AllocSchedulingSolution,
init_min_starting_date_lb: bool = False,
):
routings = []
for i in range(problem.number_teams):
tasks = np.nonzero(solution.allocation == i)
if len(tasks[0]) > 0:
sorted_ = tasks[0][np.argsort(solution.schedule[tasks, 0])][0]
routings.append(sorted_)
else:
routings.append([])
used_teams = {i for i in range(len(routings)) if len(routings[i]) > 0}
calendar_teams = {
i: build_calendar_array_from_availability_slot(
availability_slots=problem.calendar_team[problem.index_to_team[i]],
horizon=problem.horizon,
value=1,
)
for i in used_teams
}
predecessors = {j: set() for j in problem.index_to_task}
for t in problem.precedence_constraints:
for succ_t in problem.precedence_constraints[t]:
predecessors[problem.tasks_to_index[succ_t]].add(problem.tasks_to_index[t])
scheduled = set()
min_starting_time = {
i: max(
solution.schedule[i, 0],
int(problem.get_lb_start_window(problem.index_to_task[i])),
)
for i in problem.index_to_task
}
if init_min_starting_date_lb:
min_starting_time = {
i: int(problem.get_lb_start_window(problem.index_to_task[i]))
for i in problem.index_to_task
}
nb_tasks_to_sched = len(problem.index_to_task)
sorted_tasks = [int(x) for x in np.argsort(solution.schedule[:, 0])]
schedule_per_team = {i: [] for i in used_teams}
new_schedule = np.zeros(solution.schedule.shape)
while len(scheduled) < nb_tasks_to_sched:
next_t = next(
t
for t in sorted_tasks
if t not in scheduled and all(p in scheduled for p in predecessors[t])
)
task_name = problem.index_to_task[next_t]
sched = solution.schedule[next_t, :]
team = solution.allocation[next_t]
if team == -1:
new_schedule[next_t, 0] = solution.schedule[next_t, 0]
new_schedule[next_t, 1] = solution.schedule[next_t, 1]
scheduled.add(next_t)
continue
dur = problem.tasks_data[task_name].duration_task
if len(schedule_per_team[team]) == 0:
schedule_per_team[team].append((next_t, sched[0], sched[1]))
calendar_teams[team][sched[0] : sched[1]] = 0
else:
t = 0
last_time = schedule_per_team[team][-1][-1]
min_time = max(
last_time + t,
max(
min_starting_time[next_t],
int(problem.get_lb_start_window(problem.index_to_task[next_t])),
),
)
if np.min(calendar_teams[team][min_time : min_time + dur]) >= 1:
schedule_per_team[team].append((next_t, min_time, min_time + dur))
calendar_teams[team][min_time : min_time + dur] = 0
else:
time_ = next(
t
for t in range(min_time, problem.horizon)
if np.min(calendar_teams[team][t : t + dur]) == 1
)
schedule_per_team[team].append((next_t, time_, time_ + dur))
calendar_teams[team][time_ : time_ + dur] = 0
if problem.index_to_task[next_t] in problem.precedence_constraints:
for t in problem.precedence_constraints[problem.index_to_task[next_t]]:
min_starting_time[problem.tasks_to_index[t]] = max(
min_starting_time[problem.tasks_to_index[t]],
schedule_per_team[team][-1][-1],
)
new_schedule[next_t, 0] = schedule_per_team[team][-1][1]
new_schedule[next_t, 1] = schedule_per_team[team][-1][2]
scheduled.add(next_t)
return AllocSchedulingSolution(
problem=problem, schedule=new_schedule, allocation=solution.allocation
)
[docs]
def export_scheduling_problem_json(problem: AllocSchedulingProblem) -> dict:
d = dict()
d["teams"] = problem.team_names
d["tasks"] = [str(x) for x in problem.tasks_list]
d["calendar"] = problem.calendar_team
for t in d["calendar"]:
d["calendar"][t] = [(int(x[0]), int(x[1])) for x in d["calendar"][t]]
d["teams_to_index"] = problem.teams_to_index
d["tasks_data"] = {
int(t): {"duration": problem.tasks_data[t].duration_task}
for t in problem.tasks_data
}
d["same_allocation"] = [[str(y) for y in x] for x in problem.same_allocation]
d["compatible_teams"] = {
str(t): list(problem.available_team_for_activity[t])
for t in problem.available_team_for_activity
}
d["start_window"] = {str(t): problem.start_window[t] for t in problem.start_window}
d["end_window"] = {str(t): problem.end_window[t] for t in problem.end_window}
d["successors"] = {
str(t): [str(succ) for succ in problem.precedence_constraints[t]]
for t in problem.precedence_constraints
}
d["horizon_shift"] = int(problem.horizon_start_shift)
d["original_start"] = {
int(x): problem.original_start[x] for x in problem.original_start
}
d["original_end"] = {int(x): problem.original_end[x] for x in problem.original_end}
return d