# Copyright (c) 2025 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import itertools
import logging
from collections.abc import Hashable
from functools import reduce
from typing import Optional
import networkx as nx
import numpy as np
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
Solution,
TypeObjective,
)
from discrete_optimization.rcpsp.problem import (
PairModeConstraint,
RcpspProblem,
RcpspSolution,
SpecialConstraintsDescription,
)
logger = logging.getLogger(__name__)
[docs]
class AllocSchedulingSolution(Solution):
    """Candidate answer to an ``AllocSchedulingProblem``.

    The constructor stores its three arguments as attributes without copying:

    * ``problem``: the problem instance the solution refers to.
    * ``schedule``: array read elsewhere in this module as ``schedule[i, 0]``
      (start) and ``schedule[i, 1]`` (end) of the task of index ``i``.
    * ``allocation``: array giving, per task index, the index of the team
      in charge of the task.
    """

    def __init__(
        self,
        problem: "AllocSchedulingProblem",
        schedule: np.ndarray,
        allocation: np.ndarray,
    ):
        self.problem, self.schedule, self.allocation = problem, schedule, allocation

    def copy(self) -> "Solution":
        """Return a new solution on the same problem, with copied arrays."""
        schedule_copy = np.copy(self.schedule)
        allocation_copy = np.copy(self.allocation)
        return AllocSchedulingSolution(
            problem=self.problem,
            schedule=schedule_copy,
            allocation=allocation_copy,
        )

    def change_problem(self, new_problem: "Problem") -> None:
        """Attach this solution to ``new_problem`` (arrays are left untouched)."""
        self.problem = new_problem
[docs]
class TasksDescription:
    """Static data of one task: its duration and its resource consumption."""

    def __init__(self, duration_task: int, resource_consumption: dict[str, int] = None):
        self.duration_task = duration_task
        # A missing consumption table is normalised to a fresh empty dict.
        self.resource_consumption = (
            {} if resource_consumption is None else resource_consumption
        )
