# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import Union
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.collections import PatchCollection
from matplotlib.font_manager import FontProperties
from matplotlib.patches import Polygon as pp
from shapely.geometry import Polygon
from discrete_optimization.generic_tools.plot_utils import (
get_cmap,
get_cmap_with_nb_colors,
)
from discrete_optimization.rcpsp.problem_preemptive import (
PreemptiveRcpspProblem,
PreemptiveRcpspSolution,
)
logger = logging.getLogger(__name__)
[docs]
def compute_resource_consumption(
rcpsp_problem: PreemptiveRcpspProblem,
rcpsp_sol: PreemptiveRcpspSolution,
list_resources: list[Union[int, str]] = None,
future_view=True,
):
modes_dict = rcpsp_problem.get_modes_dict(rcpsp_sol)
modes_dict[rcpsp_problem.source_task] = 1
modes_dict[rcpsp_problem.sink_task] = 1
last_activity = rcpsp_problem.sink_task
makespan = rcpsp_sol.get_end_time(last_activity)
if list_resources is None:
list_resources = rcpsp_problem.resources_list
consumptions = np.zeros((len(list_resources), makespan + 1))
for act_id in rcpsp_problem.get_tasks_list():
for ir in range(len(list_resources)):
use_ir = rcpsp_problem.mode_details[act_id][modes_dict[act_id]].get(
list_resources[ir], 0
)
for s, e in zip(
rcpsp_sol.get_start_times_list(act_id),
rcpsp_sol.get_end_times_list(act_id),
):
if future_view:
consumptions[ir, s + 1 : e + 1] += use_ir
else:
consumptions[ir, s:e] += use_ir
return consumptions, np.arange(0, makespan + 1, 1)
[docs]
def compute_nice_resource_consumption(
rcpsp_problem: PreemptiveRcpspProblem,
rcpsp_sol: PreemptiveRcpspSolution,
list_resources: list[Union[int, str]] = None,
):
if list_resources is None:
list_resources = rcpsp_problem.resources_list
c_future, times = compute_resource_consumption(
rcpsp_problem, rcpsp_sol, list_resources=list_resources, future_view=True
)
c_past, times = compute_resource_consumption(
rcpsp_problem, rcpsp_sol, list_resources=list_resources, future_view=False
)
merged_times = {i: [] for i in range(len(list_resources))}
merged_cons = {i: [] for i in range(len(list_resources))}
for r in range(len(list_resources)):
for index_t in range(len(times)):
merged_times[r] += [times[index_t], times[index_t]]
merged_cons[r] += [c_future[r, index_t], c_past[r, index_t]]
for r in merged_times:
merged_times[r] = np.array(merged_times[r])
merged_cons[r] = np.array(merged_cons[r])
return merged_times, merged_cons
[docs]
def plot_ressource_view(
rcpsp_problem: PreemptiveRcpspProblem,
rcpsp_sol: PreemptiveRcpspSolution,
list_resource: list[Union[int, str]] = None,
title_figure="",
x_lim=None,
fig=None,
ax=None,
):
modes_dict = rcpsp_problem.get_modes_dict(rcpsp_sol)
with_calendar = rcpsp_problem.is_varying_resource()
if list_resource is None:
list_resource = rcpsp_problem.resources_list
if ax is None:
fig, ax = plt.subplots(nrows=len(list_resource), figsize=(10, 5), sharex=True)
if len(list_resource) == 1:
ax = [ax]
fig.suptitle(title_figure)
polygons_ax = {i: [] for i in range(len(list_resource))}
labels_ax = {i: [] for i in range(len(list_resource))}
sorted_activities = sorted(
rcpsp_problem.get_tasks_list(), key=lambda x: rcpsp_sol.get_start_time(x)
)
for j in sorted_activities:
time_starts = rcpsp_sol.get_start_times_list(j) # rcpsp_schedule[j]["starts"]
time_ends = rcpsp_sol.get_end_times_list(j) # rcpsp_schedule[j]["ends"]
index = 0
for time_start, time_end in zip(time_starts, time_ends):
for i in range(len(list_resource)):
cons = rcpsp_problem.mode_details[j][modes_dict[j]].get(
list_resource[i], 0
)
if cons == 0:
continue
bound = (
rcpsp_problem.get_resource_available(list_resource[i], 0)
if not with_calendar
else max(
rcpsp_problem.get_resource_availability_array(list_resource[i])
)
)
for k in range(0, bound + 5):
polygon = Polygon(
[
(time_start, k),
(time_end, k),
(time_end, k + cons),
(time_start, k + cons),
(time_start, k),
]
)
areas = [p.intersection(polygon).area for p in polygons_ax[i]]
if len(areas) == 0 or max(areas) == 0:
polygons_ax[i].append(polygon)
labels_ax[i].append(j)
ax[i].annotate(
j,
xy=((time_start + time_end) / 2, (k + cons / 2)),
font_properties=FontProperties(size=6, weight="bold"),
verticalalignment="center",
horizontalalignment="left",
color="k",
clip_on=True,
)
break
index += 1
for i in range(len(list_resource)):
patches = []
for polygon in polygons_ax[i]:
x, y = polygon.exterior.xy
ax[i].plot(x, y, zorder=-1, color="b")
patches.append(pp(xy=polygon.exterior.coords))
p = PatchCollection(patches, cmap=get_cmap("Blues"), alpha=0.4)
ax[i].add_collection(p)
merged_times, merged_cons = compute_nice_resource_consumption(
rcpsp_problem, rcpsp_sol, list_resources=list_resource
)
for i in range(len(list_resource)):
ax[i].plot(
merged_times[i],
merged_cons[i],
color="r",
linewidth=2,
label="Consumption " + str(list_resource[i]),
zorder=1,
)
if not with_calendar:
ax[i].axhline(
y=rcpsp_problem.get_resource_available(list_resource[i], 0),
linestyle="--",
label="Limit : " + str(list_resource[i]),
zorder=0,
)
else:
ax[i].plot(
merged_times[i],
[
rcpsp_problem.get_resource_available(list_resource[i], m)
for m in merged_times[i]
],
linestyle="--",
label="Limit : " + str(list_resource[i]),
zorder=0,
)
ax[i].legend(fontsize=5)
lims = ax[i].get_xlim()
if x_lim is None:
ax[i].set_xlim([lims[0], 1.0 * lims[1]])
else:
ax[i].set_xlim(x_lim)
ax[-1].set_xlabel("Timestep")
return fig
[docs]
def plot_task_gantt(
rcpsp_problem: PreemptiveRcpspProblem,
rcpsp_sol: PreemptiveRcpspSolution,
fig=None,
ax=None,
x_lim=None,
title=None,
current_t=None,
):
if fig is None or ax is None:
fig, ax = plt.subplots(1, figsize=(7, 7))
ax.set_title("Gantt Task")
if title is None:
ax.set_title("Gantt Task")
else:
ax.set_title(title)
tasks = rcpsp_problem.tasks_list
nb_task = len(tasks)
sorted_task_by_start = sorted(
rcpsp_sol.rcpsp_schedule,
key=lambda x: 100000 * rcpsp_sol.rcpsp_schedule[x]["starts"][0]
+ rcpsp_problem.index_task[x],
)
sorted_task_by_end = sorted(
rcpsp_sol.rcpsp_schedule,
key=lambda x: 100000 * rcpsp_sol.rcpsp_schedule[x]["ends"][-1]
+ rcpsp_problem.index_task[x],
)
max_time = rcpsp_sol.rcpsp_schedule[sorted_task_by_end[-1]]["ends"][-1]
min_time = rcpsp_sol.rcpsp_schedule[sorted_task_by_start[0]]["starts"][0]
patches = []
for j in range(nb_task):
nb_colors = len(tasks) // 2
colors = get_cmap_with_nb_colors("hsv", nb_colors)
for start, end in zip(
rcpsp_sol.rcpsp_schedule[tasks[j]]["starts"],
rcpsp_sol.rcpsp_schedule[tasks[j]]["ends"],
):
box = [
(j - 0.25, start),
(j - 0.25, end),
(j + 0.25, end),
(j + 0.25, start),
(j - 0.25, start),
]
polygon = Polygon([(b[1], b[0]) for b in box])
x, y = polygon.exterior.xy
ax.plot(x, y, zorder=-1, color="b")
patches.append(
pp(xy=polygon.exterior.coords, facecolor=colors((j - 1) % nb_colors))
)
ax.annotate(
tasks[j],
xy=(
(
rcpsp_sol.rcpsp_schedule[tasks[j]]["starts"][0]
+ rcpsp_sol.rcpsp_schedule[tasks[j]]["ends"][0]
)
/ 2,
j,
),
font_properties=FontProperties(size=7, weight="bold"),
verticalalignment="center",
horizontalalignment="left",
color="k",
clip_on=True,
)
p = PatchCollection(patches, match_original=True, alpha=0.4)
ax.add_collection(p)
if x_lim is None:
ax.set_xlim((min_time, max_time))
else:
ax.set_xlim(x_lim)
ax.set_ylim((-0.5, nb_task))
ax.set_yticks(range(nb_task))
ax.set_yticklabels(
tuple([str(tasks[j]) for j in range(nb_task)]), fontdict={"size": 5}
)
ax.set_ylabel("Task number")
ax.set_xlabel("Timestep")
return fig
[docs]
def compute_schedule_per_resource_individual(
rcpsp_problem: PreemptiveRcpspProblem,
rcpsp_sol: PreemptiveRcpspSolution,
resource_types_to_consider: list[str] = None,
):
modes_dict = rcpsp_problem.build_mode_dict(
rcpsp_modes_from_solution=rcpsp_sol.rcpsp_modes
)
if resource_types_to_consider is None:
resources = rcpsp_problem.resources_list
else:
resources = resource_types_to_consider
sorted_task_by_start = sorted(
rcpsp_sol.rcpsp_schedule,
key=lambda x: 100000 * rcpsp_sol.rcpsp_schedule[x]["starts"][0]
+ rcpsp_problem.index_task[x],
)
sorted_task_by_end = sorted(
rcpsp_sol.rcpsp_schedule,
key=lambda x: 100000 * rcpsp_sol.rcpsp_schedule[x]["ends"][-1]
+ rcpsp_problem.index_task[x],
)
max_time = rcpsp_sol.rcpsp_schedule[sorted_task_by_end[-1]]["ends"][-1]
min_time = rcpsp_sol.rcpsp_schedule[sorted_task_by_start[0]]["starts"][0]
with_calendar = rcpsp_problem.is_varying_resource()
array_ressource_usage = {
resources[i]: {
"activity": np.zeros(
(
max_time - min_time + 1,
max(rcpsp_problem.resources[resources[i]])
if with_calendar
else rcpsp_problem.resources[resources[i]],
)
),
"binary_activity": np.zeros(
(
max_time - min_time + 1,
max(rcpsp_problem.resources[resources[i]])
if with_calendar
else rcpsp_problem.resources[resources[i]],
)
),
"total_activity": np.zeros(
max(rcpsp_problem.resources[resources[i]])
if with_calendar
else rcpsp_problem.resources[resources[i]]
),
"activity_last_n_hours": np.zeros(
(
max_time - min_time + 1,
max(rcpsp_problem.resources[resources[i]])
if with_calendar
else rcpsp_problem.resources[resources[i]],
)
),
"boxes_time": [],
}
for i in range(len(resources))
}
total_time = max_time - min_time + 1
nhour = int(min(8, total_time / 2 - 1))
index_to_time = {i: min_time + i for i in range(max_time - min_time + 1)}
time_to_index = {index_to_time[i]: i for i in index_to_time}
for activity in sorted_task_by_start:
mode = modes_dict[activity]
start_times = rcpsp_sol.rcpsp_schedule[activity]["starts"]
end_times = rcpsp_sol.rcpsp_schedule[activity]["ends"]
for start_time, end_time in zip(start_times, end_times):
if end_time == start_time:
continue
resources_needed = {
r: rcpsp_problem.mode_details[activity][mode].get(r, 0)
for r in resources
}
for r in resources_needed:
if r not in array_ressource_usage:
continue
rneeded = resources_needed[r]
if not with_calendar:
range_interest = range(
array_ressource_usage[r]["activity"].shape[1]
)
else:
range_interest = range(
rcpsp_problem.resources[r][time_to_index[start_time]]
)
while rneeded > 0:
availables_people_r = [
i
for i in range_interest
if array_ressource_usage[r]["activity"][
time_to_index[start_time], i
]
== 0
]
logger.debug(f"{len(availables_people_r)} people available : ")
if len(availables_people_r) > 0:
resource = min(
availables_people_r,
key=lambda x: array_ressource_usage[r]["total_activity"][x],
)
# greedy choice,
# the one who worked the less until now.
array_ressource_usage[r]["activity"][
time_to_index[start_time] : time_to_index[end_time],
resource,
] = (
rcpsp_problem.index_task[activity]
if isinstance(activity, str)
else activity
)
array_ressource_usage[r]["binary_activity"][
time_to_index[start_time] : time_to_index[end_time],
resource,
] = 1
array_ressource_usage[r]["total_activity"][resource] += (
end_time - start_time
)
array_ressource_usage[r]["activity_last_n_hours"][
:, resource
] = np.convolve(
array_ressource_usage[r]["binary_activity"][:, resource],
np.array([1] * nhour + [0] + [0] * nhour),
mode="same",
)
array_ressource_usage[r]["boxes_time"] += [
[
(
resource - 0.25,
start_time + 0.01,
rcpsp_problem.index_task[activity],
),
(
resource - 0.25,
end_time - 0.01,
rcpsp_problem.index_task[activity],
),
(
resource + 0.25,
end_time - 0.01,
rcpsp_problem.index_task[activity],
),
(
resource + 0.25,
start_time + 0.01,
rcpsp_problem.index_task[activity],
),
(
resource - 0.25,
start_time + 0.01,
rcpsp_problem.index_task[activity],
),
]
]
# for plot purposes.
rneeded -= 1
else:
logger.debug(f"r_needed {rneeded}")
logger.debug(f"Ressource needed : {resources_needed}")
logger.debug(f"ressource : {r}")
logger.debug(f"activity : {activity}")
logger.warning("Problem, can't build schedule")
logger.debug(array_ressource_usage[r]["activity"])
rneeded = 0
return array_ressource_usage
[docs]
def plot_resource_individual_gantt(
rcpsp_problem: PreemptiveRcpspProblem,
rcpsp_sol: PreemptiveRcpspSolution,
resource_types_to_consider: list[str] = None,
title_figure="",
x_lim=None,
fig=None,
ax=None,
current_t=None,
):
array_ressource_usage = compute_schedule_per_resource_individual(
rcpsp_problem, rcpsp_sol, resource_types_to_consider=resource_types_to_consider
)
sorted_task_by_start = sorted(
rcpsp_sol.rcpsp_schedule,
key=lambda x: 100000 * rcpsp_sol.rcpsp_schedule[x]["starts"][0]
+ rcpsp_problem.index_task[x],
)
sorted_task_by_end = sorted(
rcpsp_sol.rcpsp_schedule,
key=lambda x: 100000 * rcpsp_sol.rcpsp_schedule[x]["ends"][-1]
+ rcpsp_problem.index_task[x],
)
max_time = rcpsp_sol.rcpsp_schedule[sorted_task_by_end[-1]]["ends"][-1]
min_time = rcpsp_sol.rcpsp_schedule[sorted_task_by_start[0]]["starts"][0]
resources_list = list(array_ressource_usage.keys())
if fig is None or ax is None:
fig, ax = plt.subplots(len(array_ressource_usage), figsize=(10, 5))
fig.suptitle(title_figure)
if len(array_ressource_usage) == 1:
ax = [ax]
for i in range(len(resources_list)):
patches = []
nb_colors = len(sorted_task_by_start) // 2
colors = get_cmap_with_nb_colors("hsv", nb_colors)
for boxe in array_ressource_usage[resources_list[i]]["boxes_time"]:
polygon = Polygon([(b[1], b[0]) for b in boxe])
activity = boxe[0][2]
x, y = polygon.exterior.xy
ax[i].plot(x, y, zorder=-1, color="b")
patches.append(
pp(
xy=polygon.exterior.coords,
facecolor=colors((activity - 1) % nb_colors),
)
)
p = PatchCollection(
patches,
match_original=True,
alpha=0.4,
)
ax[i].add_collection(p)
ax[i].set_title(resources_list[i])
if x_lim is None:
ax[i].set_xlim((min_time, max_time))
else:
ax[i].set_xlim(x_lim)
try:
ax[i].set_ylim((-0.5, rcpsp_problem.resources[resources_list[i]]))
ax[i].set_yticks(range(rcpsp_problem.resources[resources_list[i]]))
ax[i].set_yticklabels(
tuple([j for j in range(rcpsp_problem.resources[resources_list[i]])]),
fontdict={"size": 7},
)
except:
m = max(rcpsp_problem.resources[resources_list[i]])
ax[i].set_ylim((-0.5, m))
ax[i].set_yticks(range(m))
ax[i].set_yticklabels(tuple([j for j in range(m)]), fontdict={"size": 7})
ax[i].grid(True)
if current_t is not None:
ax[i].axvline(x=current_t, label="pyplot vertical line", color="r", ls="--")
ax[-1].set_xlabel("Timestep")
return fig