Source code for discrete_optimization.rcpsp.utils_preemptive

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from typing import Union

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.collections import PatchCollection
from matplotlib.font_manager import FontProperties
from matplotlib.patches import Polygon as pp
from shapely.geometry import Polygon

from discrete_optimization.generic_tools.plot_utils import (
    get_cmap,
    get_cmap_with_nb_colors,
)
from discrete_optimization.rcpsp.problem_preemptive import (
    PreemptiveRcpspProblem,
    PreemptiveRcpspSolution,
)

logger = logging.getLogger(__name__)


def compute_resource_consumption(
    rcpsp_problem: PreemptiveRcpspProblem,
    rcpsp_sol: PreemptiveRcpspSolution,
    list_resources: list[Union[int, str]] = None,
    future_view=True,
):
    """Build the per-timestep consumption profile of each resource.

    Args:
        rcpsp_problem: problem providing the task list, the mode details and
            the source/sink tasks.
        rcpsp_sol: solution providing the start and end times of every piece
            of every task.
        list_resources: resources to report, one output row each. Defaults to
            ``rcpsp_problem.resources_list``.
        future_view: if true, a piece running from ``s`` to ``e`` is charged
            to the slots ``s + 1 .. e``; otherwise to the slots ``s .. e - 1``.

    Returns:
        A pair ``(consumptions, times)``. ``consumptions`` has shape
        ``(len(list_resources), makespan + 1)`` and ``times`` is
        ``np.arange(0, makespan + 1)``, where the makespan is the end time of
        the sink task.
    """
    if list_resources is None:
        list_resources = rcpsp_problem.resources_list

    task_modes = rcpsp_problem.get_modes_dict(rcpsp_sol)
    # The source and sink tasks are always looked up with mode 1.
    for dummy_task in (rcpsp_problem.source_task, rcpsp_problem.sink_task):
        task_modes[dummy_task] = 1

    horizon = rcpsp_sol.get_end_time(rcpsp_problem.sink_task)
    # Both views write the same slice, shifted by one slot in the future view.
    shift = 1 if future_view else 0

    usage = np.zeros((len(list_resources), horizon + 1))
    for task in rcpsp_problem.get_tasks_list():
        demands = rcpsp_problem.mode_details[task][task_modes[task]]
        for row, resource in enumerate(list_resources):
            amount = demands.get(resource, 0)
            pieces = zip(
                rcpsp_sol.get_start_times_list(task),
                rcpsp_sol.get_end_times_list(task),
            )
            for piece_start, piece_end in pieces:
                usage[row, piece_start + shift : piece_end + shift] += amount
    return usage, np.arange(0, horizon + 1, 1)
def compute_nice_resource_consumption(
    rcpsp_problem: PreemptiveRcpspProblem,
    rcpsp_sol: PreemptiveRcpspSolution,
    list_resources: list[Union[int, str]] = None,
):
    """Interleave the future-view and past-view consumption profiles.

    Every timestep appears twice in the output: first with the value of the
    ``future_view=True`` profile, then with the value of the
    ``future_view=False`` profile.

    Args:
        rcpsp_problem: problem forwarded to ``compute_resource_consumption``.
        rcpsp_sol: solution forwarded to ``compute_resource_consumption``.
        list_resources: resources to report. Defaults to
            ``rcpsp_problem.resources_list``.

    Returns:
        A pair ``(merged_times, merged_cons)`` of dicts keyed by the position
        of the resource in ``list_resources``; each value is a 1-D array with
        two entries per timestep.
    """
    if list_resources is None:
        list_resources = rcpsp_problem.resources_list

    future_profile, times = compute_resource_consumption(
        rcpsp_problem, rcpsp_sol, list_resources=list_resources, future_view=True
    )
    past_profile, times = compute_resource_consumption(
        rcpsp_problem, rcpsp_sol, list_resources=list_resources, future_view=False
    )

    merged_times = {}
    merged_cons = {}
    for row in range(len(list_resources)):
        # t0, t0, t1, t1, ...
        merged_times[row] = np.repeat(times, 2)
        # future[t0], past[t0], future[t1], past[t1], ...
        merged_cons[row] = np.stack(
            (future_profile[row], past_profile[row]), axis=1
        ).reshape(-1)
    return merged_times, merged_cons