[docs]
class AllocSchedulingProblem(Problem):
    """Team allocation and scheduling problem.

    Every task of ``tasks_list`` gets a (start, end) pair and one team of
    ``team_names``. The constructor stores the input data, builds index
    mappings for teams and tasks, and derives a multi-mode RCPSP counterpart
    (``self.rcpsp``) together with ``self.ac_mode_to_team``.
    """

    def __init__(
        self,
        team_names: list[Hashable],
        calendar_team: dict[
            Hashable, list[tuple[int, int]]
        ],  # List of available slots per team.
        horizon: int,
        tasks_list: list[Hashable],
        tasks_data: dict[Hashable, TasksDescription],
        same_allocation: list[set[Hashable]],
        precedence_constraints: dict[Hashable, set[Hashable]],
        available_team_for_activity: dict[Hashable, set[Hashable]],
        start_window: dict[Hashable, tuple[Optional[int], Optional[int]]],
        end_window: dict[Hashable, tuple[Optional[int], Optional[int]]],
        original_start: dict[Hashable, int],
        original_end: dict[Hashable, int],
        resources_list: list[str] = None,
        resources_capacity: dict[str, int] = None,
        horizon_start_shift: Optional[int] = 0,
        objective_handling: ObjectiveHandling = ObjectiveHandling.AGGREGATE,
    ):
        self.team_names = team_names
        # Back-to-back availability slots are merged once, up front.
        self.calendar_team = realign_calendars(calendar_team)
        self.horizon = horizon
        self.tasks_list = tasks_list
        self.tasks_data = tasks_data
        self.same_allocation = same_allocation
        self.precedence_constraints = precedence_constraints
        self.available_team_for_activity = available_team_for_activity
        self.start_window = start_window
        self.end_window = end_window
        self.original_start = original_start
        self.original_end = original_end
        self.number_teams = len(self.team_names)
        self.number_tasks = len(self.tasks_list)
        self.teams_to_index = {team: i for i, team in enumerate(self.team_names)}
        self.index_to_team = dict(enumerate(self.team_names))
        self.tasks_to_index = {task: i for i, task in enumerate(self.tasks_list)}
        self.index_to_task = {i: task for task, i in self.tasks_to_index.items()}
        self.objective_handling = objective_handling
        self.resources_list = resources_list
        self.resources_capacity = resources_capacity
        self.horizon_start_shift = horizon_start_shift
        # Built last, so the transformation helper never receives a
        # partially initialised problem.
        self._rebuild_rcpsp()

    def _rebuild_rcpsp(self) -> None:
        """(Re)compute ``self.rcpsp`` and ``self.ac_mode_to_team`` from the current data."""
        self.rcpsp, self.ac_mode_to_team = transform_to_multimode_rcpsp(
            self,
            build_calendar=True,
            add_window_time_constraint=True,
            add_additional_constraint=True,
        )

    def set_objective_handling(self, objective_handling: ObjectiveHandling):
        """Replace the objective handling used by ``get_objective_register``."""
        self.objective_handling = objective_handling

    def update_available_team_for_activity(
        self, available_team_for_activity: dict[Hashable, set[Hashable]]
    ):
        """Override the available team attribute and update the rcpsp accordingly"""
        self.available_team_for_activity = available_team_for_activity
        self._rebuild_rcpsp()

    def compute_predecessors(self):
        """Set ``self.successors`` and build the reverse map ``self.predecessors``.

        ``self.successors`` is the ``precedence_constraints`` dict itself (not a
        copy). Only tasks that have at least one predecessor get a key in
        ``self.predecessors``.
        """
        self.successors = self.precedence_constraints
        self.predecessors = {}
        for task, successors in self.precedence_constraints.items():
            for succ in successors:
                self.predecessors.setdefault(succ, set()).add(task)

    def evaluate(self, variable: Solution) -> dict[str, float]:
        """Return the KPI dictionary computed by ``evaluate_solution``."""
        return evaluate_solution(solution=variable, problem=self)

    def satisfy(self, variable: Solution) -> bool:
        """Check feasibility of ``variable``.

        An ``RcpspSolution`` is checked against the derived RCPSP, an
        ``AllocSchedulingSolution`` against the native constraint checks of this
        module. Any other solution type is reported as not satisfying.
        """
        if isinstance(variable, RcpspSolution):
            return self.rcpsp.satisfy(variable)
        if isinstance(variable, AllocSchedulingSolution):
            # A deprecated variant converted the solution into an RcpspSolution
            # and delegated to ``self.rcpsp.satisfy``.
            return full_satisfy(problem=self, solution=variable, partial_solution=False)
        # Unsupported solution type: return an explicit bool instead of
        # silently falling through with None.
        return False

    def get_attribute_register(self) -> EncodingRegister:
        """No encoding register is defined for this problem; returns ``None``."""
        return None

    def get_solution_type(self) -> type[Solution]:
        """Return the solution class of this problem."""
        return AllocSchedulingSolution

    def get_objective_register(self) -> ObjectiveRegister:
        """Describe the objectives: all are penalties, aggregated for maximisation."""
        dict_objective = {
            "nb_not_done": ObjectiveDoc(
                type=TypeObjective.PENALTY, default_weight=-100000
            ),
            "nb_teams": ObjectiveDoc(
                type=TypeObjective.PENALTY, default_weight=-10000.0
            ),
            "makespan": ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=-1.0),
            "workload_dispersion": ObjectiveDoc(
                type=TypeObjective.PENALTY, default_weight=-1.0
            ),
            "nb_violations": ObjectiveDoc(
                type=TypeObjective.PENALTY, default_weight=-100.0
            ),
        }
        return ObjectiveRegister(
            objective_sense=ModeOptim.MAXIMIZATION,
            objective_handling=self.objective_handling,
            dict_objective_to_doc=dict_objective,
        )

    def get_lb_start_window(self, task: Hashable) -> int:
        """Lower bound on the start of ``task`` (0 when none is given)."""
        lower = self.start_window.get(task, (None, None))[0]
        return 0 if lower is None else lower

    def get_ub_start_window(self, task: Hashable) -> int:
        """Upper bound on the start of ``task`` (``horizon - duration`` when none is given)."""
        upper = self.start_window.get(task, (None, None))[1]
        if upper is None:
            return self.horizon - self.tasks_data[task].duration_task
        return upper

    def get_lb_end_window(self, task: Hashable) -> int:
        """Lower bound on the end of ``task`` (start lower bound + duration when none is given)."""
        lower = self.end_window.get(task, (None, None))[0]
        if lower is None:
            return self.get_lb_start_window(task) + self.tasks_data[task].duration_task
        return lower

    def get_ub_end_window(self, task: Hashable) -> int:
        """Upper bound on the end of ``task`` (start upper bound + duration when none is given)."""
        upper = self.end_window.get(task, (None, None))[1]
        if upper is None:
            return self.get_ub_start_window(task) + self.tasks_data[task].duration_task
        return upper

    def get_all_lb_ub(self) -> list[tuple[int, int, int, int]]:
        """
        Return a list of
        (lb_start, ub_start, lb_end, ub_end) for each task
        (lower/upper bound on start, lower/upper bound on end)
        """
        return [
            (
                int(self.get_lb_start_window(t)),
                int(self.get_ub_start_window(t)),
                int(self.get_lb_end_window(t)),
                int(self.get_ub_end_window(t)),
            )
            for t in self.tasks_list
        ]

    def get_unavailable_teams_per_activity(self) -> dict[Hashable, set[Hashable]]:
        """For each restricted activity, the teams that are NOT allowed for it."""
        all_teams = set(self.team_names)
        return {
            task: all_teams.difference(allowed)
            for task, allowed in self.available_team_for_activity.items()
        }

    def compatible_teams_all_activity(self) -> dict[Hashable, set[Hashable]]:
        """Allowed teams for every task (all teams when the task is unrestricted).

        Each task gets its own set object, so editing one entry cannot leak
        into another entry or into the problem data.
        """
        compatible = {task: set(self.team_names) for task in self.tasks_list}
        for task, allowed in self.available_team_for_activity.items():
            compatible[task] = set(allowed)
        return compatible

    def compatible_teams_index_all_activity(self) -> dict[int, set[int]]:
        """Index-based version of ``compatible_teams_all_activity``."""
        all_team_indexes = set(self.index_to_team)
        # One independent set per task index (no shared mutable default).
        compatible = {
            index_task: set(all_team_indexes) for index_task in self.index_to_task
        }
        for task, allowed in self.available_team_for_activity.items():
            compatible[self.tasks_to_index[task]] = {
                self.teams_to_index[team] for team in allowed
            }
        return compatible

    def compute_unavailability_calendar(self, team: Hashable) -> list[tuple[int, int]]:
        """Return the complement of the availability calendar of ``team`` on [0, horizon].

        The availability slots are walked in their stored order; each gap
        between the running cursor and the next slot start is an unavailability.
        """
        cursor = 0
        unavailable = []
        for slot in self.calendar_team[team]:
            if slot[0] > cursor:
                unavailable.append((cursor, slot[0]))
            cursor = slot[1]
        if cursor < self.horizon:
            unavailable.append((cursor, self.horizon))
        return unavailable
