# Source code for discrete_optimization.workforce.scheduling.utils

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import datetime
import logging
import os
from copy import deepcopy
from typing import Any, Hashable, Union

import matplotlib.patches as patches
import networkx as nx
import numpy as np
from matplotlib import pyplot as plt

from discrete_optimization.generic_tools.do_problem import TypeObjective
from discrete_optimization.workforce.allocation.problem import (
    AggregateOperator,
    AllocationAdditionalConstraint,
    ObjectiveDoc,
    TeamAllocationProblem,
    TeamAllocationProblemMultiobj,
    TeamAllocationSolution,
)
from discrete_optimization.workforce.scheduling.problem import (
    AllocSchedulingProblem,
    AllocSchedulingSolution,
    TasksDescription,
    satisfy_detailed,
)

try:
    import plotly.graph_objects as go
except ImportError:
    # plotly is optional: it is only needed by the plotly-based plotting helper,
    # so a missing install must not prevent importing this module. Only
    # ImportError is ignored; any other failure is left to propagate.
    pass

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Absolute path of the directory containing this file.
this_folder = os.path.dirname(os.path.abspath(__file__))


def overlap_interval(interval_1: tuple[int, int], interval_2: tuple[int, int]):
    """Tell whether two ``(start, end)`` intervals share a non-empty span.

    Both comparisons are strict, so intervals that merely touch at an endpoint
    (for instance ``(0, 5)`` and ``(5, 8)``) are not considered overlapping.

    :param interval_1: first interval, read as ``(start, end)``
    :param interval_2: second interval, read as ``(start, end)``
    :return: truthy when the intervals overlap
    """
    first_starts_before_second_ends = interval_1[0] < interval_2[1]
    return first_starts_before_second_ends and interval_2[0] < interval_1[1]
def compute_equivalent_teams_scheduling_problem(
    scheduling_problem: AllocSchedulingProblem,
) -> list[list[int]]:
    """Group team indices that are interchangeable in a solution.

    Two teams end up in the same group when each one belongs to the other's
    candidate set, where the candidate set of a team is:

    * the intersection of the allowed-team sets of every task that this team
      may perform (tasks without an entry in ``available_team_for_activity``
      allow every team), and
    * restricted to teams whose calendar equals this team's calendar.

    Example: in a pure coloring problem every team is equivalent; with
    restricted task/team compatibilities the classes get smaller.

    :param scheduling_problem: problem providing ``tasks_list``, ``team_names``,
        ``teams_to_index``, ``index_to_team``, ``available_team_for_activity``
        and ``calendar_team``
    :return: list of groups, each a sorted list of team indices
    """
    teams_to_index = scheduling_problem.teams_to_index
    index_to_team = scheduling_problem.index_to_team
    calendars = scheduling_problem.calendar_team

    # One set of allowed team indices per task.
    allowed_per_task = []
    for task in scheduling_problem.tasks_list:
        names = scheduling_problem.available_team_for_activity.get(
            task, scheduling_problem.team_names
        )
        allowed_per_task.append({teams_to_index[name] for name in names})

    every_team = set(index_to_team)
    classes = {}
    for team in index_to_team:
        # Teams allowed on every task that `team` itself is allowed on.
        candidates = every_team.intersection(
            *(allowed for allowed in allowed_per_task if team in allowed)
        )
        own_calendar = calendars[index_to_team[team]]
        classes[team] = {
            other
            for other in candidates
            if calendars[index_to_team[other]] == own_calendar
        }

    # Keep only symmetric relations: drop `other` from the class of `team`
    # whenever `team` is not in the class of `other`.
    for team in classes:
        for other in list(classes[team]):
            if team not in classes[other]:
                classes[team].remove(other)

    distinct_groups = {frozenset(members) for members in classes.values()}
    return [sorted(group) for group in distinct_groups]
