Source code for discrete_optimization.rcpsp_multiskill.plots.plot_solution

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.collections import PatchCollection
from matplotlib.font_manager import FontProperties
from matplotlib.patches import Polygon as pp
from shapely.geometry import Polygon

from discrete_optimization.generic_tools.plot_utils import get_cmap_with_nb_colors
from discrete_optimization.rcpsp_multiskill.problem import (

[docs] def compute_schedule_per_resource_individual_preemptive( rcpsp_problem: MultiskillRcpspProblem, rcpsp_sol: PreemptiveMultiskillRcpspSolution ): sorted_task_by_start = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_end[0]) employee_usage = { employee: { "activity": np.zeros((max_time - min_time + 1)), "binary_activity": np.zeros((max_time - min_time + 1)), "total_activity": 0, "boxes_time": [], } for employee in rcpsp_problem.employees } index_to_time = {i: min_time + i for i in range(max_time - min_time + 1)} time_to_index = {index_to_time[i]: i for i in index_to_time} sorted_employees = list(sorted(rcpsp_problem.employees)) for activity in sorted_task_by_start: for i in range(len(rcpsp_sol.employee_usage.get(activity, []))): if isinstance(rcpsp_sol.employee_usage[activity], dict): employees_i = rcpsp_sol.employee_usage[activity].keys() else: employees_i = rcpsp_sol.employee_usage[activity][i] for employee in employees_i: start_time = rcpsp_sol.schedule[activity]["starts"][i] end_time = rcpsp_sol.schedule[activity]["ends"][i] employee_usage[employee]["activity"][ time_to_index[start_time] : time_to_index[end_time] ] = ( rcpsp_problem.index_task[activity] if isinstance(activity, str) else activity ) employee_usage[employee]["binary_activity"][ time_to_index[start_time] : time_to_index[end_time] ] = 1 employee_usage[employee]["total_activity"] += end_time - start_time index_employee = sorted_employees.index(employee) employee_usage[employee]["boxes_time"] += [ [ (index_employee - 0.25, start_time + 0.01, activity), (index_employee - 0.25, end_time - 0.01, activity), (index_employee + 0.25, end_time - 0.01, activity), (index_employee + 0.25, start_time + 0.01, activity), (index_employee - 0.25, start_time + 0.01, activity), ] ] return employee_usage
[docs] def compute_schedule_per_resource_individual( rcpsp_problem: MultiskillRcpspProblem, rcpsp_sol: MultiskillRcpspSolution ): modes = rcpsp_sol.modes sorted_task_by_start = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_end[0]) employee_usage = { employee: { "activity": np.zeros((max_time - min_time + 1)), "binary_activity": np.zeros((max_time - min_time + 1)), "total_activity": 0, "boxes_time": [], } for employee in rcpsp_problem.employees } index_to_time = {i: min_time + i for i in range(max_time - min_time + 1)} time_to_index = {index_to_time[i]: i for i in index_to_time} sorted_employees = list(sorted(rcpsp_problem.employees)) for activity in sorted_task_by_start: start_time = rcpsp_sol.get_start_time(activity) end_time = rcpsp_sol.get_end_time(activity) for employee in rcpsp_sol.employee_usage.get(activity, {}): employee_usage[employee]["activity"][ time_to_index[start_time] : time_to_index[end_time] ] = ( rcpsp_problem.index_task[activity] if isinstance(activity, str) else activity ) employee_usage[employee]["binary_activity"][ time_to_index[start_time] : time_to_index[end_time] ] = 1 employee_usage[employee]["total_activity"] += end_time - start_time index_employee = sorted_employees.index(employee) employee_usage[employee]["boxes_time"] += [ [ (index_employee - 0.25, start_time + 0.01, activity), (index_employee - 0.25, end_time - 0.01, activity), (index_employee + 0.25, end_time - 0.01, activity), (index_employee + 0.25, start_time + 0.01, activity), (index_employee - 0.25, start_time + 0.01, activity), ] ] return employee_usage
[docs] def plot_resource_individual_gantt( rcpsp_problem: MultiskillRcpspProblem, rcpsp_sol: MultiskillRcpspSolution, title_figure="", name_task=None, fig=None, ax=None, current_t=None, ): array_ressource_usage = compute_schedule_per_resource_individual( rcpsp_problem, rcpsp_sol ) sorted_task_by_start = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_end[0]) sorted_employees = list(sorted(rcpsp_problem.employees)) if name_task is None: name_task = {} for t in rcpsp_problem.mode_details: name_task[t] = str(t) if fig is None or ax is None: fig, ax = plt.subplots(1, figsize=(12, 6)) fig.suptitle(title_figure) position_label = {} for i in range(len(sorted_employees)): patches = [] nb_colors = len(sorted_task_by_start) // 2 colors = get_cmap_with_nb_colors("hsv", nb_colors) for boxe in array_ressource_usage[sorted_employees[i]]["boxes_time"]: polygon = Polygon([(b[1], b[0]) for b in boxe]) activity = boxe[0][2] x, y = polygon.exterior.xy ax.plot(x, y, zorder=-1, color="b") patches.append( pp( xy=polygon.exterior.coords, facecolor=colors((rcpsp_problem.index_task[activity]) % nb_colors), ) ) activity = boxe[0][2] if abs(boxe[0][1] - boxe[1][1]) >= 0.4: # (resource - 0.25, start_time + 0.01, activity), # (resource - 0.25, end_time - 0.01, activity), # (resource + 0.25, end_time - 0.01, activity), # (resource + 0.25, start_time + 0.01, activity), # (resource - 0.25, start_time + 0.01, activity) center = ( sum([b[1] for b in boxe[:4]]) / 4 - 0.4, sum(b[0] for b in boxe[:4]) / 4, ) if activity not in position_label: position_label[activity] = center position_label[activity] = max(center, position_label[activity]) p = PatchCollection( patches, match_original=True, alpha=0.4, ) ax.add_collection(p) ax.set_xlim((min_time, max_time)) ax.set_ylim((-0.5, len(sorted_employees))) ax.set_yticks(range(len(sorted_employees))) ax.set_yticklabels(tuple(sorted_employees), fontdict={"size": 7}) for activity in position_label: ax.annotate( name_task[activity], xy=position_label[activity], font_properties=FontProperties(size=7, weight="bold"), verticalalignment="center", horizontalalignment="left", color="k", clip_on=True, ) ax.grid(True) if current_t is not None: ax.axvline(x=current_t, label="pyplot vertical line", color="r", ls="--") return fig
[docs] def plot_resource_individual_gantt_preemptive( rcpsp_problem: MultiskillRcpspProblem, rcpsp_sol: PreemptiveMultiskillRcpspSolution, title_figure="", name_task=None, subtasks=None, fig=None, ax=None, current_t=None, ): array_ressource_usage = compute_schedule_per_resource_individual_preemptive( rcpsp_problem, rcpsp_sol ) sorted_task_by_start = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( rcpsp_sol.schedule, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_end[0]) sorted_employees = list(sorted(rcpsp_problem.employees)) if name_task is None: name_task = {} for t in rcpsp_problem.mode_details: name_task[t] = str(t) if subtasks is None: subtasks = set(rcpsp_problem.tasks_list) if fig is None or ax is None: fig, ax = plt.subplots(1, figsize=(12, 6)) fig.suptitle(title_figure) position_label = {} for i in range(len(sorted_employees)): patches = [] nb_colors = len(sorted_task_by_start) // 2 colors = get_cmap_with_nb_colors("hsv", nb_colors) for boxe in array_ressource_usage[sorted_employees[i]]["boxes_time"]: polygon = Polygon([(b[1], b[0]) for b in boxe]) activity = boxe[0][2] if activity not in subtasks: continue x, y = polygon.exterior.xy ax.plot(x, y, zorder=-1, color="b") patches.append( pp( xy=polygon.exterior.coords, facecolor=colors((rcpsp_problem.index_task[activity]) % nb_colors), ) ) activity = boxe[0][2] if abs(boxe[0][1] - boxe[1][1]) >= 0.4: center = ( sum([b[1] for b in boxe[:4]]) / 4 - 0.4, sum(b[0] for b in boxe[:4]) / 4, ) if activity not in position_label: position_label[activity] = center position_label[activity] = max(center, position_label[activity]) p = PatchCollection( patches, match_original=True, alpha=0.4, ) ax.add_collection(p) ax.set_xlim((min_time, max_time)) ax.set_ylim((-0.5, len(sorted_employees))) ax.set_yticks(range(len(sorted_employees))) ax.set_yticklabels(tuple(sorted_employees), fontdict={"size": 7}) for activity in position_label: ax.annotate( name_task[activity], xy=position_label[activity], font_properties=FontProperties(size=7, weight="bold"), verticalalignment="center", horizontalalignment="left", color="k", clip_on=True, ) ax.grid(True) if current_t is not None: ax.axvline(x=current_t, label="pyplot vertical line", color="r", ls="--") return fig
[docs] def plot_task_gantt( rcpsp_problem: MultiskillRcpspProblem, rcpsp_sol: MultiskillRcpspSolution, subtasks=None, fig=None, ax=None, x_lim=None, title=None, current_t=None, ): if fig is None or ax is None: fig, ax = plt.subplots(1, figsize=(7, 7)) ax.set_title("Gantt Task") if title is None: ax.set_title("Gantt Task") else: ax.set_title(title) if subtasks is None: subtasks = set(rcpsp_problem.tasks_list) tasks = [t for t in rcpsp_problem.tasks_list if t in subtasks] nb_task = len(tasks) sorted_task_by_start = sorted( tasks, key=lambda x: 100000 * rcpsp_sol.get_start_time(x) + rcpsp_problem.index_task[x], ) sorted_task_by_end = sorted( tasks, key=lambda x: 100000 * rcpsp_sol.get_end_time(x) + rcpsp_problem.index_task[x], ) max_time = rcpsp_sol.get_end_time(sorted_task_by_end[-1]) min_time = rcpsp_sol.get_start_time(sorted_task_by_start[0]) patches = [] for j in range(nb_task): nb_colors = len(tasks) // 2 colors = get_cmap_with_nb_colors("hsv", nb_colors) for start, end in zip( rcpsp_sol.get_start_times_list(tasks[j]), rcpsp_sol.get_end_times_list(tasks[j]), ): box = [ (j - 0.25, start), (j - 0.25, end), (j + 0.25, end), (j + 0.25, start), (j - 0.25, start), ] polygon = Polygon([(b[1], b[0]) for b in box]) x, y = polygon.exterior.xy ax.plot(x, y, zorder=-1, color="b") patches.append( pp(xy=polygon.exterior.coords, facecolor=colors((j - 1) % nb_colors)) ) ax.annotate( tasks[j], xy=( ( rcpsp_sol.get_start_times_list(tasks[j])[0] + +rcpsp_sol.get_end_times_list(tasks[j])[0] ) / 2, j, ), font_properties=FontProperties(size=7, weight="bold"), verticalalignment="center", horizontalalignment="left", color="k", clip_on=True, ) p = PatchCollection(patches, match_original=True, alpha=0.4) ax.add_collection(p) if x_lim is None: ax.set_xlim((min_time, max_time)) else: ax.set_xlim(x_lim) ax.set_ylim((-0.5, nb_task)) ax.set_yticks(range(nb_task)) ax.set_yticklabels( tuple([str(tasks[j]) for j in range(nb_task)]), fontdict={"size": 5} ) ax.set_ylabel("Task number") ax.set_xlabel("Timestep") return fig