[docs]
def evaluate_solution(
    solution: AllocSchedulingSolution, problem: AllocSchedulingProblem
) -> dict[str, float]:
    """Compute the KPIs of ``solution``.

    Returns a dict with keys ``nb_teams`` (distinct teams used),
    ``nb_not_done`` (tasks without a team), ``makespan`` (largest end time),
    ``workload_dispersion`` (max minus min total task duration per used team)
    and ``nb_violations`` (number of items returned by ``satisfy_detailed``).
    """
    workload_per_team = {}
    nb_not_done = 0
    for task_index, team in enumerate(solution.allocation):
        # Same "not done" convention as satisfy_all_done: None, -1 or NaN.
        # (NaN used to be counted as a team of its own.)
        if team is None or team == -1 or np.isnan(team):
            nb_not_done += 1
            continue
        duration = problem.tasks_data[problem.index_to_task[task_index]].duration_task
        workload_per_team[team] = workload_per_team.get(team, 0) + duration
    # Guard the empty cases: max()/min() raise on empty input.
    makespan = max(solution.schedule[:, 1]) if len(solution.schedule) > 0 else 0
    if workload_per_team:
        dispersion = max(workload_per_team.values()) - min(workload_per_team.values())
    else:
        dispersion = 0
    violations = satisfy_detailed(problem=problem, solution=solution)
    return {
        "nb_teams": len(workload_per_team),
        "nb_not_done": nb_not_done,
        "makespan": makespan,
        "workload_dispersion": dispersion,
        "nb_violations": len(violations),
    }
[docs]
def satisfy_all_done(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
):
    """Return True when every entry of ``solution.allocation`` names a team.

    An entry equal to ``None``, ``-1`` or NaN marks a task that is not done.
    ``problem`` and ``partial_solution`` are accepted for signature
    uniformity with the other checks but are not read.
    """

    def _is_not_done(team) -> bool:
        # convention to be respected.
        return team is None or team == -1 or np.isnan(team)

    missing = sum(1 for team in solution.allocation if _is_not_done(team))
    return missing == 0
[docs]
def satisfy_precedence(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
) -> bool:
    """Check that every successor starts no earlier than its predecessor ends.

    Partial solution = True means we ignore variable set to NaN when we check
    the constraint. With ``partial_solution=False`` a NaN end of a predecessor
    or a NaN start of a successor makes the check fail.
    """
    for task, successors in problem.precedence_constraints.items():
        end_task = solution.schedule[problem.tasks_to_index[task], 1]
        if np.isnan(end_task):
            if partial_solution:
                continue
            return False
        for successor_task in successors:
            start_successor = solution.schedule[
                problem.tasks_to_index[successor_task], 0
            ]
            if np.isnan(start_successor):
                if partial_solution:
                    continue
                # ``nan < end_task`` is always False, so without this guard an
                # unscheduled successor would silently pass the strict check.
                return False
            if start_successor < end_task:
                # Named logger: the module-level ``logging.info`` targets the
                # root logger and may implicitly call ``basicConfig``.
                logging.getLogger(__name__).info("Precedence not respected")
                return False
    return True
