# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import time
from collections.abc import Callable
from itertools import product
from typing import Any, Optional
from ortools.math_opt.python import mathopt
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.lp_tools import (
OrtoolsMathOptMilpSolver,
ParametersMilp,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
from discrete_optimization.rcpsp_multiskill.problem import (
MultiskillRcpspProblem,
MultiskillRcpspSolution,
tree,
)
logger = logging.getLogger(__name__)
[docs]
class MathOptMultiskillRcpspSolver(OrtoolsMathOptMilpSolver):
problem: MultiskillRcpspProblem
def __init__(
self,
problem: MultiskillRcpspProblem,
params_objective_function: ParamsObjectiveFunction = None,
**kwargs,
):
super().__init__(
problem=problem,
params_objective_function=params_objective_function,
**kwargs,
)
self.variable_decision = {}
self.constraints_dict = {"lns": []}
[docs]
def init_model(self, **args):
self.model = self.create_empty_model(name="mrcpsp")
sorted_tasks = self.problem.tasks_list
max_time = args.get("max_time", self.problem.horizon)
max_duration = max_time
renewable = {
r: self.problem.resources_availability[r]
for r in self.problem.resources_availability
if r not in self.problem.non_renewable_resources
}
non_renewable = {
r: self.problem.resources_availability[r]
for r in self.problem.non_renewable_resources
}
list_edges = []
for task in sorted_tasks:
for suc in self.problem.successors[task]:
list_edges.append([task, suc])
times = range(max_duration)
self.modes = {
task: {
mode: self.add_binary_variable(name=f"mode_{task},{mode}")
for mode in self.problem.mode_details[task]
}
for task in self.problem.mode_details
}
self.start_times = {
task: {
mode: {
t: self.add_binary_variable(name=f"start_{task},{mode},{t}")
for t in times
}
for mode in self.problem.mode_details[task]
}
for task in self.problem.mode_details
}
# you have to choose one starting date :
for task in self.start_times:
self.add_linear_constraint(
self.construct_linear_sum(
self.start_times[task][mode][t]
for mode in self.start_times[task]
for t in self.start_times[task][mode]
)
== 1
)
for mode in self.modes[task]:
self.add_linear_constraint(
self.modes[task][mode]
== self.construct_linear_sum(
self.start_times[task][mode][t]
for t in self.start_times[task][mode]
)
)
self.durations = {
task: self.add_integer_variable(name="duration_" + str(task))
for task in self.modes
}
self.start_times_task = {
task: self.add_integer_variable(name=f"start_time_{task}")
for task in self.start_times
}
self.end_times_task = {
task: self.add_integer_variable(name=f"end_time_{task}")
for task in self.start_times
}
for task in self.start_times:
self.add_linear_constraint(
self.construct_linear_sum(
self.start_times[task][mode][t] * t
for mode in self.start_times[task]
for t in self.start_times[task][mode]
)
== self.start_times_task[task]
)
self.add_linear_constraint(
self.end_times_task[task]
- self.start_times_task[task]
- self.durations[task]
== 0
)
for task in self.durations:
self.add_linear_constraint(
self.construct_linear_sum(
self.problem.mode_details[task][mode]["duration"]
* self.modes[task][mode]
for mode in self.modes[task]
)
== self.durations[task]
)
self.employee_usage = tree()
task_in_employee_usage = set()
for employee in self.problem.employees:
skills_employee = [
skill
for skill in self.problem.employees[employee].dict_skill.keys()
if self.problem.employees[employee].dict_skill[skill].skill_value > 0
]
for task in sorted_tasks:
for mode in self.problem.mode_details[task]:
required_skills = [
s
for s in self.problem.mode_details[task][mode]
if s in self.problem.skills_set
and self.problem.mode_details[task][mode][s] > 0
and s in skills_employee
]
if len(required_skills) == 0:
# this employee will be useless anyway, pass
continue
for s in required_skills:
for t in range(max_duration):
self.employee_usage[
(employee, task, mode, t, s)
] = self.add_binary_variable(
name=f"employee_{employee}{task}{mode}{t}{s}",
)
task_in_employee_usage.add(task)
self.add_linear_constraint(
self.employee_usage[(employee, task, mode, t, s)]
- self.modes[task][mode]
<= 0
)
self.add_linear_constraint(
self.employee_usage[(employee, task, mode, t, s)]
- self.start_times[task][mode][t]
<= 0
)
len_calendar = len(
self.problem.employees[employee].calendar_employee
)
if any(
not self.problem.employees[employee].calendar_employee[
tt
]
for tt in range(
t,
min(
t
+ self.problem.mode_details[task][mode][
"duration"
],
len_calendar,
),
)
):
self.add_linear_constraint(
self.employee_usage[(employee, task, mode, t, s)]
== 0
)
employees = set([x[0] for x in self.employee_usage])
# can't work on overlapping tasks.
for emp, t in product(employees, times):
self.add_linear_constraint(
self.construct_linear_sum(
self.employee_usage[x]
for x in self.employee_usage
if x[0] == emp
and x[3]
<= t
< x[3] + int(self.problem.mode_details[x[1]][x[2]]["duration"])
)
<= 1
)
# ressource usage limit
for (r, t) in product(renewable, times):
self.add_linear_constraint(
self.construct_linear_sum(
int(self.problem.mode_details[task][mode][r])
* self.start_times[task][mode][time]
for task in self.start_times
for mode in self.start_times[task]
for time in self.start_times[task][mode]
if time
<= t
< time + int(self.problem.mode_details[task][mode]["duration"])
)
<= renewable[r][t]
)
# for non renewable ones.
for r in non_renewable:
self.add_linear_constraint(
self.construct_linear_sum(
int(self.problem.mode_details[task][mode][r])
* self.start_times[task][mode][time]
for task in self.start_times
for mode in self.start_times[task]
for time in self.start_times[task][mode]
)
<= non_renewable[r][0]
)
for task in self.start_times_task:
required_skills = [
(s, mode, self.problem.mode_details[task][mode][s])
for mode in self.problem.mode_details[task]
for s in self.problem.mode_details[task][mode]
if s in self.problem.skills_set
and self.problem.mode_details[task][mode][s] > 0
]
skills = set([s[0] for s in required_skills])
for s in skills:
employee_usage_keys = [
v for v in self.employee_usage if v[1] == task and v[4] == s
]
self.add_linear_constraint(
self.construct_linear_sum(
self.employee_usage[x]
* self.problem.employees[x[0]].dict_skill[s].skill_value
for x in employee_usage_keys
)
>= self.construct_linear_sum(
self.modes[task][mode]
* self.problem.mode_details[task][mode].get(s, 0)
for mode in self.modes[task]
)
)
for (j, s) in list_edges:
self.add_linear_constraint(
self.start_times_task[s] - self.end_times_task[j] >= 0
)
self.set_model_objective(
self.start_times_task[max(self.start_times_task)],
minimize=True,
)
[docs]
def retrieve_current_solution(
self,
get_var_value_for_current_solution: Callable[[Any], float],
get_obj_value_for_current_solution: Callable[[], float],
) -> MultiskillRcpspSolution:
rcpsp_schedule = {}
modes = {}
results = {}
employee_usage = {}
employee_usage_solution = {}
for task in self.start_times:
for mode in self.start_times[task]:
for t, start_time in self.start_times[task][mode].items():
value = get_var_value_for_current_solution(start_time)
results[(task, mode, t)] = value
if value >= 0.5:
rcpsp_schedule[task] = {
"start_time": int(t),
"end_time": int(
t + self.problem.mode_details[task][mode]["duration"]
),
}
modes[task] = mode
for t in self.employee_usage:
employee_usage[t] = get_var_value_for_current_solution(
self.employee_usage[t]
)
if employee_usage[t] >= 0.5:
if t[1] not in employee_usage_solution:
employee_usage_solution[t[1]] = {}
if t[0] not in employee_usage_solution[t[1]]:
employee_usage_solution[t[1]][t[0]] = set()
employee_usage_solution[t[1]][t[0]].add(t[4])
# (employee, task, mode, time, skill)
modes = {}
modes_task = {}
for t in self.modes:
for m, mode in self.modes[t].items():
modes[(t, m)] = get_var_value_for_current_solution(mode)
if modes[(t, m)] >= 0.5:
modes_task[t] = m
durations = {}
for t in self.durations:
durations[t] = get_var_value_for_current_solution(self.durations[t])
start_time = {}
for t in self.start_times_task:
start_time[t] = get_var_value_for_current_solution(self.start_times_task[t])
end_time = {}
for t in self.start_times_task:
end_time[t] = get_var_value_for_current_solution(self.end_times_task[t])
logger.debug(f"Size schedule : {len(rcpsp_schedule.keys())}")
logger.debug(
(
"results",
"(task, mode, time)",
{x: results[x] for x in results if results[x] == 1.0},
)
)
logger.debug(
(
"Employee usage : ",
"(employee, task, mode, time, skill)",
{
x: employee_usage[x]
for x in employee_usage
if employee_usage[x] == 1.0
},
)
)
logger.debug(
(
"task mode : ",
"(task, mode)",
{t: modes[t] for t in modes if modes[t] == 1.0},
)
)
logger.debug(f"durations : {durations}")
logger.debug(f"Start time {start_time}")
logger.debug(f"End time {end_time}")
return MultiskillRcpspSolution(
problem=self.problem,
modes=modes_task,
schedule=rcpsp_schedule,
employee_usage=employee_usage_solution,
)
[docs]
def solve(
self,
parameters_milp: Optional[ParametersMilp] = None,
time_limit: Optional[float] = 30.0,
**args,
) -> ResultStorage:
if self.model is None:
logger.info("Init LP model ")
t = time.time()
self.init_model(greedy_start=False, **args)
logger.info(f"LP model initialized...in {time.time() - t} seconds")
return super().solve(
parameters_milp=parameters_milp, time_limit=time_limit, **args
)
[docs]
def convert_to_variable_values(
self, solution: MultiskillRcpspSolution
) -> dict[mathopt.Variable, int]:
hinted_value: dict[mathopt.Variable, int] = {}
# we define the hint only for task present in solution schedule
# (potentially only partially defined if infeasible for example)
for task in solution.schedule:
mode = solution.modes[task]
start_time = solution.schedule[task]["start_time"]
end_time = solution.schedule[task]["end_time"]
duration = end_time - start_time
for other_mode, var in self.modes[task].items():
if other_mode == mode:
hinted_value[var] = 1
else:
hinted_value[var] = 0
for other_mode, vars in self.start_times[task].items():
if other_mode == mode:
for t, var in vars.items():
if t == start_time:
hinted_value[var] = 1
else:
hinted_value[var] = 0
else:
for t, var in vars.items():
hinted_value[var] = 0
hinted_value[self.start_times_task[task]] = start_time
hinted_value[self.end_times_task[task]] = end_time
hinted_value[self.durations[task]] = duration
return hinted_value