def plot_ressource_view(
    rcpsp_problem: PreemptiveRcpspProblem,
    rcpsp_sol: PreemptiveRcpspSolution,
    list_resource: list[Union[int, str]] = None,
    title_figure="",
    x_lim=None,
    fig=None,
    ax=None,
):
    """Plot, for each resource, the stacked task pieces and the consumption.

    One subplot is used per resource. Every scheduled piece of a task that
    consumes the resource is drawn as a rectangle whose width is the piece
    duration and whose height is the consumption; the rectangle is placed at
    the lowest integer level where it overlaps no rectangle already placed.
    The total consumption curve and the capacity limit are drawn on top.

    Args:
        rcpsp_problem: problem providing modes, resources and capacities.
        rcpsp_sol: solution providing the start/end times of the pieces.
        list_resource: resources to plot. Defaults to
            ``rcpsp_problem.resources_list``.
        title_figure: figure title. It is applied when the figure is created
            here, or when it is non-empty and a figure is available.
        x_lim: x-axis limits applied to every subplot; the automatic limits
            are kept when ``None``.
        fig: existing figure, only used together with ``ax``.
        ax: existing axes, either a sequence with one entry per resource or a
            single ``Axes`` object. New axes are created when ``None``.

    Returns:
        The figure holding the axes. When ``ax`` is given without ``fig``, the
        figure is taken from the first axes.
    """
    modes_dict = rcpsp_problem.get_modes_dict(rcpsp_sol)
    with_calendar = rcpsp_problem.is_varying_resource()
    if list_resource is None:
        list_resource = rcpsp_problem.resources_list
    nb_resources = len(list_resource)

    created_here = ax is None
    if created_here:
        fig, ax = plt.subplots(nrows=nb_resources, figsize=(10, 5), sharex=True)
    # plt.subplots returns a bare Axes for a single row, and callers may pass
    # one too: always work on an indexable collection.
    if not isinstance(ax, (list, tuple, np.ndarray)):
        ax = [ax]
    if fig is None and len(ax) > 0:
        fig = ax[0].figure
    if fig is not None and (created_here or title_figure):
        fig.suptitle(title_figure)

    polygons_ax = {i: [] for i in range(nb_resources)}
    sorted_activities = sorted(
        rcpsp_problem.get_tasks_list(), key=lambda x: rcpsp_sol.get_start_time(x)
    )
    for j in sorted_activities:
        demands = rcpsp_problem.mode_details[j][modes_dict[j]]
        pieces = zip(
            rcpsp_sol.get_start_times_list(j), rcpsp_sol.get_end_times_list(j)
        )
        for time_start, time_end in pieces:
            for i, resource in enumerate(list_resource):
                cons = demands.get(resource, 0)
                if cons == 0:
                    continue
                bound = (
                    rcpsp_problem.get_resource_available(resource, 0)
                    if not with_calendar
                    else max(rcpsp_problem.get_resource_availability_array(resource))
                )
                # Try increasing levels until the rectangle overlaps nothing.
                # If no level fits, the piece is left undrawn.
                for k in range(0, bound + 5):
                    polygon = Polygon(
                        [
                            (time_start, k),
                            (time_end, k),
                            (time_end, k + cons),
                            (time_start, k + cons),
                            (time_start, k),
                        ]
                    )
                    # Areas are non-negative, so "all zero" is the same test
                    # as "empty or max == 0", and it stops at the first hit.
                    is_free = all(
                        placed.intersection(polygon).area == 0
                        for placed in polygons_ax[i]
                    )
                    if is_free:
                        polygons_ax[i].append(polygon)
                        ax[i].annotate(
                            j,
                            xy=((time_start + time_end) / 2, (k + cons / 2)),
                            font_properties=FontProperties(size=6, weight="bold"),
                            verticalalignment="center",
                            horizontalalignment="left",
                            color="k",
                            clip_on=True,
                        )
                        break

    for i in range(nb_resources):
        patches = []
        for polygon in polygons_ax[i]:
            x, y = polygon.exterior.xy
            ax[i].plot(x, y, zorder=-1, color="b")
            patches.append(pp(xy=polygon.exterior.coords))
        p = PatchCollection(patches, cmap=get_cmap("Blues"), alpha=0.4)
        ax[i].add_collection(p)

    merged_times, merged_cons = compute_nice_resource_consumption(
        rcpsp_problem, rcpsp_sol, list_resources=list_resource
    )
    for i, resource in enumerate(list_resource):
        ax[i].plot(
            merged_times[i],
            merged_cons[i],
            color="r",
            linewidth=2,
            label="Consumption " + str(resource),
            zorder=1,
        )
        if not with_calendar:
            ax[i].axhline(
                y=rcpsp_problem.get_resource_available(resource, 0),
                linestyle="--",
                label="Limit : " + str(resource),
                zorder=0,
            )
        else:
            ax[i].plot(
                merged_times[i],
                [
                    rcpsp_problem.get_resource_available(resource, m)
                    for m in merged_times[i]
                ],
                linestyle="--",
                label="Limit : " + str(resource),
                zorder=0,
            )
        ax[i].legend(fontsize=5)
        lims = ax[i].get_xlim()
        if x_lim is None:
            ax[i].set_xlim([lims[0], 1.0 * lims[1]])
        else:
            ax[i].set_xlim(x_lim)
    ax[-1].set_xlabel("Timestep")
    return fig