[docs]
def satisfy_same_allocation(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
) -> bool:
    """Check that all tasks of each ``same_allocation`` group share one team.

    Partial solution = True means we ignore variable set to NaN when we check
    the constraint: only the already-allocated members of a group are compared.
    With ``partial_solution=False`` a NaN inside a group makes the check fail
    (NaN differs from every value, itself included). Empty groups are
    trivially satisfied.
    """
    for set_same_alloc in problem.same_allocation:
        values = [
            solution.allocation[problem.tasks_to_index[task]]
            for task in set_same_alloc
        ]
        if partial_solution:
            values = [value for value in values if not np.isnan(value)]
        # With no value left (empty group, or nothing allocated yet in partial
        # mode) ``any`` is False and the group is skipped.
        if any(value != values[0] for value in values):
            logging.getLogger(__name__).info("Same alloc not respected")
            return False
    return True
[docs]
def satisfy_available_team(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
) -> bool:
    """Check that each restricted activity is allocated to one of its allowed teams.

    Partial solution = True means we ignore variable set to None/NaN when we
    check the constraint. Otherwise a missing allocation (None or NaN), the
    ``-1`` marker, or an index that is not a known team all make the check fail.
    """
    log = logging.getLogger(__name__)
    for activity, allowed_teams in problem.available_team_for_activity.items():
        alloc = solution.allocation[problem.tasks_to_index[activity]]
        # ``alloc is None`` is tested first: ``np.isnan(None)`` raises TypeError.
        if alloc is None or np.isnan(alloc):
            if partial_solution:
                continue
            log.info("Team available not respected")
            return False
        if alloc == -1:
            return False
        if alloc not in problem.index_to_team:
            # Unknown team index: report a failure rather than raising KeyError.
            log.info("Team available not respected")
            return False
        if problem.index_to_team[alloc] not in allowed_teams:
            log.info("Team available not respected")
            return False
    return True
[docs]
def realign_calendars(calendars_dict: dict[Hashable, list[tuple[int, int]]]):
    """Merge back-to-back availability slots of every calendar.

    For each key, slots are read in their stored order; a slot whose start
    equals the end of the running slot is fused with it. Calendars holding
    zero or one slot are passed through unchanged (same list object).
    Returns a new dict with the same keys.
    """
    merged_calendars = {}
    for team, slots in calendars_dict.items():
        if len(slots) <= 1:
            merged_calendars[team] = slots
            continue
        merged = []
        pending = (slots[0][0], slots[0][1])
        for slot in slots[1:]:
            if slot[0] == pending[1]:
                # Contiguous with the running slot: extend it.
                pending = (pending[0], slot[1])
            else:
                merged.append(pending)
                pending = slot
        # Flush the last running slot, unless it equals the one just stored.
        if not merged or merged[-1] != pending:
            merged.append(pending)
        merged_calendars[team] = merged
    return merged_calendars
[docs]
def intervals_do_not_overlap(
    interval1: tuple[float, float], interval2: tuple[float, float]
):
    """Return True when one interval ends at or before the start of the other."""
    return bool(interval1[1] <= interval2[0] or interval2[1] <= interval1[0])
[docs]
def interval_inside(
    interval1: tuple[float, float], interval_container: tuple[float, float]
):
    """Return True when ``interval1`` lies within ``interval_container`` (bounds included)."""
    return bool(
        interval1[0] >= interval_container[0]
        and interval1[1] <= interval_container[1]
    )
[docs]
def satisfy_calendars(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
):
    """Check that every allocated task fits inside one availability slot of its team.

    Tasks whose allocation is NaN or is not a known team index are skipped
    here. ``partial_solution`` is accepted for signature uniformity but not read.
    """
    for task_index, team_index in enumerate(solution.allocation):
        if np.isnan(team_index) or team_index not in problem.index_to_team:
            # This case is tackled by some other function
            continue
        st, end = solution.schedule[task_index, :]
        team_slots = problem.calendar_team[problem.index_to_team[team_index]]
        fits_in_a_slot = False
        for slot in team_slots:
            # The task interval must be contained in the slot, bounds included.
            if st >= slot[0] and end <= slot[1]:
                fits_in_a_slot = True
                break
        if not fits_in_a_slot:
            return False
    return True
[docs]
def satisfy_time_window(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
):
    """Check start and end times of every task against its optional windows.

    A bound set to ``None`` (or a task absent from the window dict) is not
    checked. ``partial_solution`` is accepted for signature uniformity but
    not read.
    """
    for t, (st, end) in enumerate(solution.schedule):
        tsk = problem.tasks_list[t]
        start_bounds = problem.start_window.get(tsk, (None, None))
        end_bounds = problem.end_window.get(tsk, (None, None))
        # Start is checked before end, lower bound before upper bound.
        for moment, (lower, upper) in ((st, start_bounds), (end, end_bounds)):
            if lower is not None and moment < lower:
                return False
            if upper is not None and moment > upper:
                return False
    return True
