Source code for discrete_optimization.rcpsp.utils

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import (
    annotations,  # make annotations be considered as string by default
)

import logging
from collections.abc import Hashable, Sequence
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import scipy.stats
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon as pp
from shapely.geometry import Polygon

from discrete_optimization.generic_tools.graph_api import Graph
from discrete_optimization.generic_tools.plot_utils import (
    get_cmap,
    get_cmap_with_nb_colors,
)
from discrete_optimization.rcpsp.problem_preemptive import PreemptiveRcpspSolution

if TYPE_CHECKING:  # avoid circular imports due to annotations
    from discrete_optimization.rcpsp.problem import RcpspProblem
    from discrete_optimization.rcpsp.solution import RcpspSolution

logger = logging.getLogger(__name__)


[docs] def compute_resource_consumption( rcpsp_problem: RcpspProblem, rcpsp_sol: RcpspSolution, list_resources: Optional[list[str]] = None, future_view: bool = True, ) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.int_]]: modes_extended = deepcopy(rcpsp_sol.rcpsp_modes) modes_extended.insert(0, 1) modes_extended.append(1) modes_dict = rcpsp_problem.build_mode_dict(rcpsp_sol.rcpsp_modes) last_activity = rcpsp_problem.sink_task makespan = rcpsp_sol.rcpsp_schedule[last_activity]["end_time"] if list_resources is None: list_resources = rcpsp_problem.resources_list consumptions = np.zeros((len(list_resources), makespan + 1), dtype=np.int_) for act_id in rcpsp_sol.rcpsp_schedule: for ir in range(len(list_resources)): use_ir = rcpsp_problem.mode_details[act_id][modes_dict[act_id]].get( list_resources[ir], 0 ) if future_view: consumptions[ ir, rcpsp_sol.rcpsp_schedule[act_id]["start_time"] + 1 : rcpsp_sol.rcpsp_schedule[act_id]["end_time"] + 1, ] += use_ir else: consumptions[ ir, rcpsp_sol.rcpsp_schedule[act_id][ "start_time" ] : rcpsp_sol.rcpsp_schedule[act_id]["end_time"], ] += use_ir return consumptions, np.arange(0, makespan + 1, 1, dtype=np.int_)
[docs] def compute_nice_resource_consumption( rcpsp_problem: RcpspProblem, rcpsp_sol: RcpspSolution, list_resources: Optional[list[str]] = None, ) -> tuple[dict[int, npt.NDArray[np.int_]], dict[int, npt.NDArray[np.int_]]]: if list_resources is None: list_resources = rcpsp_problem.resources_list c_future, times = compute_resource_consumption( rcpsp_problem, rcpsp_sol, list_resources=list_resources, future_view=True ) c_past, times = compute_resource_consumption( rcpsp_problem, rcpsp_sol, list_resources=list_resources, future_view=False ) merged_times: dict[int, list[int]] = {i: [] for i in range(len(list_resources))} merged_cons: dict[int, list[int]] = {i: [] for i in range(len(list_resources))} for r in range(len(list_resources)): for index_t in range(len(times)): merged_times[r] += [times[index_t], times[index_t]] merged_cons[r] += [c_future[r, index_t], c_past[r, index_t]] return ( {k: np.array(v) for k, v in merged_times.items()}, {k: np.array(v) for k, v in merged_cons.items()}, )
[docs] def plot_ressource_view( rcpsp_problem: RcpspProblem, rcpsp_sol: RcpspSolution, list_resource: Optional[list[str]] = None, title_figure: str = "", x_lim: Optional[list[int]] = None, fig: Optional[plt.Figure] = None, ax: Optional[npt.NDArray[np.object_]] = None, ) -> plt.Figure: modes_extended = deepcopy(rcpsp_sol.rcpsp_modes) modes_extended.insert(0, 1) modes_extended.append(1) with_calendar = rcpsp_problem.is_varying_resource() modes_dict = rcpsp_problem.build_mode_dict(rcpsp_sol.rcpsp_modes) if list_resource is None: list_resource = rcpsp_problem.resources_list if ax is None: fig, ax = plt.subplots(nrows=len(list_resource), figsize=(10, 5), sharex=True) if len(list_resource) == 1: ax = [ax] fig.suptitle(title_figure) polygons_ax: dict[int, list[Polygon]] = {i: [] for i in range(len(list_resource))} labels_ax: dict[int, list[Hashable]] = {i: [] for i in range(len(list_resource))} sorted_activities = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: rcpsp_sol.rcpsp_schedule[x]["start_time"], ) for j in sorted_activities: time_start = rcpsp_sol.rcpsp_schedule[j]["start_time"] time_end = rcpsp_sol.rcpsp_schedule[j]["end_time"] for i in range(len(list_resource)): cons = rcpsp_problem.mode_details[j][modes_dict[j]].get(list_resource[i], 0) if cons == 0: continue bound: int = int(rcpsp_problem.get_max_resource_capacity(list_resource[i])) for k in range(0, bound): polygon = Polygon( [ (time_start, k), (time_end, k), (time_end, k + cons), (time_start, k + cons), (time_start, k), ] ) areas = [p.intersection(polygon).area for p in polygons_ax[i]] if len(areas) == 0 or max(areas) == 0: polygons_ax[i].append(polygon) labels_ax[i].append(j) break for i in range(len(list_resource)): patches = [] for polygon in polygons_ax[i]: x, y = polygon.exterior.xy ax[i].plot(x, y, zorder=-1, color="b") patches.append(pp(xy=polygon.exterior.coords)) p = PatchCollection(patches, cmap=get_cmap("Blues"), alpha=0.4) ax[i].add_collection(p) merged_times, merged_cons = compute_nice_resource_consumption( rcpsp_problem, rcpsp_sol, list_resources=list_resource ) for i in range(len(list_resource)): ax[i].plot( merged_times[i], merged_cons[i], color="r", linewidth=2, label="Consumption " + str(list_resource[i]), zorder=1, ) if not with_calendar: ax[i].axhline( y=rcpsp_problem.resources[list_resource[i]], linestyle="--", label="Limit : " + str(list_resource[i]), zorder=0, ) else: ax[i].plot( merged_times[i], [rcpsp_problem.resources[list_resource[i]][m] for m in merged_times[i]], # type: ignore linestyle="--", label="Limit : " + str(list_resource[i]), zorder=0, ) ax[i].legend(fontsize=5) lims = ax[i].get_xlim() if x_lim is None: ax[i].set_xlim([lims[0], 1.0 * lims[1]]) else: ax[i].set_xlim(x_lim) ax[-1].set_xlabel("Timestep") return fig
[docs] def plot_task_gantt( rcpsp_problem: RcpspProblem, rcpsp_sol: RcpspSolution, fig: Optional[plt.Figure] = None, ax: Optional[plt.Axes] = None, x_lim: Optional[list[int]] = None, title: Optional[str] = None, ) -> plt.Figure: if fig is None or ax is None: fig, ax = plt.subplots(1, figsize=(10, 10)) ax.set_title("Gantt Task") if title is None: ax.set_title("Gantt Task") else: ax.set_title(title) tasks = rcpsp_problem.tasks_list nb_task = len(tasks) sorted_task_by_start = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_start[0]) patches = [] for j in range(nb_task): nb_colors = len(tasks) // 2 colors = get_cmap_with_nb_colors("hsv", nb_colors) box = [ (j - 0.25, rcpsp_sol.rcpsp_schedule[tasks[j]]["start_time"]), (j - 0.25, rcpsp_sol.rcpsp_schedule[tasks[j]]["end_time"]), (j + 0.25, rcpsp_sol.rcpsp_schedule[tasks[j]]["end_time"]), (j + 0.25, rcpsp_sol.rcpsp_schedule[tasks[j]]["start_time"]), (j - 0.25, rcpsp_sol.rcpsp_schedule[tasks[j]]["start_time"]), ] polygon = Polygon([(b[1], b[0]) for b in box]) x, y = polygon.exterior.xy ax.plot(x, y, zorder=-1, color="b") patches.append( pp(xy=polygon.exterior.coords, facecolor=colors((j - 1) % nb_colors)) ) p = PatchCollection( patches, match_original=True, alpha=0.4, ) ax.add_collection(p) if x_lim is None: ax.set_xlim((min_time, max_time)) else: ax.set_xlim(x_lim) ax.set_ylim((-0.5, nb_task)) ax.set_yticks(range(nb_task)) ax.set_yticklabels( tuple([str(tasks[j]) for j in range(nb_task)]), fontdict={"size": 5} ) ax.set_ylabel("Task number") ax.set_xlabel("Timestep") return fig
[docs] def compute_schedule_per_resource_individual( rcpsp_problem: RcpspProblem, rcpsp_sol: RcpspSolution, resource_types_to_consider: Optional[list[str]] = None, ) -> dict[str, dict[str, Any]]: modes = rcpsp_problem.build_mode_dict(rcpsp_sol.rcpsp_modes) if resource_types_to_consider is None: resources = rcpsp_problem.resources_list else: resources = resource_types_to_consider sorted_task_by_start = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_end[0]) with_calendar = rcpsp_problem.is_varying_resource() array_ressource_usage: dict[str, dict[str, Any]] = { resources[i]: { "activity": np.zeros( ( max_time - min_time + 1, rcpsp_problem.get_max_resource_capacity(resources[i]), ) ), "binary_activity": np.zeros( ( max_time - min_time + 1, rcpsp_problem.get_max_resource_capacity(resources[i]), ) ), "total_activity": np.zeros( rcpsp_problem.get_max_resource_capacity(resources[i]) ), "activity_last_n_hours": np.zeros( ( max_time - min_time + 1, rcpsp_problem.get_max_resource_capacity(resources[i]), ) ), "boxes_time": [], } for i in range(len(resources)) } total_time = max_time - min_time + 1 nhour = int(min(8, total_time / 2 - 1)) index_to_time = {i: min_time + i for i in range(max_time - min_time + 1)} time_to_index = {index_to_time[i]: i for i in index_to_time} for activity in sorted_task_by_start: mode = modes[activity] start_time = rcpsp_sol.rcpsp_schedule[activity]["start_time"] end_time = rcpsp_sol.rcpsp_schedule[activity]["end_time"] if end_time == start_time: continue resources_needed = { r: rcpsp_problem.mode_details[activity][mode].get(r, 0) for r in resources } for r in resources_needed: if r not in array_ressource_usage: continue rneeded = resources_needed[r] if not with_calendar: range_interest = range(array_ressource_usage[r]["activity"].shape[1]) else: range_interest = range( rcpsp_problem.resources[r][time_to_index[start_time]] # type: ignore ) while rneeded > 0: availables_people_r = [ i for i in range_interest if array_ressource_usage[r]["activity"][ time_to_index[start_time], i ] == 0 ] logger.debug(f"{len(availables_people_r)} people available : ") if len(availables_people_r) > 0: resource = min( availables_people_r, key=lambda x: array_ressource_usage[r]["total_activity"][x], ) # greedy choice, # the one who worked the less until now. array_ressource_usage[r]["activity"][ time_to_index[start_time] : time_to_index[end_time], resource ] = activity array_ressource_usage[r]["binary_activity"][ time_to_index[start_time] : time_to_index[end_time], resource ] = 1 array_ressource_usage[r]["total_activity"][resource] += ( end_time - start_time ) array_ressource_usage[r]["activity_last_n_hours"][ :, resource ] = np.convolve( array_ressource_usage[r]["binary_activity"][:, resource], np.array([1] * nhour + [0] + [0] * nhour), mode="same", ) array_ressource_usage[r]["boxes_time"] += [ [ (resource - 0.25, start_time + 0.01, activity), (resource - 0.25, end_time - 0.01, activity), (resource + 0.25, end_time - 0.01, activity), (resource + 0.25, start_time + 0.01, activity), (resource - 0.25, start_time + 0.01, activity), ] ] # for plot purposes. rneeded -= 1 else: logger.debug(f"r_needed {rneeded}") logger.debug(f"Ressource needed : {resources_needed}") logger.debug(f"ressource : {r}") logger.debug(f"activity : {activity}") logger.warning("Problem, can't build schedule") logger.debug(array_ressource_usage[r]["activity"]) rneeded = 0 return array_ressource_usage
[docs] def plot_resource_individual_gantt( rcpsp_problem: RcpspProblem, rcpsp_sol: RcpspSolution, resource_types_to_consider: Optional[list[str]] = None, title_figure: str = "", x_lim: Optional[list[int]] = None, fig: Optional[plt.Figure] = None, ax: Optional[npt.NDArray[np.object_]] = None, current_t: Optional[int] = None, ) -> plt.Figure: array_ressource_usage = compute_schedule_per_resource_individual( rcpsp_problem, rcpsp_sol, resource_types_to_consider=resource_types_to_consider ) sorted_task_by_start = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.rcpsp_schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_end[0]) resources_list = list(array_ressource_usage.keys()) if fig is None or ax is None: fig, ax_ = plt.subplots(len(array_ressource_usage), figsize=(10, 5)) fig.suptitle(title_figure) if len(array_ressource_usage) == 1: ax = np.array([ax_]) else: ax = ax_ if ax is None: # for mypy raise RuntimeError("ax cannot be None at this point") for i in range(len(resources_list)): patches = [] nb_colors = len(sorted_task_by_start) // 2 colors = get_cmap_with_nb_colors("hsv", nb_colors) for boxe in array_ressource_usage[resources_list[i]]["boxes_time"]: polygon = Polygon([(b[1], b[0]) for b in boxe]) activity = boxe[0][2] x, y = polygon.exterior.xy ax[i].plot(x, y, zorder=-1, color="b") patches.append( pp( xy=polygon.exterior.coords, facecolor=colors((activity - 1) % nb_colors), ) ) p = PatchCollection( patches, match_original=True, alpha=0.4, ) ax[i].add_collection(p) ax[i].set_title(resources_list[i]) if x_lim is None: ax[i].set_xlim((min_time, max_time)) else: ax[i].set_xlim(x_lim) try: ax[i].set_ylim((-0.5, rcpsp_problem.resources[resources_list[i]])) ax[i].set_yticks(range(rcpsp_problem.resources[resources_list[i]])) # type: ignore ax[i].set_yticklabels( tuple([j for j in range(rcpsp_problem.resources[resources_list[i]])]), # type: ignore fontdict={"size": 7}, ) except: m = rcpsp_problem.get_max_resource_capacity(resources_list[i]) ax[i].set_ylim((-0.5, m)) ax[i].set_yticks(range(m)) ax[i].set_yticklabels(tuple([j for j in range(m)]), fontdict={"size": 7}) ax[i].grid(True) if current_t is not None: ax[i].axvline(x=current_t, label="pyplot vertical line", color="r", ls="--") ax[-1].set_xlabel("Timestep") return fig
[docs] def kendall_tau_similarity(rcpsp_sols: tuple[RcpspSolution, RcpspSolution]) -> float: sol1 = rcpsp_sols[0] sol2 = rcpsp_sols[1] perm1 = sol1.generate_permutation_from_schedule() perm2 = sol2.generate_permutation_from_schedule() ktd, p_value = scipy.stats.kendalltau(perm1, perm2) return ktd
[docs] def intersect(i1: Sequence[int], i2: Sequence[int]) -> Optional[list[int]]: if i2[0] >= i1[1] or i1[0] >= i2[1]: return None else: s = max(i1[0], i2[0]) e = min(i1[1], i2[1]) return [s, e]
[docs] def all_diff_start_time( rcpsp_sols: tuple[RcpspSolution, RcpspSolution] ) -> dict[Hashable, int]: sol1 = rcpsp_sols[0] sol2 = rcpsp_sols[1] return { act_id: ( sol1.rcpsp_schedule[act_id]["start_time"] - sol2.rcpsp_schedule[act_id]["start_time"] ) for act_id in sol1.rcpsp_schedule }
[docs] def compute_graph_rcpsp(rcpsp_problem: RcpspProblem) -> Graph: nodes = [ ( n, { str(mode): rcpsp_problem.mode_details[n][mode]["duration"] for mode in rcpsp_problem.mode_details[n] }, ) for n in rcpsp_problem.tasks_list ] edges = [] for n in rcpsp_problem.successors: for succ in rcpsp_problem.successors[n]: dict_transition: dict[str, int] = { str(mode): rcpsp_problem.mode_details[n][mode]["duration"] for mode in rcpsp_problem.mode_details[n] } min_duration = min(dict_transition.values()) max_duration = max(dict_transition.values()) dict_transition["min_duration"] = min_duration dict_transition["max_duration"] = max_duration dict_transition["minus_min_duration"] = -min_duration dict_transition["minus_max_duration"] = -max_duration dict_transition["link"] = 1 edges += [(n, succ, dict_transition)] return Graph(nodes, edges, False)
[docs] def create_fake_tasks(rcpsp_problem: RcpspProblem) -> list[dict[str, int]]: if not rcpsp_problem.is_varying_resource(): return [] else: ressources_arrays = { r: np.array(rcpsp_problem.resources[r]) for r in rcpsp_problem.resources_list } max_capacity = {r: np.max(ressources_arrays[r]) for r in ressources_arrays} fake_tasks: list[dict[str, int]] = [] for r in ressources_arrays: delta = ressources_arrays[r][:-1] - ressources_arrays[r][1:] index_non_zero = np.nonzero(delta)[0] if ressources_arrays[r][0] < max_capacity[r]: consume = { r: int(max_capacity[r] - ressources_arrays[r][0]), "duration": int(index_non_zero[0] + 1), "start": 0, } fake_tasks += [consume] for j in range(len(index_non_zero) - 1): ind = index_non_zero[j] value = ressources_arrays[r][ind + 1] if value != max_capacity[r]: consume = { r: int(max_capacity[r] - value), "duration": int(index_non_zero[j + 1] - ind), "start": int(ind + 1), } fake_tasks += [consume] return fake_tasks
[docs] def get_max_time_solution( solution: Union[PreemptiveRcpspSolution, RcpspSolution] ) -> int: if isinstance(solution, PreemptiveRcpspSolution): max_time = max( [solution.rcpsp_schedule[x]["ends"][-1] for x in solution.rcpsp_schedule] ) return max_time else: max_time = max( [solution.rcpsp_schedule[x]["end_time"] for x in solution.rcpsp_schedule] ) return max_time
[docs] def get_tasks_ending_between_two_times( solution: Union[PreemptiveRcpspSolution, RcpspSolution], time_1: int, time_2: int ) -> list[Hashable]: if isinstance(solution, PreemptiveRcpspSolution): return [ x for x in solution.rcpsp_schedule if time_1 <= solution.rcpsp_schedule[x]["ends"][-1] <= time_2 ] else: return [ x for x in solution.rcpsp_schedule if time_1 <= solution.rcpsp_schedule[x]["end_time"] <= time_2 ]
[docs] def get_start_bounds_from_additional_constraint( rcpsp_problem: RcpspProblem, activity: Hashable ) -> tuple[int, int]: assert activity in rcpsp_problem.index_task lb = 0 ub = rcpsp_problem.horizon if rcpsp_problem.includes_special_constraint(): if ( rcpsp_problem.special_constraints.start_times is not None and activity in rcpsp_problem.special_constraints.start_times and rcpsp_problem.special_constraints.start_times[activity] is not None ): lb = ub = rcpsp_problem.special_constraints.start_times[activity] else: if activity in rcpsp_problem.special_constraints.start_times_window: lbs, ubs = rcpsp_problem.special_constraints.start_times_window[ activity ] if lbs is not None: lb = lbs if ubs is not None: ub = ubs if activity in rcpsp_problem.special_constraints.end_times_window: lbs, ubs = rcpsp_problem.special_constraints.end_times_window[activity] if lbs is not None: max_duration = max( [ rcpsp_problem.mode_details[activity][m]["duration"] for m in rcpsp_problem.mode_details[activity] ] ) lb = max(lb, lbs - max_duration) if ubs is not None: min_duration = min( [ rcpsp_problem.mode_details[activity][m]["duration"] for m in rcpsp_problem.mode_details[activity] ] ) ub = min(ub, ubs - min_duration) if ub < 0: print(ub) return int(lb), int(ub)
[docs] def get_end_bounds_from_additional_constraint( rcpsp_problem: RcpspProblem, activity: Hashable ) -> tuple[int, int]: assert activity in rcpsp_problem.index_task lb = 0 ub = rcpsp_problem.horizon if rcpsp_problem.includes_special_constraint(): if ( rcpsp_problem.special_constraints.end_times is not None and activity in rcpsp_problem.special_constraints.end_times and rcpsp_problem.special_constraints.end_times[activity] is not None ): lb = ub = rcpsp_problem.special_constraints.end_times[activity] else: if activity in rcpsp_problem.special_constraints.end_times_window: lbs, ubs = rcpsp_problem.special_constraints.end_times_window[activity] if lbs is not None: lb = lbs if ubs is not None: ub = ubs if activity in rcpsp_problem.special_constraints.start_times_window: lbs, ubs = rcpsp_problem.special_constraints.start_times_window[ activity ] if lbs is not None: min_duration = min( [ rcpsp_problem.mode_details[activity][m]["duration"] for m in rcpsp_problem.mode_details[activity] ] ) lb = max(lb, lbs + min_duration) if ubs is not None: max_duration = max( [ rcpsp_problem.mode_details[activity][m]["duration"] for m in rcpsp_problem.mode_details[activity] ] ) ub = min(ub, ubs + max_duration) return int(lb), int(ub)