def plot_task_gantt(
    rcpsp_problem: PreemptiveRcpspProblem,
    rcpsp_sol: PreemptiveRcpspSolution,
    fig=None,
    ax=None,
    x_lim=None,
    title=None,
    current_t=None,
):
    """Draw a Gantt chart with one row per task and one box per piece.

    Args:
        rcpsp_problem: problem providing ``tasks_list``.
        rcpsp_sol: solution whose ``rcpsp_schedule`` maps each task to its
            ``"starts"`` and ``"ends"`` lists.
        fig: existing figure. A new figure and axes are created unless both
            ``fig`` and ``ax`` are given.
        ax: existing axes to draw on (see ``fig``).
        x_lim: x-axis limits. Defaults to the earliest first start and the
            latest last end found in the schedule.
        title: axes title. Defaults to ``"Gantt Task"``.
        current_t: if not ``None``, a dashed red vertical line is drawn at
            this time.

    Returns:
        The figure that was drawn on.
    """
    if fig is None or ax is None:
        fig, ax = plt.subplots(1, figsize=(7, 7))
    ax.set_title("Gantt Task" if title is None else title)

    tasks = rcpsp_problem.tasks_list
    nb_task = len(tasks)
    schedule = rcpsp_sol.rcpsp_schedule
    # Only the extreme times are needed, so no sort is required.
    min_time = min(schedule[task]["starts"][0] for task in schedule)
    max_time = max(schedule[task]["ends"][-1] for task in schedule)

    # At least one colour, so the modulo below cannot divide by zero when
    # there are fewer than two tasks.
    nb_colors = max(1, nb_task // 2)
    colors = get_cmap_with_nb_colors("hsv", nb_colors)

    patches = []
    for j, task in enumerate(tasks):
        starts = schedule[task]["starts"]
        ends = schedule[task]["ends"]
        for start, end in zip(starts, ends):
            # Time on the x-axis, task row (half a unit high) on the y-axis.
            polygon = Polygon(
                [
                    (start, j - 0.25),
                    (end, j - 0.25),
                    (end, j + 0.25),
                    (start, j + 0.25),
                    (start, j - 0.25),
                ]
            )
            x, y = polygon.exterior.xy
            ax.plot(x, y, zorder=-1, color="b")
            patches.append(
                pp(xy=polygon.exterior.coords, facecolor=colors((j - 1) % nb_colors))
            )
        # The label sits in the middle of the first piece of the task.
        ax.annotate(
            task,
            xy=((starts[0] + ends[0]) / 2, j),
            font_properties=FontProperties(size=7, weight="bold"),
            verticalalignment="center",
            horizontalalignment="left",
            color="k",
            clip_on=True,
        )
    p = PatchCollection(patches, match_original=True, alpha=0.4)
    ax.add_collection(p)

    if x_lim is None:
        ax.set_xlim((min_time, max_time))
    else:
        ax.set_xlim(x_lim)
    ax.set_ylim((-0.5, nb_task))
    ax.set_yticks(range(nb_task))
    ax.set_yticklabels(tuple(str(task) for task in tasks), fontdict={"size": 5})
    ax.set_ylabel("Task number")
    ax.set_xlabel("Timestep")
    if current_t is not None:
        # Same marker as the one drawn by plot_resource_individual_gantt.
        ax.axvline(x=current_t, label="pyplot vertical line", color="r", ls="--")
    return fig
def compute_schedule_per_resource_individual(
    rcpsp_problem: PreemptiveRcpspProblem,
    rcpsp_sol: PreemptiveRcpspSolution,
    resource_types_to_consider: list[str] = None,
):
    """Greedily assign every task piece to individual units of each resource.

    Tasks are processed by increasing first start time. For every piece and
    every resource it needs, the required number of units is picked among the
    units that are free at the start of the piece, always choosing the unit
    with the smallest accumulated working time. If no unit is free, a warning
    is logged and the remaining demand of that piece is dropped.

    Args:
        rcpsp_problem: problem providing ``resources``, ``mode_details`` and
            ``index_task``.
        rcpsp_sol: solution providing ``rcpsp_modes`` and ``rcpsp_schedule``.
        resource_types_to_consider: resources to process. Defaults to
            ``rcpsp_problem.resources_list``.

    Returns:
        A dict keyed by resource. Each value is a dict with:

        * ``"activity"``: ``(n_times, capacity)`` array holding, per time slot
          and unit, the marker of the assigned task (the task itself, or its
          index in ``index_task`` when the task is a string), 0 otherwise;
        * ``"binary_activity"``: same shape, 1 where a unit is busy;
        * ``"total_activity"``: accumulated busy time per unit;
        * ``"activity_last_n_hours"``: same shape, the convolution of
          ``"binary_activity"`` with a kernel made of ``nhour`` ones followed
          by ``nhour + 1`` zeros;
        * ``"boxes_time"``: list of 5-point closed rectangles
          ``(unit -/+ 0.25, time, task_index)`` used for plotting.
    """
    modes_dict = rcpsp_problem.build_mode_dict(
        rcpsp_modes_from_solution=rcpsp_sol.rcpsp_modes
    )
    if resource_types_to_consider is None:
        resources = rcpsp_problem.resources_list
    else:
        resources = resource_types_to_consider

    schedule = rcpsp_sol.rcpsp_schedule
    # Processing order: first start time, ties broken by task index.
    sorted_task_by_start = sorted(
        schedule,
        key=lambda x: 100000 * schedule[x]["starts"][0] + rcpsp_problem.index_task[x],
    )
    min_time = min(schedule[task]["starts"][0] for task in schedule)
    max_time = max(schedule[task]["ends"][-1] for task in schedule)
    total_time = max_time - min_time + 1
    with_calendar = rcpsp_problem.is_varying_resource()

    array_ressource_usage = {}
    for r in resources:
        # With a calendar the capacity is a sequence: size for its peak.
        capacity = (
            max(rcpsp_problem.resources[r])
            if with_calendar
            else rcpsp_problem.resources[r]
        )
        array_ressource_usage[r] = {
            "activity": np.zeros((total_time, capacity)),
            "binary_activity": np.zeros((total_time, capacity)),
            "total_activity": np.zeros(capacity),
            "activity_last_n_hours": np.zeros((total_time, capacity)),
            "boxes_time": [],
        }

    nhour = int(min(8, total_time / 2 - 1))
    kernel = np.array([1] * nhour + [0] + [0] * nhour)
    time_to_index = {min_time + i: i for i in range(total_time)}

    for activity in sorted_task_by_start:
        mode = modes_dict[activity]
        task_index = rcpsp_problem.index_task[activity]
        marker = task_index if isinstance(activity, str) else activity
        resources_needed = {
            r: rcpsp_problem.mode_details[activity][mode].get(r, 0) for r in resources
        }
        for start_time, end_time in zip(
            schedule[activity]["starts"], schedule[activity]["ends"]
        ):
            if end_time == start_time:
                continue
            i_start = time_to_index[start_time]
            i_end = time_to_index[end_time]
            for r, rneeded in resources_needed.items():
                usage = array_ressource_usage[r]
                if not with_calendar:
                    range_interest = range(usage["activity"].shape[1])
                else:
                    range_interest = range(rcpsp_problem.resources[r][i_start])
                while rneeded > 0:
                    # Occupancy is read from "binary_activity": the
                    # "activity" matrix stores task markers, and a marker
                    # equal to 0 would make a busy unit look free.
                    availables_people_r = [
                        i
                        for i in range_interest
                        if usage["binary_activity"][i_start, i] == 0
                    ]
                    logger.debug(f"{len(availables_people_r)} people available : ")
                    if len(availables_people_r) > 0:
                        # Greedy choice: the unit that has worked the least
                        # until now.
                        resource = min(
                            availables_people_r,
                            key=lambda x: usage["total_activity"][x],
                        )
                        usage["activity"][i_start:i_end, resource] = marker
                        usage["binary_activity"][i_start:i_end, resource] = 1
                        usage["total_activity"][resource] += end_time - start_time
                        usage["activity_last_n_hours"][:, resource] = np.convolve(
                            usage["binary_activity"][:, resource],
                            kernel,
                            mode="same",
                        )
                        # Closed rectangle, slightly shrunk in time, for plots.
                        low, high = resource - 0.25, resource + 0.25
                        t_in, t_out = start_time + 0.01, end_time - 0.01
                        usage["boxes_time"].append(
                            [
                                (low, t_in, task_index),
                                (low, t_out, task_index),
                                (high, t_out, task_index),
                                (high, t_in, task_index),
                                (low, t_in, task_index),
                            ]
                        )
                        rneeded -= 1
                    else:
                        logger.debug(f"r_needed {rneeded}")
                        logger.debug(f"Ressource needed : {resources_needed}")
                        logger.debug(f"ressource : {r}")
                        logger.debug(f"activity : {activity}")
                        logger.warning("Problem, can't build schedule")
                        logger.debug(usage["activity"])
                        # Give up on this piece for this resource.
                        rneeded = 0
    return array_ressource_usage
def plot_resource_individual_gantt(
    rcpsp_problem: PreemptiveRcpspProblem,
    rcpsp_sol: PreemptiveRcpspSolution,
    resource_types_to_consider: list[str] = None,
    title_figure="",
    x_lim=None,
    fig=None,
    ax=None,
    current_t=None,
):
    """Draw one Gantt chart per resource, with one row per resource unit.

    The assignment of task pieces to units comes from
    ``compute_schedule_per_resource_individual``.

    Args:
        rcpsp_problem: problem providing ``resources``.
        rcpsp_sol: solution providing ``rcpsp_schedule``.
        resource_types_to_consider: resources to plot, forwarded to
            ``compute_schedule_per_resource_individual``.
        title_figure: figure title. It is applied when the figure is created
            here, or when it is non-empty and a figure was supplied.
        x_lim: x-axis limits. Defaults to the earliest first start and the
            latest last end found in the schedule.
        fig: existing figure. A new figure and axes are created unless both
            ``fig`` and ``ax`` are given.
        ax: existing axes, either a sequence with one entry per resource or a
            single ``Axes`` object (see ``fig``).
        current_t: if not ``None``, a dashed red vertical line is drawn at
            this time on every subplot.

    Returns:
        The figure that was drawn on.
    """
    array_ressource_usage = compute_schedule_per_resource_individual(
        rcpsp_problem, rcpsp_sol, resource_types_to_consider=resource_types_to_consider
    )
    schedule = rcpsp_sol.rcpsp_schedule
    # Only the extreme times are needed, so no sort is required.
    min_time = min(schedule[task]["starts"][0] for task in schedule)
    max_time = max(schedule[task]["ends"][-1] for task in schedule)
    resources_list = list(array_ressource_usage.keys())

    if fig is None or ax is None:
        fig, ax = plt.subplots(len(array_ressource_usage), figsize=(10, 5))
        fig.suptitle(title_figure)
    elif title_figure:
        fig.suptitle(title_figure)
    # plt.subplots returns a bare Axes for a single row, and callers may pass
    # one too: always work on an indexable collection.
    if not isinstance(ax, (list, tuple, np.ndarray)):
        ax = [ax]

    # At least one colour, so the modulo below cannot divide by zero when
    # there are fewer than two tasks.
    nb_colors = max(1, len(schedule) // 2)
    colors = get_cmap_with_nb_colors("hsv", nb_colors)

    for i, resource in enumerate(resources_list):
        patches = []
        for boxe in array_ressource_usage[resource]["boxes_time"]:
            # Box points are (unit, time, task index); plot time on x.
            polygon = Polygon([(b[1], b[0]) for b in boxe])
            activity = boxe[0][2]
            x, y = polygon.exterior.xy
            ax[i].plot(x, y, zorder=-1, color="b")
            patches.append(
                pp(
                    xy=polygon.exterior.coords,
                    facecolor=colors((activity - 1) % nb_colors),
                )
            )
        p = PatchCollection(patches, match_original=True, alpha=0.4)
        ax[i].add_collection(p)
        ax[i].set_title(resource)
        if x_lim is None:
            ax[i].set_xlim((min_time, max_time))
        else:
            ax[i].set_xlim(x_lim)
        # np.max accepts both a scalar capacity and a capacity calendar, so
        # no exception handling is needed to tell them apart.
        capacity = int(np.max(rcpsp_problem.resources[resource]))
        ax[i].set_ylim((-0.5, capacity))
        ax[i].set_yticks(range(capacity))
        ax[i].set_yticklabels(tuple(range(capacity)), fontdict={"size": 7})
        ax[i].grid(True)
        if current_t is not None:
            ax[i].axvline(x=current_t, label="pyplot vertical line", color="r", ls="--")
    ax[-1].set_xlabel("Timestep")
    return fig