[docs]
def full_satisfy(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    partial_solution: bool = False,
) -> bool:
    """Run every constraint check and return True only if all of them pass.

    Each check is evaluated exactly once; every failing check is reported
    with a warning naming the check function.
    """
    checks = (
        satisfy_all_done,
        satisfy_precedence,
        satisfy_available_team,
        satisfy_same_allocation,
        satisfy_time_window,
        satisfy_calendars,
        satisfy_overlap_teams,
    )
    all_satisfied = True
    for check in checks:
        if not check(
            problem=problem, solution=solution, partial_solution=partial_solution
        ):
            # The message is the format string and the name its argument;
            # passing the function itself as message breaks log formatting.
            logger.warning("%s not satisfied !!", check.__name__)
            all_satisfied = False
    return all_satisfied
[docs]
def satisfy_detailed(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
):
    """Concatenate, in a fixed order, the violation lists of all detailed checks."""
    # TODO : detail computation of calendar constraint violated.
    detailed_checks = (
        satisfy_detailed_all_done,
        satisfy_detailed_precedence,
        satisfy_detailed_same_allocation,
        satisfy_detailed_available_team,
        satisfy_time_window_detailed,
        satisfy_overlap_teams_detailed,
    )
    violations = []
    for check in detailed_checks:
        violations = violations + check(problem, solution)
    return violations
[docs]
def satisfy_detailed_all_done(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
):
    """List one ``{"task_index", "tag"}`` record per task without a team.

    A task counts as not done when its allocation is ``None``, ``-1`` or NaN
    (convention to be respected). ``problem`` is not read.
    """
    return [
        {"task_index": task_index, "tag": "is_not_done"}
        for task_index, team in enumerate(solution.allocation)
        if team is None or team == -1 or np.isnan(team)
    ]
[docs]
def satisfy_time_window_detailed(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
):
    """List the start/end window violations of the allocated tasks.

    Tasks whose allocation is not a known team index are skipped. Each
    violation is a dict with ``task_index``, the offending ``start`` or
    ``end`` value, the ``expected`` bound, a ``tag`` (``"early"`` or
    ``"late"``) and the size of the ``violation``.
    """
    violations = []
    for t, (st, end) in enumerate(solution.schedule):
        tsk = problem.tasks_list[t]
        lb_s, ub_s = problem.start_window.get(tsk, (None, None))
        lb_e, ub_e = problem.end_window.get(tsk, (None, None))
        if solution.allocation[t] not in problem.index_to_team:
            continue
        # Start is examined before end; for each, lower bound before upper.
        for label, moment, lower, upper in (
            ("start", st, lb_s, ub_s),
            ("end", end, lb_e, ub_e),
        ):
            if lower is not None and moment < lower:
                violations.append(
                    {
                        "task_index": t,
                        label: moment,
                        "expected": lower,
                        "tag": "early",
                        "violation": lower - moment,
                    }
                )
            if upper is not None and moment > upper:
                violations.append(
                    {
                        "task_index": t,
                        label: moment,
                        "expected": upper,
                        "tag": "late",
                        "violation": moment - upper,
                    }
                )
    return violations
[docs]
def satisfy_detailed_precedence(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, Hashable, Hashable, int]]:
    """List the violated precedence constraints between allocated tasks.

    A pair is ignored when either task has allocation ``-1`` or NaN. Each
    violation is an 8-field tuple (wider than the return annotation states):
    ``("precedence", task, successor, task_index, successor_index,
    overlap, end_of_task, start_of_successor)``, where ``overlap`` is
    ``end_of_task - start_of_successor``.
    """

    def _is_unallocated(task_index) -> bool:
        team = solution.allocation[task_index]
        return team == -1 or np.isnan(team)

    violated = []
    for task, successors in problem.precedence_constraints.items():
        index_task = problem.tasks_to_index[task]
        end_task = solution.schedule[index_task, 1]
        if _is_unallocated(index_task):
            continue
        for successor_task in successors:
            index_succ = problem.tasks_to_index[successor_task]
            if _is_unallocated(index_succ):
                continue
            start_succ = solution.schedule[index_succ, 0]
            if start_succ < end_task:
                violated.append(
                    (
                        "precedence",
                        task,
                        successor_task,
                        index_task,
                        index_succ,
                        end_task - start_succ,
                        end_task,
                        start_succ,
                    )
                )
    return violated
[docs]
def satisfy_detailed_same_allocation(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, set[Hashable], set[int]]]:
    """List the ``same_allocation`` groups whose allocated tasks use several teams.

    Only allocations that are known team indexes are compared, so tasks that
    are not done (``-1``, NaN, ...) never trigger a violation. The verdict
    does not depend on the iteration order of the group: a group is violated
    as soon as two of its tasks are on different known teams.

    Each violation is ``("same_allocation", group, indexes_of_group_tasks)``.
    """
    violations = []
    for set_same_alloc in problem.same_allocation:
        task_indexes = {problem.tasks_to_index[task] for task in set_same_alloc}
        teams_in_use = {
            solution.allocation[index]
            for index in task_indexes
            if solution.allocation[index] in problem.index_to_team
        }
        if len(teams_in_use) > 1:
            violations.append(("same_allocation", set_same_alloc, task_indexes))
    return violations