def compute_changes_between_solution(
    solution_a: AllocSchedulingSolution,
    solution_b: AllocSchedulingSolution,
    problem_a: AllocSchedulingProblem = None,
    problem_b: AllocSchedulingProblem = None,
):
    """Summarise allocation and start-time differences between two solutions.

    When both problems compare equal, the work is delegated to
    ``compute_changes_between_solution_same_pb``. Otherwise only the tasks
    present in both ``tasks_list`` are compared, each looked up through the
    ``tasks_to_index`` of its own problem.

    :param solution_a: first solution
    :param solution_b: second solution
    :param problem_a: problem of ``solution_a``; defaults to ``solution_a.problem``
    :param problem_b: problem of ``solution_b``; defaults to ``solution_b.problem``
    :return: dict with the compared allocations and schedules, the indices and
        names of reallocated / shifted tasks, the signed start shifts, and the
        count, mean, sum and max of the absolute start shifts (0 when no task
        is shifted)
    """
    problem_a = solution_a.problem if problem_a is None else problem_a
    problem_b = solution_b.problem if problem_b is None else problem_b
    if problem_a == problem_b:
        return compute_changes_between_solution_same_pb(
            solution_a, solution_b, problem=problem_a
        )

    # Restrict the comparison to tasks known by both problems.
    activities = list(set(problem_a.tasks_list).intersection(problem_b.tasks_list))
    index_a = problem_a.tasks_to_index
    index_b = problem_b.tasks_to_index
    alloc_a = np.array([solution_a.allocation[index_a[name]] for name in activities])
    alloc_b = np.array([solution_b.allocation[index_b[name]] for name in activities])
    schedule_a = np.array(
        [solution_a.schedule[index_a[name], :] for name in activities]
    )
    schedule_b = np.array(
        [solution_b.schedule[index_b[name], :] for name in activities]
    )

    reallocated = (alloc_a != alloc_b).nonzero()
    shifted = (schedule_a[:, 0] != schedule_b[:, 0]).nonzero()
    reallocated_rows = reallocated[0]
    shifted_rows = shifted[0]
    nb_shifted = shifted_rows.shape[0]

    delta = schedule_a - schedule_b
    abs_delta = np.abs(delta)
    if nb_shifted > 0:
        moved = abs_delta[shifted, 0]
        mean_shift = np.mean(moved)
        sum_shift = np.sum(moved)
        max_shift = np.max(moved)
    else:
        mean_shift = max_shift = sum_shift = 0

    return {
        "allocs": (alloc_a, alloc_b),
        "schedules": (schedule_a, schedule_b),
        "reallocated_index": reallocated_rows,
        "reallocated_tasks": [activities[row] for row in reallocated_rows],
        "nb_reallocated": reallocated_rows.shape[0],
        "shifted_index": shifted_rows,
        "shifted_tasks": [activities[row] for row in shifted_rows],
        "shifts": delta[shifted, 0],
        "nb_shift": nb_shifted,
        "mean_shift": mean_shift,
        "sum_shift": sum_shift,
        "max_shift": max_shift,
    }
def compute_changes_between_solution_same_pb(
    solution_a: AllocSchedulingSolution,
    solution_b: AllocSchedulingSolution,
    problem: AllocSchedulingProblem = None,
):
    """Summarise differences between two solutions of the same problem.

    Tasks are compared row by row: a task is "reallocated" when its allocation
    differs and "shifted" when the first schedule column (its start) differs.

    :param solution_a: first solution
    :param solution_b: second solution
    :param problem: problem used to name tasks through ``tasks_list``;
        defaults to ``solution_a.problem``
    :return: dict with both allocations and schedules, the indices and names of
        reallocated / shifted tasks, the signed start shifts, and the count,
        sum, mean and max of the absolute start shifts (0 when nothing moved)
    """
    problem = solution_a.problem if problem is None else problem

    delta = solution_a.schedule - solution_b.schedule
    abs_delta = np.abs(delta)
    reallocated = (solution_a.allocation != solution_b.allocation).nonzero()
    shifted = (solution_a.schedule[:, 0] != solution_b.schedule[:, 0]).nonzero()
    reallocated_rows = reallocated[0]
    shifted_rows = shifted[0]
    nb_shifted = shifted_rows.shape[0]

    if nb_shifted > 0:
        moved = abs_delta[shifted, 0]
        mean_shift = np.mean(moved)
        max_shift = np.max(moved)
        sum_shift = np.sum(moved)
    else:
        mean_shift = max_shift = sum_shift = 0

    return {
        "allocs": (solution_a.allocation, solution_b.allocation),
        "schedules": (solution_a.schedule, solution_b.schedule),
        "reallocated_index": reallocated_rows,
        "reallocated_tasks": [problem.tasks_list[row] for row in reallocated_rows],
        "nb_reallocated": reallocated_rows.shape[0],
        "shifted_index": shifted_rows,
        "shifted_tasks": [problem.tasks_list[row] for row in shifted_rows],
        "shifts": delta[shifted, 0],
        "nb_shift": nb_shifted,
        "sum_shift": sum_shift,
        "mean_shift": mean_shift,
        "max_shift": max_shift,
    }