[docs]
def satisfy_detailed_available_team(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, Hashable, Hashable, int, int]]:
    """List the restricted activities allocated to a team they are not allowed to use.

    Activities whose allocation is missing (None/NaN) or is not a known team
    index are skipped. Each violation is
    ``("available-team", activity, team, activity_index, team_index)``.
    """
    violations = []
    for activity, allowed_teams in problem.available_team_for_activity.items():
        index_activity = problem.tasks_to_index[activity]
        raw_team = solution.allocation[index_activity]
        # ``int(nan)`` raises ValueError and ``int(None)`` TypeError: filter
        # out missing allocations before converting.
        if raw_team is None or np.isnan(raw_team):
            continue
        team: int = int(raw_team)
        if team not in problem.index_to_team:
            continue
        team_alloc = problem.index_to_team[team]
        if team_alloc not in allowed_teams:
            violations.append(
                (
                    "available-team",
                    activity,
                    team_alloc,
                    index_activity,
                    problem.teams_to_index[team_alloc],
                )
            )
    return violations
[docs]
def satisfy_overlap_teams(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution, **kwargs
) -> bool:
    """Check that no team works on two tasks at the same time.

    For every known team appearing in ``solution.allocation``, its tasks are
    sorted by (start, end) and each task must start no earlier than the
    previous one ends. Extra keyword arguments (e.g. ``partial_solution``)
    are accepted and ignored.
    """
    for team in set(solution.allocation):
        if team not in problem.index_to_team:
            continue
        # ``np.nonzero`` returns a tuple of index arrays; its first item must
        # be used as the row index. Indexing with the tuple itself adds a
        # leading axis, which made the overlap test below never run.
        task_rows = np.nonzero(solution.allocation == team)[0]
        slots = sorted(tuple(row) for row in solution.schedule[task_rows, :])
        for previous_slot, slot in zip(slots, slots[1:]):
            if slot[0] < previous_slot[1]:
                return False
    return True