def plot_schedule_comparison(
    base_solution: AllocSchedulingSolution,
    updated_solution: AllocSchedulingSolution,
    problem: AllocSchedulingProblem,
):
    """Draw a matplotlib chart comparing a base schedule with an updated one.

    Each row of the chart is a team used by either solution (plus an "Unset"
    row when some task is allocated to ``-1``). Every task of the updated
    solution is drawn as an opaque rectangle. When a task changed team or start
    time, its base position is additionally drawn as a faded rectangle and an
    arrow links the base position to the updated one.

    :param base_solution: reference solution (``allocation`` and ``schedule``)
    :param updated_solution: solution compared against the reference
    :param problem: problem providing ``number_teams``
    :return: the matplotlib figure
    """
    base_allocation = base_solution.allocation
    base_schedule = base_solution.schedule
    updated_allocation = updated_solution.allocation
    updated_schedule = updated_solution.schedule

    fig, ax = plt.subplots(figsize=(12, 8))
    all_teams = set(base_allocation).union(updated_allocation)
    sorted_all_teams = sorted(list(all_teams))
    sorted_all_teams = [int(x) for x in sorted_all_teams]
    if -1 in sorted_all_teams:
        # -1 sorts first; move it to the end under a readable label.
        sorted_all_teams = sorted_all_teams[1:] + ["Unset"]
    team_to_index = {team: row for row, team in enumerate(sorted_all_teams)}

    # One color per displayed row. Rows can outnumber problem.number_teams
    # (every team used plus the "Unset" row), so size the palette on the larger
    # of the two to keep colors[row] in range.
    nb_colors = max(problem.number_teams, len(sorted_all_teams))
    cmap = plt.get_cmap("tab20", nb_colors)
    colors = [cmap(i) for i in range(nb_colors)]

    for i in range(len(base_allocation)):
        team_base = int(base_allocation[i])
        team_updated = int(updated_allocation[i])
        if team_base == -1:
            team_base = "Unset"
        if team_updated == -1:
            team_updated = "Unset"
        start_base, end_base = base_schedule[i]
        start_updated, end_updated = updated_schedule[i]

        # Updated solution: opaque rectangle.
        rect_updated = patches.Rectangle(
            (start_updated, team_to_index[team_updated] - 0.4),
            end_updated - start_updated,
            0.8,
            facecolor=colors[team_to_index[team_updated]],
            edgecolor="black",
            lw=2,
        )
        ax.add_patch(rect_updated)

        if (int(start_updated), team_updated) != (int(start_base), team_base):
            # The task moved: show where it was (faded) and link both positions.
            rect_base = patches.Rectangle(
                (start_base, team_to_index[team_base] - 0.4),
                end_base - start_base,
                0.8,
                facecolor=colors[team_to_index[team_base]],
                edgecolor="black",
                lw=2,
                alpha=0.1,
                label=f"Team {team_base}" if i == 0 else "",
            )
            ax.add_patch(rect_base)
            # Arrow from the middle of the base task to the middle of the updated one.
            ax.annotate(
                "",
                xy=((start_updated + end_updated) / 2, team_to_index[team_updated]),
                xytext=((start_base + end_base) / 2, team_to_index[team_base]),
                arrowprops=dict(arrowstyle="->", color="black", lw=2),
            )

    # Customizing the plot
    ax.set_xlabel("Time")
    ax.set_ylabel("Teams")
    ax.set_yticks(range(len(sorted_all_teams)))
    ax.set_yticklabels(
        [f"Team {sorted_all_teams[i]}" for i in range(len(sorted_all_teams))]
    )
    ax.set_title("Schedule Comparison (Base vs Updated)")
    # Set limits to ensure the rectangles fit well
    ax.set_xlim(
        min(base_schedule[:, 0].min(), updated_schedule[:, 0].min()) - 1,
        max(base_schedule[:, 1].max(), updated_schedule[:, 1].max()) + 1,
    )
    ax.set_ylim(-0.5, len(sorted_all_teams) - 0.5)
    # Show the grid for clarity
    ax.grid(True, which="both", axis="x", linestyle="--", lw=0.5)
    return fig
def plotly_schedule_comparison(
    base_solution: AllocSchedulingSolution,
    updated_solution: AllocSchedulingSolution,
    problem: AllocSchedulingProblem,
    index_team_to_other_index: dict[int, int] = None,
    display: bool = False,
    additional_info: dict[Hashable, dict[str, Any]] = None,
    use_color_scale: bool = True,
    use_color_map_per_task: bool = False,
    color_map_per_task: dict[int, Any] = None,
    opacity_map_per_task: dict[int, float] = None,
    show_all_changes: bool = True,
    show_change: dict[int, bool] = None,
    plot_team_breaks: bool = False,
    plot_xticks: bool = True,
    plot_text: bool = True,
    title="Scheduling Comparison (Base vs Updated)",
):
    """Build an interactive plotly chart comparing two schedules.

    Every task of the updated solution is drawn as a horizontal bar on the row
    of its team. When a task changed team or start time, its base position is
    drawn as a translucent bar and a dotted line links both positions.
    Neither input solution is modified.

    NOTE(review): schedule values are multiplied by 60 before being rendered as
    second-resolution dates, so they are presumably minutes — confirm.

    :param base_solution: reference solution (``allocation``, ``schedule``)
    :param updated_solution: solution compared against the reference
    :param problem: problem providing ``index_to_team``, ``index_to_task``,
        ``horizon_start_shift`` and, for breaks, ``team_names`` and
        ``compute_unavailability_calendar``
    :param index_team_to_other_index: maps a team index to the index displayed
        on the chart; defaults to the identity over ``problem.index_to_team``
    :param display: call ``fig.show()`` before returning
    :param additional_info: extra hover fields, keyed by task name
    :param use_color_scale: sample one "Viridis" color per displayed row;
        otherwise every row is "green"
    :param use_color_map_per_task: color bars from ``color_map_per_task``
        instead of the per-row colors
    :param color_map_per_task: color per task index ("green" when missing)
    :param opacity_map_per_task: opacity of the updated bar per task index
        (0.8 when missing)
    :param show_all_changes: draw the base bar and link for every changed task
    :param show_change: when ``show_all_changes`` is False, per task index flag
        telling whether its change is drawn (True when missing)
    :param plot_team_breaks: overlay black bars on team unavailability slots
    :param plot_xticks: show the date labels on the x axis
    :param plot_text: write the task index inside each updated bar
    :param title: figure title
    :return: the plotly figure
    """
    import plotly.colors as pc

    # Optional mappings default to empty so the `.get` calls below are safe.
    if opacity_map_per_task is None:
        opacity_map_per_task = {}
    if color_map_per_task is None:
        color_map_per_task = {}
    if show_change is None:
        show_change = {}
    if index_team_to_other_index is None:
        index_team_to_other_index = {i: i for i in problem.index_to_team}

    # Work on copies: unallocated entries are rewritten to -1 below and the
    # caller's solutions must stay untouched.
    base_allocation = list(base_solution.allocation)
    updated_allocation = list(updated_solution.allocation)
    max_time = np.max(base_solution.schedule[:, 1])
    base_schedule = base_solution.schedule + problem.horizon_start_shift // 60
    updated_schedule = updated_solution.schedule + problem.horizon_start_shift // 60
    for i in range(len(base_allocation)):
        if base_allocation[i] is None:
            base_allocation[i] = -1
        if updated_allocation[i] is None:
            updated_allocation[i] = -1

    colormap = "Viridis"
    fig = go.Figure()

    # Rows of the chart: displayed index of every team used by either solution,
    # with -1 (unallocated) moved last under the "Unset" label.
    all_teams = set(base_allocation).union(updated_allocation)
    sorted_all_teams = sorted(list(all_teams))
    sorted_all_teams = [int(x) for x in sorted_all_teams]
    indexing = deepcopy(index_team_to_other_index)
    indexing[-1] = -1
    sorted_all_teams = sorted([indexing[x] for x in sorted_all_teams])
    if -1 in sorted_all_teams:
        sorted_all_teams = sorted_all_teams[1:] + ["Unset"]
    team_to_index = {sorted_all_teams[i]: i for i in range(len(sorted_all_teams))}

    if use_color_scale:
        colors_ = pc.sample_colorscale(
            colormap, np.linspace(0, 1, len(sorted_all_teams))
        )
    else:
        colors_ = ["green" for i in range(len(sorted_all_teams))]

    for i in range(len(base_allocation)):
        team_base = indexing[int(base_allocation[i])]
        team_updated = indexing[int(updated_allocation[i])]
        if team_base == -1:
            team_base = "Unset"
        if team_updated == -1:
            team_updated = "Unset"
        start_base, end_base = base_schedule[i]
        start_updated, end_updated = updated_schedule[i]
        start_updated_ts = np.datetime64(int(start_updated * 60), "s")
        end_updated_ts = np.datetime64(int(end_updated * 60), "s")

        # Hover text of the updated bar.
        hover = {
            "activity": problem.index_to_task[i],
            "activity_index": i,
            "index_team": team_updated,
            "start": start_updated_ts,
            "end": end_updated_ts,
        }
        if additional_info is not None:
            if problem.index_to_task[i] in additional_info:
                for key in additional_info[problem.index_to_task[i]]:
                    hover[key] = additional_info[problem.index_to_task[i]][key]
        hover_template = ""
        for key in hover:
            hover_template += f"<b>{key}:</b> {hover[key]}<br>"
        if "ACTIVITY TYPE" in hover:
            # Dedicated layout when this field is present: only the fields
            # below are shown, and all of them are expected to exist.
            hover_template = ""
            for key in [
                "ACTIVITY TYPE",
                "start",
                "end",
                "START LOCATION NAME",
                "DESTINATION LOCATION NAME",
                "Ressourcenname",
            ]:
                hover_template += f"<b>{key}:</b> {hover[key]}<br>"

        # Updated solution as a bar.
        fig.add_trace(
            go.Bar(
                x=[(end_updated - start_updated)],
                y=[team_to_index[team_updated]],
                base=start_updated,
                orientation="h",
                name=f"{i}",
                text=i if plot_text else "",
                textangle=0,
                marker=dict(
                    color=colors_[team_to_index[team_updated]]
                    if not use_color_map_per_task
                    else color_map_per_task.get(i, "green"),
                    showscale=False,  # No color scale bar
                    line=dict(color="black", width=1.5),
                    opacity=opacity_map_per_task.get(i, 0.8),
                ),
                hovertemplate=hover_template,
            )
        )

        if (int(start_updated), team_updated) != (int(start_base), team_base):
            # The task moved; optionally skip drawing this particular change.
            if not show_all_changes:
                if not show_change.get(i, True):
                    continue
            # Base solution as a translucent bar.
            hover = {
                "activity": problem.index_to_task[i],
                "activity_index": i,
                "index_team": team_base,
            }
            hover_template = "<b>Base schedule</b> <br>"
            for key in hover:
                hover_template += f"<b>{key}:</b> {hover[key]}<br>"
            fig.add_trace(
                go.Bar(
                    x=[(end_base - start_base)],
                    y=[team_to_index[team_base]],
                    base=start_base,
                    orientation="h",
                    name=f"{i}",
                    marker=dict(
                        color=colors_[team_to_index[team_base]]
                        if not use_color_map_per_task
                        else color_map_per_task.get(i, "green"),
                        opacity=0.3,
                    ),
                    hovertemplate=hover_template,
                )
            )
            # Dotted link from the middle of the base bar to the middle of the
            # updated bar; only the end point carries a visible marker.
            fig.add_trace(
                go.Scatter(
                    x=[
                        (start_base + end_base) // 2,
                        (start_updated + end_updated) // 2,
                    ],
                    y=[team_to_index[team_base], team_to_index[team_updated]],
                    mode="lines+markers",
                    line=dict(color="darkred", width=1, dash="dot"),
                    marker=dict(size=[0, 7], color="darkred"),
                )
            )

    if plot_team_breaks:
        for team in team_to_index:
            if team != "Unset":
                # NOTE(review): `team` is a displayed index; it is used directly
                # to index problem.team_names, which looks consistent only when
                # index_team_to_other_index is the identity — confirm.
                team_ = problem.team_names[team]
                slots = problem.compute_unavailability_calendar(team_)
                for slot in slots:
                    if slot[0] < max_time:
                        right_side = min(slot[1], max_time + 30)
                        logger.debug("Slot %s", slot)
                        fig.add_trace(
                            go.Bar(
                                x=[(right_side - slot[0])],
                                y=[team_to_index[team]],
                                dy=0.2,
                                base=slot[0] + problem.horizon_start_shift // 60,
                                orientation="h",
                                marker=dict(color="black", opacity=0.7),
                                hovertemplate=f"<b>Break {team}</b> <br>",
                                width=0.2,
                            )
                        )

    min_date = int(np.min(base_schedule))
    max_date = int(np.max(base_schedule))
    # About ten ticks over the base schedule span; the step is clamped to 1
    # because range() rejects a zero step when the span is shorter than 10.
    tickvals = range(min_date, max_date, max(1, (max_date - min_date) // 10))
    ticktext = [str(np.datetime64(ts * 60, "s")) for ts in tickvals]
    fig.update_layout(
        title=title,
        xaxis_title="Timeline",
        xaxis=dict(
            title="Time",
            type="linear",  # Keep the scale as numeric
            tickvals=list(tickvals),  # Numeric positions for ticks
            ticktext=ticktext if plot_xticks else None,  # Custom labels for the ticks
            showticklabels=plot_xticks,
        ),
        yaxis=dict(
            title="Team",
            tickmode="array",
            tickvals=list(range(len(sorted_all_teams))),
            ticktext=[f"{sorted_all_teams[i]}" for i in range(len(sorted_all_teams))],
        ),
        barmode="overlay",
        height=400,
        margin=dict(l=20, r=20, t=40, b=20),
        showlegend=False,
    )
    # Show line on each teams
    fig.update_yaxes(showgrid=True)
    # Vertical spike line following the cursor
    fig.update_xaxes(
        showgrid=False,
        showspikes=True,
        spikemode="across",
        spikesnap="cursor",
        spikecolor="gray",
        spikedash="solid",
        spikethickness=0.5,
        showline=True,
    )
    if display:
        fig.show()
    return fig
def estimate_nb_resource_needed(problem: AllocSchedulingProblem):
    """Estimate, per time step, how much work is expected to be in progress.

    For every task, its duration is spread uniformly over the window going
    from element 0 to element 3 of its ``problem.get_all_lb_ub()`` entry, and
    the per-task contributions are summed.

    NOTE(review): elements 0 and 3 are presumably the earliest start and the
    latest end of the task — confirm against ``get_all_lb_ub``.

    :param problem: problem providing ``horizon``, ``index_to_task``,
        ``tasks_data`` and ``get_all_lb_ub()``
    :return: array of length ``problem.horizon`` with the summed workload
    """
    estimated_workload = np.zeros(problem.horizon)
    lb_ub = problem.get_all_lb_ub()
    for i in problem.index_to_task:
        window_start, window_end = lb_ub[i][0], lb_ub[i][3]
        window_length = window_end - window_start
        if window_length <= 0:
            # Empty window: nothing to spread the duration over, and dividing
            # by a zero length would fail.
            continue
        duration = problem.tasks_data[problem.index_to_task[i]].duration_task
        estimated_workload[window_start:window_end] += duration / window_length
    return estimated_workload
def template_violated_constraint(
    satisfy_detailed_output: Union[tuple, dict], problem: AllocSchedulingProblem
):
    """Render one violated-constraint record as a human-readable sentence.

    Two record shapes are understood:

    * a dict carrying a ``"tag"`` among ``"is_not_done"``, ``"early"`` and
      ``"late"``, together with ``"task_index"`` and, for early/late, either
      ``"start"`` or ``"end"`` plus ``"expected"``;
    * a tuple whose first element is ``"precedence"``, ``"same_allocation"``,
      ``"available-team"`` or ``"no-overlap"``.

    :param satisfy_detailed_output: the violation record
    :param problem: problem used to resolve the allowed teams of an activity
        (only read for ``"available-team"`` records)
    :return: the sentence, or ``None`` when the record matches no template
    """
    record = satisfy_detailed_output

    if isinstance(record, dict):
        tag = record.get("tag")
        if tag == "is_not_done":
            return f"Task {record['task_index']} is not done"
        if tag == "early":
            task = record["task_index"]
            for field, verb in (("start", "starts"), ("end", "ends")):
                if field in record:
                    return (
                        f"Task {task} is early, {verb} at {record[field]} "
                        f"but expected at least {record['expected']}"
                    )
        if tag == "late":
            task = record["task_index"]
            for field, verb in (("start", "starts"), ("end", "ends")):
                if field in record:
                    return (
                        f"{task} is late, {verb} at {record[field]} "
                        f"but expected at most {record['expected']}"
                    )

    if isinstance(record, tuple):
        kind = record[0]
        if kind == "precedence":
            pred, succ = record[3], record[4]
            return (
                f"{pred} should be done before {succ}, it's not the case : "
                f"end of {pred} is {record[-2]}, and start of {succ} is {record[-1]}"
            )
        if kind == "same_allocation":
            return (
                f"Same allocation constraint is not satisfied for tasks {record[-1]}"
            )
        if kind == "available-team":
            # Report the allowed teams by index rather than by name.
            allowed = problem.available_team_for_activity[record[1]]
            teams = {problem.teams_to_index[name] for name in allowed}
            return (
                f"Task {record[3]} is allocated to {record[4]} "
                f"while only teams {teams} can do this task"
            )
        if kind == "no-overlap":
            return f"Task {record[1]} and {record[2]} overlaps on team {record[3]}"
def natural_explanation_unsat(
    detailed_output: list[Union[tuple, dict]], problem: AllocSchedulingProblem
) -> list[str]:
    """Turn a list of violation records into human-readable sentences.

    Each record is rendered with ``template_violated_constraint``; the output
    keeps the order of the input.

    :param detailed_output: violation records (tuples or dicts)
    :param problem: problem forwarded to the rendering function
    :return: one rendered entry per record
    """
    explanations = []
    for violation in detailed_output:
        explanations.append(template_violated_constraint(violation, problem=problem))
    return explanations
def natural_explanation_unsat_from_sol(solution: AllocSchedulingSolution) -> list[str]:
    """Explain in plain sentences the constraints a solution violates.

    The violations are obtained from ``satisfy_detailed`` on the solution's own
    problem, then rendered with ``natural_explanation_unsat``.

    :param solution: solution to check; its ``problem`` attribute is used
    :return: one rendered entry per violation record
    """
    violations = satisfy_detailed(problem=solution.problem, solution=solution)
    return natural_explanation_unsat(
        detailed_output=violations, problem=solution.problem
    )
def compute_precedence_graph(problem: AllocSchedulingProblem) -> nx.DiGraph:
    """Build the directed graph of precedence constraints between tasks.

    Nodes are task indices, each carrying ``activity_name`` (the task name) and
    ``compatible_teams`` (the entry of
    ``problem.compatible_teams_index_all_activity()`` for that index). An edge
    ``u -> v`` is added for every successor ``v`` listed for ``u`` in
    ``problem.precedence_constraints``.

    :param problem: scheduling problem to read tasks and precedences from
    :return: the precedence graph
    """
    compatible_teams = problem.compatible_teams_index_all_activity()
    graph = nx.DiGraph()
    for index_t in problem.index_to_task:
        node_attributes = {
            "activity_name": problem.index_to_task[index_t],
            "compatible_teams": compatible_teams[index_t],
        }
        graph.add_node(index_t, **node_attributes)
    for task in problem.precedence_constraints:
        index = problem.tasks_to_index[task]
        graph.add_edges_from(
            (index, problem.tasks_to_index[succ])
            for succ in problem.precedence_constraints[task]
        )
    return graph
def export_scheduling_problem_json(problem: AllocSchedulingProblem) -> dict:
    """Export a scheduling problem as a dictionary of plain Python values.

    The calendar is rebuilt as a new mapping of ``(int, int)`` slots, so the
    ``calendar_team`` of ``problem`` is left untouched.

    NOTE(review): ``tasks_data`` is keyed by ``int(task)`` whereas every other
    per-task mapping is keyed by ``str(task)``; task names are therefore
    expected to be convertible to int — confirm this is intended.

    :param problem: problem to export
    :return: dict with keys ``teams``, ``tasks``, ``calendar``,
        ``teams_to_index``, ``tasks_data``, ``same_allocation``,
        ``compatible_teams``, ``start_window``, ``end_window`` and
        ``successors``
    """
    d = dict()
    d["teams"] = problem.team_names
    d["tasks"] = [str(x) for x in problem.tasks_list]
    # Fresh mapping: converting the slot bounds must not rewrite the problem's
    # own calendar.
    d["calendar"] = {
        team: [(int(x[0]), int(x[1])) for x in problem.calendar_team[team]]
        for team in problem.calendar_team
    }
    d["teams_to_index"] = problem.teams_to_index
    d["tasks_data"] = {
        int(t): {"duration": problem.tasks_data[t].duration_task}
        for t in problem.tasks_data
    }
    d["same_allocation"] = [[str(y) for y in x] for x in problem.same_allocation]
    d["compatible_teams"] = {
        str(t): list(problem.available_team_for_activity[t])
        for t in problem.available_team_for_activity
    }
    d["start_window"] = {str(t): problem.start_window[t] for t in problem.start_window}
    d["end_window"] = {str(t): problem.end_window[t] for t in problem.end_window}
    d["successors"] = {
        str(t): [str(succ) for succ in problem.precedence_constraints[t]]
        for t in problem.precedence_constraints
    }
    return d
def get_working_time_teams(problem: AllocSchedulingProblem) -> dict:
    """Compute, per team, the calendar time available up to the last bound.

    The cut-off is the largest last element found in the entries of
    ``problem.get_all_lb_ub()``. For each team, the lengths of its calendar
    slots are summed, each slot being clipped at the cut-off; slots starting
    after the cut-off are ignored.

    :param problem: problem providing ``get_all_lb_ub()`` and ``calendar_team``
    :return: mapping team -> cumulated available time
    """
    true_ub = max(bounds[-1] for bounds in problem.get_all_lb_ub())
    return {
        team: sum(
            max(0, min(slot[1], true_ub) - slot[0])
            for slot in problem.calendar_team[team]
            if slot[0] <= true_ub
        )
        for team in problem.calendar_team
    }
def compute_available_teams_per_activities_alloc_problem(
    problem: TeamAllocationProblem,
    starts: np.ndarray,
    ends: np.ndarray,
    calendars_team: dict[Hashable, np.ndarray],
):
    """Find, for each activity, the teams whose calendar is free while it runs.

    A team is available for an activity when its calendar equals 1 on every
    time step of ``[start, end)``. A zero-length activity (``start == end``)
    only requires the calendar to equal 1 at that single time step. A window
    that selects no calendar entry makes the team unavailable.

    :param problem: allocation problem; ``activities_name[i]`` names the
        activity whose window is ``(starts[i], ends[i])``
    :param starts: start time per activity
    :param ends: end time per activity
    :param calendars_team: per team, an array indexed by time step where 1
        marks an available step
    :return: mapping activity name -> set of available teams
    """
    available_team_per_activity = {}
    for i in range(len(starts)):
        st, end = starts[i], ends[i]
        available = set()
        for team in calendars_team:
            calendar = calendars_team[team]
            if st == end:
                # Zero-length activity: the slice below would be empty, so
                # check the single time step instead.
                is_free = calendar[int(st)] == 1
            else:
                window = calendar[int(st) : int(end)]
                # np.min raises on an empty selection; treat it as unavailable.
                is_free = len(window) > 0 and np.min(window) == 1
            if is_free:
                available.add(team)
        available_team_per_activity[problem.activities_name[i]] = available
    return available_team_per_activity
def build_allocation_problem_from_scheduling(
    problem: AllocSchedulingProblem,
    solution: AllocSchedulingSolution = None,
    problem_alloc: TeamAllocationProblem = None,
    multiobjective: bool = True,
) -> TeamAllocationProblem:
    """Build a pure team-allocation problem from a scheduling problem.

    Task time windows are frozen either to the schedule of ``solution`` or,
    when no solution is given, to ``problem.original_start`` /
    ``problem.original_end``.

    :param problem: source scheduling problem
    :param solution: optional solution whose schedule fixes the task windows
    :param problem_alloc: when provided, it is returned as is and nothing is
        built
    :param multiobjective: build a ``TeamAllocationProblemMultiobj`` with a
        "duration" cumulative objective (penalty, max-minus-min aggregation)
        instead of a plain ``TeamAllocationProblem``
    :return: the allocation problem
    """
    if problem_alloc is not None:
        return problem_alloc

    if solution is not None:
        starts = solution.schedule[:, 0]
        ends = solution.schedule[:, 1]
    else:
        starts = np.array([problem.original_start[t] for t in problem.tasks_list])
        ends = np.array([problem.original_end[t] for t in problem.tasks_list])

    # Arguments shared by both problem flavours.
    common_kwargs = dict(
        allocation_additional_constraint=AllocationAdditionalConstraint(
            same_allocation=problem.same_allocation,
            allowed_allocation=problem.available_team_for_activity,
        ),
        calendar_team=problem.calendar_team,
        schedule_activity={
            t: (
                starts[problem.tasks_to_index[t]],
                ends[problem.tasks_to_index[t]],
            )
            for t in problem.original_start
        },
        activities_name=problem.tasks_list,
    )
    if not multiobjective:
        return TeamAllocationProblem(**common_kwargs)
    return TeamAllocationProblemMultiobj(
        attributes_cumul_activities=["duration"],
        objective_doc_cumul_activities={
            "duration": (
                ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=-1),
                AggregateOperator.MAX_MINUS_MIN,
            )
        },
        **common_kwargs,
    )
def build_scheduling_problem_from_allocation(
    problem: TeamAllocationProblem,
    horizon_start_shift: int = 0,
    horizon: int = 10000,
) -> AllocSchedulingProblem:
    """Build a scheduling problem from a pure team-allocation problem.

    Durations and original start/end times are read from
    ``problem.graph_activity.nodes_infos_dict``. Precedence constraints, team
    compatibilities and start/end windows are left empty.

    :param problem: source allocation problem
    :param horizon_start_shift: forwarded to the scheduling problem
    :param horizon: horizon of the scheduling problem
    :return: the scheduling problem
    """
    constraint = problem.allocation_additional_constraint
    # The additional constraint is optional; without it there is no
    # same-allocation group to forward.
    # NOTE(review): assumes AllocSchedulingProblem accepts an empty list for
    # `same_allocation` — confirm.
    same_allocation = constraint.same_allocation if constraint is not None else []
    nodes_infos = problem.graph_activity.nodes_infos_dict
    return AllocSchedulingProblem(
        team_names=problem.teams_name,
        calendar_team=problem.calendar_team,
        horizon=horizon,
        horizon_start_shift=horizon_start_shift,
        tasks_list=problem.activities_name,
        tasks_data={
            t: TasksDescription(duration_task=nodes_infos[t]["duration"])
            for t in problem.activities_name
        },
        same_allocation=same_allocation,
        precedence_constraints={},
        available_team_for_activity={},
        start_window={},
        end_window={},
        original_start={t: nodes_infos[t]["start"] for t in problem.activities_name},
        original_end={t: nodes_infos[t]["end"] for t in problem.activities_name},
    )
def alloc_solution_to_alloc_sched_solution(
    problem: AllocSchedulingProblem, solution: TeamAllocationSolution
):
    """Convert an allocation-only solution into a scheduling solution.

    Team indices are translated from the allocation problem's numbering to the
    scheduling problem's numbering through team names; tasks with no team
    (``None`` or a negative index) are left at ``-1``. Every task is scheduled
    at its ``original_start`` / ``original_end``.

    :param problem: target scheduling problem
    :param solution: allocation solution; its ``problem`` provides
        ``index_activities_name`` and ``teams_name``
    :return: the scheduling solution
    """
    alloc_problem = solution.problem
    nb_entries = len(solution.allocation)
    new_alloc = -np.ones(nb_entries, dtype=int)
    schedule = np.zeros((nb_entries, 2), dtype=int)
    for task in problem.tasks_list:
        team = solution.allocation[alloc_problem.index_activities_name[task]]
        if team is not None and team >= 0:
            team_name = alloc_problem.teams_name[team]
            new_alloc[problem.tasks_to_index[task]] = problem.teams_to_index[team_name]
        row = problem.tasks_to_index[task]
        schedule[row, 0] = problem.original_start[task]
        schedule[row, 1] = problem.original_end[task]
    return AllocSchedulingSolution(
        problem=problem, schedule=schedule, allocation=new_alloc
    )
def binary_calendar(list_available: list[tuple[int, int]], horizon: int):
    """Convert a list of availability slots into a 0/1 array.

    The array has ``horizon + 20`` entries. Each ``(start, end)`` slot sets the
    entries from ``start`` (included) to ``end`` (excluded) to 1, the end being
    capped at the array length.

    :param list_available: availability slots as ``(start, end)`` pairs
    :param horizon: time horizon used to size the array
    :return: float array where 1 marks an available time step
    """
    size = horizon + 20
    calendar = np.zeros(size)
    for slot_start, slot_end in list_available:
        calendar[slot_start : min(slot_end, size)] = 1
    return calendar
def get_availability_slots(calendar_matrix: np.ndarray):
    """Extract the maximal runs of available time steps from a 0/1 calendar.

    A run opens on an entry equal to 1 and closes on the next entry equal to 0.
    Both bounds of a returned slot are inclusive indices; a run still open at
    the end of the array is closed on the last index.

    :param calendar_matrix: sequence of availability flags
    :return: list of ``(first_index, last_index)`` tuples, in increasing order
    """
    slots = []
    block_start = None
    for position, flag in enumerate(calendar_matrix):
        if block_start is None:
            if flag == 1:
                # Start of an available block
                block_start = position
        elif flag == 0:
            # End of an available block
            slots.append((block_start, position - 1))
            block_start = None
    if block_start is not None:
        # The array ends inside an available block
        slots.append((block_start, len(calendar_matrix) - 1))
    return slots