[docs]
def satisfy_overlap_teams_detailed(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> list[tuple[str, int, int, int]]:
    """List the pairs of consecutive tasks of a team that overlap in time.

    For every known team, its tasks are sorted by (start, end); whenever a
    task starts before the previous one ends, the tuple
    ``("no-overlap", previous_task_index, task_index, team_index)`` is
    reported. Task indexes travel with their slot through the sort, so the
    reported pair is the pair that actually overlaps.
    """
    violations = []
    for team in set(solution.allocation):
        if team not in problem.index_to_team:
            continue
        task_indexes = np.nonzero(solution.allocation == team)[0]
        ordered = sorted(
            (tuple(solution.schedule[index, :]), int(index)) for index in task_indexes
        )
        for (previous_slot, previous_task), (slot, task) in zip(ordered, ordered[1:]):
            if slot[0] < previous_slot[1]:
                violations.append(("no-overlap", previous_task, task, int(team)))
    return violations
[docs]
def compute_stats_per_team(
    problem: AllocSchedulingProblem, solution: AllocSchedulingSolution
) -> dict[int, float]:
    """Return, per known team used by ``solution``, a "used time" figure.

    The figure is the summed duration (end - start) of the tasks allocated
    to the team, plus the summed length of the team's unavailability slots
    as returned by ``problem.compute_unavailability_calendar``.
    """
    used_time_by_team = {}
    for team in set(solution.allocation):
        if team not in problem.index_to_team:
            continue
        slots = problem.compute_unavailability_calendar(problem.index_to_team[team])
        # %-style placeholder: extra positional arguments without one make the
        # logging module report a formatting error when DEBUG is enabled.
        logger.debug("slots %s", slots)
        task_rows = np.nonzero(solution.allocation == team)[0]
        schedule_ = solution.schedule[task_rows, :]
        used_time = np.sum(schedule_[:, 1] - schedule_[:, 0])
        used_time += sum([x[1] - x[0] for x in slots])
        used_time_by_team[team] = used_time
    return used_time_by_team
[docs]
def build_calendar_array_from_availability_slot(
    availability_slots: list[tuple[int, int]], horizon: int, value: int = 1
):
    """Rasterise availability slots into an integer array of length ``horizon``.

    Cells covered by a slot ``[start, end)`` hold ``value`` (the end is
    clipped to ``horizon``); every other cell holds 0.
    """
    calendar = np.zeros(horizon, dtype=int)
    for slot in availability_slots:
        first_cell = slot[0]
        stop_cell = min(slot[1], horizon)
        calendar[first_cell:stop_cell] = value
    return calendar
[docs]
def _team_of_mode(mode_detail):
    """Return the first key other than "duration" with a positive value, or None."""
    for resource, amount in mode_detail.items():
        if resource != "duration" and amount > 0:
            return resource
    return None


def _mode_per_team(rcpsp, task):
    """Map each team used by a mode of ``task`` to that mode (last mode wins)."""
    team_to_mode = {}
    for mode, mode_detail in rcpsp.mode_details[task].items():
        team = _team_of_mode(mode_detail)
        # A mode without any positive resource has no team: skip it instead
        # of failing with IndexError.
        if team is not None:
            team_to_mode[team] = mode
    return team_to_mode


def build_pair_mode_constraint(
    problem: AllocSchedulingProblem, rcpsp: RcpspProblem, use_score: bool = False
):
    """Build the ``PairModeConstraint`` encoding the same-allocation groups.

    For every pair of tasks inside a ``same_allocation`` group, the allowed
    (mode, mode) combinations are those where both modes use the same team.
    Returns ``None`` when ``problem.same_allocation`` is ``None``.

    With ``use_score=False`` the constraint carries the explicit allowed mode
    pairs; with ``use_score=True`` it carries the set of task pairs plus an
    integer score per (task, mode), namely the 1-based position of the mode's
    team in ``rcpsp.resources_list``.
    """
    if problem.same_allocation is None:
        return None
    modes_allowed_assignment: dict[
        tuple[Hashable, Hashable], list[tuple[Hashable, Hashable]]
    ] = {}
    for set_task in problem.same_allocation:
        for task_a, task_b in itertools.combinations(set_task, 2):
            modes_a = _mode_per_team(rcpsp, task_a)
            modes_b = _mode_per_team(rcpsp, task_b)
            shared_teams = set(modes_a).intersection(set(modes_b))
            modes_allowed_assignment[(task_a, task_b)] = [
                (modes_a[team], modes_b[team]) for team in shared_teams
            ]
    index_team = {
        resource: position + 1
        for position, resource in enumerate(rcpsp.resources_list)
    }
    task_mode_integer = {}
    for task, modes in rcpsp.mode_details.items():
        for mode, mode_detail in modes.items():
            team = _team_of_mode(mode_detail)
            if team is not None:
                task_mode_integer[(task, mode)] = index_team[team]
    return PairModeConstraint(
        allowed_mode_assignment=modes_allowed_assignment if not use_score else None,
        same_score_mode=set(modes_allowed_assignment.keys()) if use_score else None,
        score_mode=task_mode_integer if use_score else None,
    )
[docs]
def correct_schedule_avoid_overlap(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution,
    init_min_starting_date_lb: bool = False,
):
    """Greedily rebuild the schedule of ``solution`` so that tasks of a team do not overlap.

    Tasks are picked in increasing original start time, a task being eligible
    once all its predecessors are placed. The first task placed on a team
    keeps its original (start, end); each later task of that team is placed at
    the earliest time, not before the end of the team's previously placed task
    nor before its own minimum start, where the team's availability array is
    still free for the whole duration. Tasks with allocation ``-1`` keep their
    original times.

    Args:
        problem: problem providing calendars, precedences, durations, windows.
        solution: solution whose allocation is kept and whose schedule is repaired.
        init_min_starting_date_lb: when True, the initial minimum start of each
            task is its start-window lower bound only; otherwise it is the max
            of its original start and that lower bound.

    Returns:
        A new ``AllocSchedulingSolution`` with the rebuilt schedule and the
        same ``allocation`` array object as ``solution``.
    """
    # Task indexes per team, ordered by original start. Only used below to
    # find out which teams have at least one task.
    routings = []
    for i in range(problem.number_teams):
        tasks = np.nonzero(solution.allocation == i)
        if len(tasks[0]) > 0:
            # Indexing with the ``tasks`` tuple adds a leading axis, hence the final ``[0]``.
            sorted_ = tasks[0][np.argsort(solution.schedule[tasks, 0])][0]
            routings.append(sorted_)
        else:
            routings.append([])
    used_teams = {i for i in range(len(routings)) if len(routings[i]) > 0}
    # Per used team: array over [0, horizon) with 1 where the team is available.
    # Cells are set to 0 further down as tasks get placed.
    calendar_teams = {
        i: build_calendar_array_from_availability_slot(
            availability_slots=problem.calendar_team[problem.index_to_team[i]],
            horizon=problem.horizon,
            value=1,
        )
        for i in used_teams
    }
    # Reverse precedence graph, keyed by task index.
    predecessors = {j: set() for j in problem.index_to_task}
    for t in problem.precedence_constraints:
        for succ_t in problem.precedence_constraints[t]:
            predecessors[problem.tasks_to_index[succ_t]].add(problem.tasks_to_index[t])
    scheduled = set()
    min_starting_time = {
        i: max(
            solution.schedule[i, 0],
            int(problem.get_lb_start_window(problem.index_to_task[i])),
        )
        for i in problem.index_to_task
    }
    if init_min_starting_date_lb:
        min_starting_time = {
            i: int(problem.get_lb_start_window(problem.index_to_task[i]))
            for i in problem.index_to_task
        }
    nb_tasks_to_sched = len(problem.index_to_task)
    sorted_tasks = [int(x) for x in np.argsort(solution.schedule[:, 0])]
    # Per used team: list of (task_index, start, end) in placement order.
    schedule_per_team = {i: [] for i in used_teams}
    new_schedule = np.zeros(solution.schedule.shape)
    while len(scheduled) < nb_tasks_to_sched:
        # Earliest (by original start) unplaced task whose predecessors are all placed.
        next_t = next(
            t
            for t in sorted_tasks
            if t not in scheduled and all(p in scheduled for p in predecessors[t])
        )
        task_name = problem.index_to_task[next_t]
        sched = solution.schedule[next_t, :]
        team = solution.allocation[next_t]
        if team == -1:
            # Task without team: original times are copied as is.
            new_schedule[next_t, 0] = solution.schedule[next_t, 0]
            new_schedule[next_t, 1] = solution.schedule[next_t, 1]
            scheduled.add(next_t)
            continue
        dur = problem.tasks_data[task_name].duration_task
        if len(schedule_per_team[team]) == 0:
            # First task of the team: kept at its original slot.
            # NOTE(review): the slice bounds come from ``solution.schedule``,
            # so this presumably expects an integer schedule - confirm.
            schedule_per_team[team].append((next_t, sched[0], sched[1]))
            calendar_teams[team][sched[0] : sched[1]] = 0
        else:
            t = 0  # offset added after the team's last task; always 0 here
            last_time = schedule_per_team[team][-1][-1]
            min_time = max(
                last_time + t,
                max(
                    min_starting_time[next_t],
                    int(problem.get_lb_start_window(problem.index_to_task[next_t])),
                ),
            )
            if np.min(calendar_teams[team][min_time : min_time + dur]) >= 1:
                # The team is free on the whole window starting at min_time.
                schedule_per_team[team].append((next_t, min_time, min_time + dur))
                calendar_teams[team][min_time : min_time + dur] = 0
            else:
                # Otherwise scan forward for the first start whose window is free.
                # ``next`` has no default: StopIteration is raised if none exists.
                time_ = next(
                    t
                    for t in range(min_time, problem.horizon)
                    if np.min(calendar_teams[team][t : t + dur]) == 1
                )
                schedule_per_team[team].append((next_t, time_, time_ + dur))
                calendar_teams[team][time_ : time_ + dur] = 0
        if problem.index_to_task[next_t] in problem.precedence_constraints:
            # Successors cannot start before the end of the task just placed.
            for t in problem.precedence_constraints[problem.index_to_task[next_t]]:
                min_starting_time[problem.tasks_to_index[t]] = max(
                    min_starting_time[problem.tasks_to_index[t]],
                    schedule_per_team[team][-1][-1],
                )
        new_schedule[next_t, 0] = schedule_per_team[team][-1][1]
        new_schedule[next_t, 1] = schedule_per_team[team][-1][2]
        scheduled.add(next_t)
    return AllocSchedulingSolution(
        problem=problem, schedule=new_schedule, allocation=solution.allocation
    )
[docs]
def export_scheduling_problem_json(problem: AllocSchedulingProblem) -> dict:
    """Export the problem data as a plain dictionary meant for JSON dumping.

    Task identifiers are turned into ``str`` (or ``int`` for ``tasks_data``,
    ``original_start`` and ``original_end``); calendar bounds are cast to
    ``int``. The problem itself is left unmodified: the exported calendar is
    a new mapping, not ``problem.calendar_team``.
    """
    d = dict()
    d["teams"] = problem.team_names
    d["tasks"] = [str(x) for x in problem.tasks_list]
    # Build a fresh mapping: assigning into ``problem.calendar_team`` through
    # an alias would rewrite the problem's own calendars as a side effect.
    d["calendar"] = {
        team: [(int(slot[0]), int(slot[1])) for slot in slots]
        for team, slots in problem.calendar_team.items()
    }
    d["teams_to_index"] = problem.teams_to_index
    d["tasks_data"] = {
        int(t): {"duration": problem.tasks_data[t].duration_task}
        for t in problem.tasks_data
    }
    d["same_allocation"] = [[str(y) for y in x] for x in problem.same_allocation]
    d["compatible_teams"] = {
        str(t): list(problem.available_team_for_activity[t])
        for t in problem.available_team_for_activity
    }
    d["start_window"] = {str(t): problem.start_window[t] for t in problem.start_window}
    d["end_window"] = {str(t): problem.end_window[t] for t in problem.end_window}
    d["successors"] = {
        str(t): [str(succ) for succ in problem.precedence_constraints[t]]
        for t in problem.precedence_constraints
    }
    d["horizon_shift"] = int(problem.horizon_start_shift)
    d["original_start"] = {
        int(x): problem.original_start[x] for x in problem.original_start
    }
    d["original_end"] = {int(x): problem.original_end[x] for x in problem.original_end}
    return d