Source code for discrete_optimization.rcpsp_multiskill.solvers.cp_mzn

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import math
import os
from collections.abc import Hashable
from datetime import timedelta
from enum import Enum
from typing import Any, Optional, Union

from minizinc import Instance, Model, Solver

from discrete_optimization.generic_rcpsp_tools.graph_tools import (
    build_graph_rcpsp_object,
    build_unrelated_task,
)
from discrete_optimization.generic_tools.cp_tools import (
    CpSolverName,
    MinizincCpSolver,
    ParametersCp,
    SignEnum,
    find_right_minizinc_solver_name,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.rcpsp.problem_preemptive import PreemptiveRcpspSolution
from discrete_optimization.rcpsp.solution import PartialSolution, RcpspSolution
from discrete_optimization.rcpsp_multiskill.problem import (
    MultiskillRcpspProblem,
    MultiskillRcpspSolution,
    PreemptiveMultiskillRcpspSolution,
    cluster_employees_to_resource_types,
    create_fake_tasks_multiskills,
)

logger = logging.getLogger(__name__)
this_path = os.path.dirname(os.path.abspath(__file__))

files_mzn = {
    "multi-calendar": os.path.join(
        this_path, "../minizinc/ms_rcpsp_multi_mode_mzn_calendar.mzn"
    ),
    "multi-calendar-overlap": os.path.join(
        this_path, "../minizinc/ms_rcpsp_with_overlap_variable.mzn"
    ),
    "multi-calendar-no-ressource": os.path.join(
        this_path, "../minizinc/ms_rcpsp_multi_mode_mzn_calendar_no_ressource.mzn"
    ),
    "ms_no_multitasking": os.path.join(
        this_path,
        "../minizinc/ms_rcpsp_multi_mode_mzn_calendar_no_ressource_nomultitasking.mzn",
    ),
    "ms_rcpsp_preemptive": os.path.join(
        this_path, "../minizinc/ms_rcpsp_preemptive.mzn"
    ),
    "ms_rcpsp_partial_preemptive": os.path.join(
        this_path, "../minizinc/ms_rcpsp_preemptive_partially_preemptive.mzn"
    ),
    "compute_worker_for_tasks": os.path.join(
        this_path, "../minizinc/ms_rcpsp_compute_workers_for_tasks.mzn"
    ),
}


[docs] def add_fake_task_cp_data( rcpsp_problem: Union[MultiskillRcpspProblem], ignore_fake_task: bool = True, max_time_to_consider: int = None, ): if not ignore_fake_task: fake_tasks_res, fake_tasks_unit = create_fake_tasks_multiskills( rcpsp_problem=rcpsp_problem ) if len(fake_tasks_res) > 0: max_time_to_consider = ( rcpsp_problem.horizon if max_time_to_consider is None else max_time_to_consider ) fake_tasks_res = [ f for f in fake_tasks_res if f["start"] <= max_time_to_consider ] n_fake_tasks_res = len(fake_tasks_res) fakestart_res = [ fake_tasks_res[i]["start"] for i in range(n_fake_tasks_res) ] fake_dur_res = [ fake_tasks_res[i]["duration"] for i in range(n_fake_tasks_res) ] max_duration_fake_task_res = max(fake_dur_res) fake_req_res = [ [fake_tasks_res[i].get(res, 0) for i in range(n_fake_tasks_res)] for res in rcpsp_problem.resources_list ] dict_to_add_in_instance = { "max_duration_fake_task_resource": max_duration_fake_task_res, "n_fake_task_resource": n_fake_tasks_res, "fakestart_resource": fakestart_res, "fakedur_resource": fake_dur_res, "fakereq_resource": fake_req_res, "include_fake_tasks_resource": True, } else: dict_to_add_in_instance = { "max_duration_fake_task_resource": 0, "n_fake_task_resource": 0, "fakestart_resource": [], "fakedur_resource": [], "fakereq_resource": [[] for req in rcpsp_problem.resources_list], "include_fake_tasks_resource": False, } if len(fake_tasks_unit) > 0: fake_tasks_unit = [ f for f in fake_tasks_unit if f["start"] <= max_time_to_consider ] n_fake_tasks_unit = len(fake_tasks_unit) fakestart_unit = [ fake_tasks_unit[i]["start"] for i in range(n_fake_tasks_unit) ] fake_dur_unit = [ fake_tasks_unit[i]["duration"] for i in range(n_fake_tasks_unit) ] max_duration_fake_task_unit = max(fake_dur_unit) fake_req_unit = [ [fake_tasks_unit[i].get(res, 0) for i in range(n_fake_tasks_unit)] for res in rcpsp_problem.employees_list ] dict_to_add_in_instance[ "max_duration_fake_task_unit" ] = max_duration_fake_task_unit dict_to_add_in_instance["n_fake_task_unit"] = n_fake_tasks_unit dict_to_add_in_instance["fakestart_unit"] = fakestart_unit dict_to_add_in_instance["fakedur_unit"] = fake_dur_unit dict_to_add_in_instance["fakereq_unit"] = fake_req_unit dict_to_add_in_instance["include_fake_tasks_unit"] = True else: dict_to_add_in_instance["max_duration_fake_task_unit"] = 0 dict_to_add_in_instance["n_fake_task_unit"] = 0 dict_to_add_in_instance["fakestart_unit"] = [] dict_to_add_in_instance["fakedur_unit"] = [] dict_to_add_in_instance["fakereq_unit"] = [ [] for e in rcpsp_problem.employees_list ] dict_to_add_in_instance["include_fake_tasks_unit"] = False return dict_to_add_in_instance else: dict_to_add_in_instance = { "max_duration_fake_task_resource": 0, "n_fake_task_resource": 0, "fakestart_resource": [], "fakedur_resource": [], "fakereq_resource": [[] for r in rcpsp_problem.resources_list], "include_fake_tasks_resource": False, "max_duration_fake_task_unit": 0, "n_fake_task_unit": 0, "fakestart_unit": [], "fakedur_unit": [], "fakereq_unit": [[] for e in rcpsp_problem.employees_list], "include_fake_tasks_unit": False, } return dict_to_add_in_instance
def _log_minzinc_result(_output_item: Optional[str] = None, **kwargs: Any) -> None: logger.debug(f"One solution {kwargs['objective']}") if "nb_preemption_subtasks" in kwargs: logger.debug(("nb_preemption_subtasks", kwargs["nb_preemption_subtasks"])) if "nb_small_tasks" in kwargs: logger.debug(("nb_small_tasks", kwargs["nb_small_tasks"])) if "res_load" in kwargs: logger.debug(("res_load ", kwargs["res_load"])) keys = [k for k in kwargs if "penalty" in k] logger.debug("".join([str(k) + " : " + str(kwargs[k]) + "\n" for k in keys])) logger.debug(_output_item)
[docs] class SearchStrategyMultiskillRcpsp(Enum): START_THEN_USED_UNIT = "durThenStartThenMode" PRIORITY_SEARCH_START_UNIT_USED = "priority_smallest" NONE = "none"
[docs] class CpMultiskillRcpspSolver(MinizincCpSolver): problem: MultiskillRcpspProblem def __init__( self, problem: MultiskillRcpspProblem, cp_solver_name: CpSolverName = CpSolverName.CHUFFED, params_objective_function: ParamsObjectiveFunction = None, silent_solve_error: bool = False, **kwargs, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.silent_solve_error = silent_solve_error self.cp_solver_name = cp_solver_name self.key_decision_variable = [ "start", "mrun", ] # For now, I've put the var names of the CP model (not the rcpsp_problem) self.one_ressource_per_task = kwargs.get("one_ressource_per_task", False) self.resources_index = None
[docs] def manual_cumulative_resource_constraints(self, instance): blocking_per_resource = { r: {"blocking": [], "merged": set(), "not_blocked": set()} for r in self.problem.resources_list } array = [ [0 for i in range(self.problem.n_jobs)] for r in self.problem.resources_list ] for l in self.problem.resource_blocking_data: set_res = l[-1] list_task = l[0] for res in set_res: blocking_per_resource[res]["blocking"] += list_task blocking_per_resource[res]["merged"].update(list_task) index_res = self.problem.resources_list.index(res) array[index_res][self.problem.index_task[list_task[0]]] = ( self.problem.index_task[list_task[-1]] + 1 ) for i in list_task[1:]: array[index_res][self.problem.index_task[i]] = -1 for r in blocking_per_resource: blocking_per_resource[r]["not_blocked"] = [ t for t in self.problem.tasks_list if t not in blocking_per_resource[r]["merged"] ] s = """array[Res, Act] of int: resource_blocking_array;""" instance.add_string(s) instance["resource_blocking_array"] = array s = """constraint forall(k in RRes)( if include_fake_tasks_resource then let{set of Tasks: TasksR = {i| i in Act where mask_res_task[k, i]=false /\ resource_blocking_array[k, i]==0}, set of Tasks: TaskBlocker = {j| j in Act where resource_blocking_array[k,j]>0}, set of FakeActRes: FTasks = {j | j in FakeActRes where fakereq_resource[k, j]>0}} in( cumulative([start[i]| i in TasksR]++[start[i]| i in TaskBlocker]++[fakestart_resource[p] | p in FTasks], [adur[i]| i in TasksR]++ [max([0, start[resource_blocking_array[k,i]]+adur[resource_blocking_array[k,i]]-start[i]])| i in TaskBlocker]++[fakedur_resource[p] | p in FTasks], [arreq[k,i] | i in TasksR]++[arreq[k,i] | i in TaskBlocker]++[fakereq_resource[k, p] | p in FTasks], rcap[k]) ) else cumulative(start, adur, [arreq[k,i] | i in Act], rcap[k]) endif); """ instance.add_string(s)
[docs] def manual_starting_time(self, instance): slist = [] for i in range(self.problem.nb_tasks): t = self.problem.tasks_list[i] if t in self.problem.special_constraints.start_times_window: st = [] if ( self.problem.special_constraints.start_times_window[t][0] is not None ): st += [self.problem.special_constraints.start_times_window[t][0]] if ( self.problem.special_constraints.start_times_window[t][1] is not None ): st += [self.problem.special_constraints.start_times_window[t][1]] s = ( """constraint (member([start[j]+adur[j]|j in Act]++[fakestart_resource[j]+fakedur_resource[j]| j in FakeActRes]++ [fakestart_unit[j]+fakedur_unit[j]|j in FakeActUnit]++""" + str(st) + """, start[""" + str(i + 1) + """]));\n""" ) else: s = ( """constraint (member([start[j]+adur[j]|j in Act]++[fakestart_resource[j]+fakedur_resource[j]| j in FakeActRes]++ [fakestart_unit[j]+fakedur_unit[j]|j in FakeActUnit], start[""" + str(i + 1) + """]));\n""" ) slist += [s] for s in slist: instance.add_string(s)
[docs] def write_search_strategy_chuffed(self, instance): s = """ann: priority_smallest = priority_search(start, [seq_search([int_search([start[a]], input_order, indomain_min), priority_search([sum(sk in Skill)( skillunits[w, sk] * array_skills_required[ sk, a]) | w in Units], [bool_search([unit_used[w, a]], input_order, indomain_min) | w in Units], smallest, complete) ]) | a in Act], smallest, complete);\n""" instance.add_string(s) if self.cp_solver_name == CpSolverName.CHUFFED: instance.add_string("""include "chuffed.mzn";\n""")
[docs] def constraint_task_to_mode(self, task_id, mode): modes = self.problem.mode_details[task_id].keys() list_strings = [] for m in modes: if m == mode: bool_str = "true" else: bool_str = "false" s = ( "constraint mrun[" + str(self.mode_dict_task_mode_to_index_minizinc[(task_id, m)]) + "]=" + bool_str + ";\n" ) list_strings += [s] return list_strings
[docs] def constraint_start_time_string( self, task, start_time, sign: SignEnum = SignEnum.EQUAL ) -> str: return ( "constraint start[" + str(self.index_in_minizinc[task]) + "]" + str(sign.value) + str(start_time) + ";\n" )
[docs] def constraint_end_time_string( self, task, end_time, sign: SignEnum = SignEnum.EQUAL ) -> str: return ( "constraint start[" + str(self.index_in_minizinc[task]) + "]+adur[" + str(self.index_in_minizinc[task]) + "]" + str(sign.value) + str(end_time) + ";\n" )
[docs] def constraint_used_employee(self, task, employee, indicator: bool = False): id_task = self.index_in_minizinc[task] id_employee = self.index_employees_in_minizinc[employee] tag = "true" if indicator else "false" return [ "constraint unit_used[" + str(id_employee) + "," + str(id_task) + "]==" + str(tag) + ";\n" ]
[docs] def constraint_objective_max_time_set_of_jobs(self, set_of_jobs): s = [] for j in set_of_jobs: s += [ "constraint s[" + str(self.index_in_minizinc[j]) + "]+adur[" + str(self.index_in_minizinc[j]) + "]<=objective;\n" ] return s
[docs] def constraint_objective_equal_makespan(self, task_sink): ind = self.index_in_minizinc[task_sink] s = "constraint s[" + str(ind) + "]==objective;\n" return [s]
[docs] def constraint_sum_of_ending_time(self, set_subtasks: set[Hashable]): indexes = [self.index_in_minizinc[s] for s in set_subtasks] s = ( """int: nb_indexes=""" + str(len(indexes)) + """;\n array[1..nb_indexes] of Tasks: index_tasks=""" + str(indexes) + """;\n constraint objective>=sum(j in index_tasks)(start[j]+adur[j]);\n""" ) return [s]
[docs] def constraint_sum_of_starting_time(self, set_subtasks: set[Hashable]): indexes = [self.index_in_minizinc[s] for s in set_subtasks] s = ( """int: nb_indexes=""" + str(len(indexes)) + """;\n array[1..nb_indexes] of Tasks: index_tasks=""" + str(indexes) + """;\n constraint objective>=sum(j in index_tasks)(start[j]);\n""" ) return [s]
[docs] def add_hard_special_constraints(self, partial_solution): return add_hard_special_constraints(partial_solution, self)
[docs] def init_model(self, **args): no_ressource = False model_type = args.get( "model_type", "multi-calendar" if not no_ressource else "multi-calendar-no-ressource", ) model = Model(files_mzn[model_type]) exact_skills_need = args.get("exact_skills_need", False) add_objective_makespan = args.get("add_objective_makespan", True) ignore_sec_objective = args.get("ignore_sec_objective", True) include_cumulative_resource = args.get("include_cumulative_resource", True) include_constraint_on_start_value = args.get( "include_constraint_on_start_value", False ) include_constraint_on_start_value_manual = args.get( "include_constraint_on_start_value_manual", False ) search_strategy = args.get( "search_strategy", SearchStrategyMultiskillRcpsp.START_THEN_USED_UNIT ) if include_constraint_on_start_value_manual: include_constraint_on_start_value = False fake_tasks = args.get( "fake_tasks", True ) # to modelize varying quantity of resource. solver = Solver.lookup(find_right_minizinc_solver_name(self.cp_solver_name)) resources_list = self.problem.resources_list self.resources_index = resources_list instance = Instance(solver, model) if self.cp_solver_name.value == CpSolverName.CHUFFED.value: self.write_search_strategy_chuffed(instance) if include_constraint_on_start_value_manual: self.manual_starting_time(instance) if search_strategy.value != "none": instance.add_string("my_search=" + str(search_strategy.value) + ";\n") else: instance.add_string( "my_search=" + str(SearchStrategyMultiskillRcpsp.START_THEN_USED_UNIT.value) + ";\n" ) # we have to put one by default. instance["add_objective_makespan"] = add_objective_makespan instance["ignore_sec_objective"] = ignore_sec_objective instance["include_cumulative_resource"] = include_cumulative_resource instance[ "include_constraint_on_start_value" ] = include_constraint_on_start_value n_res = len(resources_list) keys = [] if not no_ressource: instance["n_res"] = n_res keys += ["n_res"] if model_type in {"multi-calendar", "multi-calendar-overlap"}: instance["exact_skills_need"] = exact_skills_need keys += ["exact_skills_need"] if model_type == "multi-calendar-overlap": self.graph = build_graph_rcpsp_object(rcpsp_problem=self.problem) _, unrelated = build_unrelated_task(self.graph) instance["nUnrel"] = len(unrelated) instance["unpred"] = [self.problem.index_task[x[0]] + 1 for x in unrelated] instance["unsucc"] = [self.problem.index_task[x[1]] + 1 for x in unrelated] if ( not include_cumulative_resource and len(self.problem.resource_blocking_data) > 0 ): self.manual_cumulative_resource_constraints(instance) instance["one_ressource_per_task"] = self.one_ressource_per_task keys += ["one_ressource_per_task"] n_tasks = self.problem.n_jobs instance["n_tasks"] = n_tasks keys += ["n_tasks"] sorted_tasks = self.problem.tasks_list n_opt = sum( [len(list(self.problem.mode_details[key].keys())) for key in sorted_tasks] ) instance["n_opt"] = n_opt keys += ["n_opt"] all_modes = [ (act, mode, self.problem.mode_details[act][mode]) for act in sorted_tasks for mode in sorted(self.problem.mode_details[act]) ] self.modeindex_map = { i + 1: {"task": all_modes[i][0], "original_mode_index": all_modes[i][1]} for i in range(len(all_modes)) } modes = [set() for t in sorted_tasks] for j in self.modeindex_map: modes[self.problem.index_task[self.modeindex_map[j]["task"]]].add(j) self.mode_dict_task_mode_to_index_minizinc = {} for ind in self.modeindex_map: task = self.modeindex_map[ind]["task"] mode = self.modeindex_map[ind]["original_mode_index"] self.mode_dict_task_mode_to_index_minizinc[(task, mode)] = ind dur = [x[2]["duration"] for x in all_modes] instance["modes"] = modes keys += ["modes"] instance["dur"] = dur keys += ["dur"] skills_set = sorted(list(self.problem.skills_set)) nb_units = len(self.problem.employees) skill_required = [ [int(all_modes[i][2].get(s, 0)) for i in range(len(all_modes))] for s in skills_set ] if "max_time" in args: instance["max_time"] = args["max_time"] else: instance["max_time"] = self.problem.horizon keys += ["max_time"] dict_to_add = add_fake_task_cp_data( rcpsp_problem=self.problem, ignore_fake_task=not fake_tasks, max_time_to_consider=instance["max_time"], ) for key in dict_to_add: instance[key] = dict_to_add[key] keys += [key] instance["nb_skill"] = len(self.problem.skills_set) instance["skillreq"] = skill_required instance["nb_units"] = nb_units keys += ["nb_skill", "skillreq", "nb_units"] instance["source"] = self.problem.index_task[self.problem.source_task] + 1 skillunits = [ [ int(math.floor(self.problem.employees[j].dict_skill[s].skill_value)) if s in self.problem.employees[j].dict_skill else 0 for s in skills_set ] for j in self.problem.employees_list ] self.employees_position = self.problem.employees_list self.index_employees_in_minizinc = { self.problem.employees_list[i]: i + 1 for i in range(len(self.problem.employees_list)) } instance["skillunits"] = skillunits keys += ["skillunits"] logger.debug(f"Employee position CP {self.employees_position}") if not no_ressource: rreq = [ [all_modes[i][2].get(res, 0) for i in range(len(all_modes))] for res in resources_list ] instance["rreq"] = rreq keys += ["rreq"] instance["rreq"] = rreq keys += ["rreq"] rcap = [ int(max(self.problem.resources_availability[x])) for x in resources_list ] instance["rcap"] = rcap keys += ["rcap"] rtype = [ 2 if res in self.problem.non_renewable_resources else 1 for res in resources_list ] instance["rtype"] = rtype keys += ["rtype"] succ = [ set( [ self.problem.return_index_task(x, offset=1) for x in self.problem.successors.get(task, []) ] ) for task in sorted_tasks ] instance["succ"] = succ keys += ["succ"] self.instance = instance p_s: Optional[PartialSolution] = args.get("partial_solution", None) self.index_in_minizinc = { task: self.problem.return_index_task(task, offset=1) for task in self.problem.tasks_list } add_partial_solution_hard_constraint = args.get( "add_partial_solution_hard_constraint", True ) if add_partial_solution_hard_constraint: strings = add_hard_special_constraints(p_s, self) for s in strings: self.instance.add_string(s) else: strings, name_penalty = add_soft_special_constraints(p_s, self) for s in strings: self.instance.add_string(s) strings = define_second_part_objective( [100] * len(name_penalty), name_penalty ) if len(name_penalty) > 0: for s in strings: self.instance.add_string(s)
[docs] def retrieve_solution( self, _output_item: Optional[str] = None, **kwargs: Any ) -> MultiskillRcpspSolution: """Return a d-o solution from the variables computed by minizinc. Args: _output_item: string representing the minizinc solver output passed by minizinc to the solution constructor **kwargs: keyword arguments passed by minzinc to the solution contructor containing the objective value (key "objective"), and the computed variables as defined in minizinc model. Returns: """ _log_minzinc_result(_output_item=_output_item, **kwargs) start_times = kwargs["start"] mrun = kwargs["mrun"] unit_used = kwargs["unit_used"] skills_list = sorted(list(self.problem.skills_set)) usage = {} modes_dict = {} for i in range(len(mrun)): if mrun[i]: modes_dict[self.modeindex_map[i + 1]["task"]] = self.modeindex_map[ i + 1 ]["original_mode_index"] for w in range(len(unit_used)): for task in range(len(unit_used[w])): task_id = self.problem.tasks_list[task] if unit_used[w][task] == 1: if "contrib" in kwargs: # model="ms_no_multitasking" intersection = [ skills_list[i] for i in range(len(kwargs["contrib"][task][w])) if kwargs["contrib"][task][w][i] == 1 ] else: mode = modes_dict[task_id] skills_needed = set( [ s for s in self.problem.skills_set if s in self.problem.mode_details[task_id][mode] and self.problem.mode_details[task_id][mode][s] > 0 ] ) skills_worker = set( [ s for s in self.problem.employees[ self.employees_position[w] ].dict_skill if self.problem.employees[self.employees_position[w]] .dict_skill[s] .skill_value > 0 ] ) intersection = skills_needed.intersection(skills_worker) if len(intersection) > 0: if task_id not in usage: usage[task_id] = {} usage[task_id][self.employees_position[w]] = intersection rcpsp_schedule = {} for i in range(len(start_times)): task_id = self.problem.tasks_list[i] rcpsp_schedule[task_id] = { "start_time": start_times[i], "end_time": start_times[i] + self.problem.mode_details[task_id][modes_dict[task_id]]["duration"], } return MultiskillRcpspSolution( problem=self.problem, modes=modes_dict, schedule=rcpsp_schedule, employee_usage=usage, )
[docs] class CpPreemptiveMultiskillRcpspSolver(MinizincCpSolver): problem: MultiskillRcpspProblem def __init__( self, problem: MultiskillRcpspProblem, cp_solver_name: CpSolverName = CpSolverName.CHUFFED, params_objective_function: ParamsObjectiveFunction = None, silent_solve_error: bool = False, **kwargs, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.silent_solve_error = silent_solve_error self.cp_solver_name = cp_solver_name self.key_decision_variable = [ "start", "mrun", ] # For now, I've put the var names of the CP model (not the rcpsp_problem) self.one_ressource_per_task = kwargs.get("one_ressource_per_task", False) self.resources_index = None self.unit_usage_preemptive = None
[docs] def constraint_task_to_mode(self, task_id, mode): modes = self.problem.mode_details[task_id].keys() list_strings = [] for m in modes: if m == mode: bool_str = "true" else: bool_str = "false" s = ( "constraint mrun[" + str(self.mode_dict_task_mode_to_index_minizinc[(task_id, m)]) + "]=" + bool_str + ";\n" ) list_strings += [s] return list_strings
[docs] def add_hard_special_constraints(self, partial_solution): return add_hard_special_constraints(partial_solution, self)
[docs] def constraint_duration_to_min_duration_preemptive(self, task, min_duration): list_strings = [] for i in range(2, self.nb_preemptive + 1): s = ( "constraint d_preemptive[" + str(self.index_in_minizinc[task]) + "," + str(i) + "]>" + str(min_duration) + "\/" + "d_preemptive[" + str(self.index_in_minizinc[task]) + "," + str(i) + "]=0;\n" ) list_strings += [s] return list_strings
[docs] def constraint_is_paused(self, task, is_paused): is_paused_str = "true" if is_paused else "false" return ( "constraint is_paused[" + str(self.index_in_minizinc[task]) + "]==" + is_paused_str + ";\n" )
[docs] def constraint_start_time_string( self, task, start_time, sign: SignEnum = SignEnum.EQUAL ) -> str: return ( "constraint s[" + str(self.index_in_minizinc[task]) + "]" + str(sign.value) + str(start_time) + ";\n" )
[docs] def constraint_start_time_string_preemptive_i( self, task, start_time, part_id=1, sign: SignEnum = SignEnum.EQUAL ) -> str: return ( "constraint s_preemptive[" + str(self.index_in_minizinc[task]) + "," + str(part_id) + "]" + str(sign.value) + str(start_time) + ";\n" )
[docs] def constraint_duration_string_preemptive_i( self, task, duration, part_id=1, sign: SignEnum = SignEnum.EQUAL ) -> str: return ( "constraint d_preemptive[" + str(self.index_in_minizinc[task]) + "," + str(part_id) + "]" + str(sign.value) + str(duration) + ";\n" )
[docs] def constraint_used_employee( self, task, employee, part_id=1, indicator: bool = False ): id_task = self.index_in_minizinc[task] id_employee = self.index_employees_in_minizinc[employee] tag = "true" if indicator else "false" if self.unit_usage_preemptive: return [ "constraint unit_used_preemptive[" + str(id_employee) + "," + str(id_task) + "," + str(part_id) + "]==" + str(tag) + ";\n" ] else: return [ "constraint unit_used[" + str(id_employee) + "," + str(id_task) + "]==" + str(tag) + ";\n" ]
[docs] def constraint_objective_max_time_set_of_jobs(self, set_of_jobs): s = [] for j in set_of_jobs: s += [ "constraint s_preemptive[" + str(self.index_in_minizinc[j]) + ", nb_preemptive]<=objective;\n" ] return s
[docs] def constraint_end_time_string( self, task, end_time, sign: SignEnum = SignEnum.EQUAL ) -> str: return ( "constraint s_preemptive[" + str(self.index_in_minizinc[task]) + ", nb_preemptive]" + str(sign.value) + str(end_time) + ";\n" )
[docs] def constraint_objective_equal_makespan(self, task_sink): ind = self.index_in_minizinc[task_sink] s = ( "constraint (s_preemptive[" + str(ind) + ", nb_preemptive]+d_preemptive[" + str(ind) + ",nb_preemptive]==objective);\n" ) return [s]
[docs] def constraint_sum_of_ending_time(self, set_subtasks: set[Hashable]): indexes = [self.index_in_minizinc[s] for s in set_subtasks] weights = [10 if s == self.problem.sink_task else 1 for s in set_subtasks] s = ( """int: nb_indexes=""" + str(len(indexes)) + """;\n array[1..nb_indexes] of int: weights=""" + str(weights) + """;\n array[1..nb_indexes] of Tasks: index_tasks=""" + str(indexes) + """;\n constraint objective>=sum(j in 1..nb_indexes)(weights[j]*(s_preemptive[index_tasks[j], nb_preemptive]+d_preemptive[index_tasks[j], nb_preemptive]));\n""" ) return [s]
[docs] def constraint_sum_of_starting_time(self, set_subtasks: set[Hashable]): indexes = [self.index_in_minizinc[s] for s in set_subtasks] weights = [10 if s == self.problem.sink_task else 1 for s in set_subtasks] s = ( """int: nb_indexes=""" + str(len(indexes)) + """;\n array[1..nb_indexes] of int: weights=""" + str(weights) + """;\n array[1..nb_indexes] of Tasks: index_tasks=""" + str(indexes) + """;\n constraint objective>=sum(j in 1..nb_indexes)(weights[j]*s_preemptive[index_tasks[j], 1]);\n""" ) return [s]
[docs] def init_model(self, **args): model_type = "ms_rcpsp_preemptive" model = Model(files_mzn[model_type]) exact_skills_need = args.get("exact_skills_need", False) strictly_disjunctive_subtasks = args.get("strictly_disjunctive_subtasks", True) fake_tasks = args.get( "fake_tasks", True ) # to modelize varying quantity of resource. solver = Solver.lookup(find_right_minizinc_solver_name(self.cp_solver_name)) resources_list = self.problem.resources_list self.resources_index = resources_list instance = Instance(solver, model) n_res = len(resources_list) keys = [] instance["nb_preemptive"] = args.get("nb_preemptive", 2) self.nb_preemptive = instance["nb_preemptive"] keys += ["nb_preemptive"] instance["possibly_preemptive"] = args.get( "possibly_preemptive", [True for task in self.problem.tasks_list] ) keys += ["possibly_preemptive"] instance["max_preempted"] = args.get( "max_preempted", min(self.problem.n_jobs_non_dummy, 5) ) keys += ["max_preempted"] instance["n_res"] = n_res keys += ["n_res"] instance["exact_skills_need"] = exact_skills_need keys += ["exact_skills_need"] instance["add_calendar_constraint_unit"] = args.get( "add_calendar_constraint_unit", True ) keys += ["add_calendar_constraint_unit"] instance["unit_usage_preemptive"] = args.get("unit_usage_preemptive", False) instance["strictly_disjunctive"] = strictly_disjunctive_subtasks self.unit_usage_preemptive = instance["unit_usage_preemptive"] keys += ["unit_usage_preemptive"] instance["one_ressource_per_task"] = self.one_ressource_per_task keys += ["one_ressource_per_task"] n_tasks = self.problem.n_jobs instance["n_tasks"] = n_tasks keys += ["n_tasks"] sorted_tasks = self.problem.tasks_list n_opt = sum( [len(list(self.problem.mode_details[key].keys())) for key in sorted_tasks] ) instance["n_opt"] = n_opt keys += ["n_opt"] all_modes = [ (act, mode, self.problem.mode_details[act][mode]) for act in sorted_tasks for mode in sorted(self.problem.mode_details[act]) ] self.modeindex_map = { i + 1: {"task": all_modes[i][0], "original_mode_index": all_modes[i][1]} for i in range(len(all_modes)) } modes = [set() for t in sorted_tasks] for j in self.modeindex_map: modes[self.problem.index_task[self.modeindex_map[j]["task"]]].add(j) self.mode_dict_task_mode_to_index_minizinc = {} for ind in self.modeindex_map: task = self.modeindex_map[ind]["task"] mode = self.modeindex_map[ind]["original_mode_index"] self.mode_dict_task_mode_to_index_minizinc[(task, mode)] = ind dur = [x[2]["duration"] for x in all_modes] instance["modes"] = modes keys += ["modes"] instance["dur"] = dur keys += ["dur"] skills_set = sorted(list(self.problem.skills_set)) nb_units = len(self.problem.employees) skill_required = [ [int(all_modes[i][2].get(s, 0)) for i in range(len(all_modes))] for s in skills_set ] if "max_time" in args: instance["max_time"] = args["max_time"] else: instance["max_time"] = self.problem.horizon keys += ["max_time"] dict_to_add = add_fake_task_cp_data( rcpsp_problem=self.problem, ignore_fake_task=not fake_tasks, max_time_to_consider=instance["max_time"], ) for key in dict_to_add: instance[key] = dict_to_add[key] keys += [key] instance["nb_skill"] = len(self.problem.skills_set) instance["skillreq"] = skill_required instance["nb_units"] = nb_units keys += ["nb_skill", "skillreq", "nb_units"] skillunits = [ [ int(math.floor(self.problem.employees[j].dict_skill[s].skill_value)) if s in self.problem.employees[j].dict_skill else 0 for s in skills_set ] for j in self.problem.employees_list ] self.employees_position = self.problem.employees_list self.index_employees_in_minizinc = { self.problem.employees_list[i]: i + 1 for i in range(len(self.problem.employees_list)) } instance["skillunits"] = skillunits keys += ["skillunits"] logger.debug(f"Employee position CP {self.employees_position}") rreq = [ [all_modes[i][2].get(res, 0) for i in range(len(all_modes))] for res in resources_list ] instance["rreq"] = rreq keys += ["rreq"] instance["rreq"] = rreq keys += ["rreq"] rcap = [ int(max(self.problem.resources_availability[x])) for x in resources_list ] instance["rc"] = rcap keys += ["rc"] rtype = [ 2 if res in self.problem.non_renewable_resources else 1 for res in resources_list ] instance["rtype"] = rtype keys += ["rtype"] succ = [ set( [ self.problem.return_index_task(x, offset=1) for x in self.problem.successors.get(task, []) ] ) for task in sorted_tasks ] instance["suc"] = succ keys += ["suc"] instance["add_objective_makespan"] = args.get("add_objective_makespan", True) instance["ignore_sec_objective"] = args.get("ignore_sec_objective", True) self.instance = instance p_s: Optional[PartialSolution] = args.get("partial_solution", None) self.index_in_minizinc = { task: self.problem.return_index_task(task, offset=1) for task in self.problem.tasks_list } add_partial_solution_hard_constraint = args.get( "add_partial_solution_hard_constraint", True ) sec_objective_equal_penalty = args.get("sec_objective_equal_penalty", True) self.second_objectives = {"weights": [], "name_penalty": []} if add_partial_solution_hard_constraint: strings = add_hard_special_constraints(p_s, self) for s in strings: self.instance.add_string(s) else: strings, name_penalty = add_soft_special_constraints(p_s, self) for s in strings: self.instance.add_string(s) self.second_objectives = { "weights": [100] * len(name_penalty), "name_penalty": name_penalty, } if len(name_penalty) > 0: strings = define_second_part_objective( [100] * len(name_penalty), name_penalty, equal=sec_objective_equal_penalty, ) for s in strings: self.instance.add_string(s)
[docs] def retrieve_solution( self, _output_item: Optional[str] = None, **kwargs: Any ) -> PreemptiveMultiskillRcpspSolution: """Return a d-o solution from the variables computed by minizinc. Args: _output_item: string representing the minizinc solver output passed by minizinc to the solution constructor **kwargs: keyword arguments passed by minzinc to the solution contructor containing the objective value (key "objective"), and the computed variables as defined in minizinc model. Returns: """ _log_minzinc_result(_output_item=_output_item, **kwargs) starts_preemptive = kwargs["s_preemptive"] duration_preemptive = kwargs["d_preemptive"] mruns = kwargs["mrun"] units_used = kwargs["unit_used"] units_used_preemptive = kwargs["unit_used_preemptive"] rcpsp_schedule = {} modes_dict = {} for k in range(len(starts_preemptive)): starts_k = [] ends_k = [] for j in range(len(starts_preemptive[k])): if j == 0 or duration_preemptive[k][j] != 0: starts_k += [starts_preemptive[k][j]] ends_k += [starts_k[-1] + duration_preemptive[k][j]] rcpsp_schedule[self.problem.tasks_list[k]] = { "starts": starts_k, "ends": ends_k, } for m in range(len(mruns)): if mruns[m]: modes_dict[self.modeindex_map[m + 1]["task"]] = self.modeindex_map[ m + 1 ]["original_mode_index"] if not self.unit_usage_preemptive: unit_used = units_used usage = {} for w in range(len(unit_used)): for task in range(len(unit_used[w])): if unit_used[w][task] == 1: task_id = self.problem.tasks_list[task] mode = modes_dict[task_id] skills_needed = set( [ s for s in self.problem.skills_set if s in self.problem.mode_details[task_id][mode] and self.problem.mode_details[task_id][mode][s] > 0 ] ) skills_worker = set( [ s for s in self.problem.employees[ self.employees_position[w] ].dict_skill if self.problem.employees[self.employees_position[w]] .dict_skill[s] .skill_value > 0 ] ) intersection = skills_needed.intersection(skills_worker) if len(intersection) > 0: if task_id not in usage: usage[task_id] = {} usage[task_id][self.employees_position[w]] = intersection for task_id in usage: usage[task_id] = [ usage[task_id] for i in range(len(rcpsp_schedule[task_id]["starts"])) ] else: unit_used = units_used_preemptive usage = {} for w in range(len(unit_used)): for task in range(len(unit_used[w])): task_id = self.problem.tasks_list[task] mode = modes_dict[task_id] skills_needed = set( [ s for s in self.problem.skills_set if s in self.problem.mode_details[task_id][mode] and self.problem.mode_details[task_id][mode][s] > 0 ] ) if task_id not in usage: usage[task_id] = [ {} for j in range(len(rcpsp_schedule[task_id]["starts"])) ] for j in range(len(rcpsp_schedule[task_id]["starts"])): if unit_used[w][task][j] == 1: skills_worker = set( [ s for s in self.problem.employees[ self.employees_position[w] ].dict_skill if self.problem.employees[ self.employees_position[w] ] .dict_skill[s] .skill_value > 0 ] ) intersection = skills_needed.intersection(skills_worker) if len(intersection) > 0: usage[task_id][j][ self.employees_position[w] ] = intersection return PreemptiveMultiskillRcpspSolution( problem=self.problem, modes=modes_dict, schedule=rcpsp_schedule, employee_usage=usage, )
[docs] class CpPartialPreemptiveMultiskillRcpspSolver(CpPreemptiveMultiskillRcpspSolver):
[docs] def init_model(self, **args): model_type = args.get("model_type", "ms_rcpsp_partial_preemptive") model = Model(files_mzn[model_type]) exact_skills_need = args.get("exact_skills_need", False) fake_tasks = args.get( "fake_tasks", True ) # to modelize varying quantity of resource. include_constraint_on_start_value = args.get( "include_constraint_on_start_value", False ) strictly_disjunctive_subtasks = args.get("strictly_disjunctive_subtasks", True) solver = Solver.lookup(find_right_minizinc_solver_name(self.cp_solver_name)) resources_list = self.problem.resources_list self.resources_index = resources_list instance = Instance(solver, model) n_res = len(resources_list) keys = [] instance["nb_preemptive"] = args.get("nb_preemptive", 2) instance[ "include_constraint_on_start_value" ] = include_constraint_on_start_value self.nb_preemptive = instance["nb_preemptive"] keys += ["nb_preemptive"] instance["possibly_preemptive"] = args.get( "possibly_preemptive", [ self.problem.preemptive_indicator.get(t, True) for t in self.problem.tasks_list ], ) instance["strictly_disjunctive"] = strictly_disjunctive_subtasks keys += ["possibly_preemptive"] instance["max_preempted"] = args.get( "max_preempted", min(self.problem.n_jobs_non_dummy, 5) ) keys += ["max_preempted"] instance["n_res"] = n_res keys += ["n_res"] instance["exact_skills_need"] = exact_skills_need keys += ["exact_skills_need"] instance["add_calendar_constraint_unit"] = args.get( "add_calendar_constraint_unit", True ) keys += ["add_calendar_constraint_unit"] instance["unit_usage_preemptive"] = args.get("unit_usage_preemptive", False) self.unit_usage_preemptive = instance["unit_usage_preemptive"] keys += ["unit_usage_preemptive"] instance["one_ressource_per_task"] = self.one_ressource_per_task keys += ["one_ressource_per_task"] n_tasks = self.problem.n_jobs instance["n_tasks"] = n_tasks keys += ["n_tasks"] sorted_tasks = self.problem.tasks_list n_opt = sum( [len(list(self.problem.mode_details[key].keys())) for key in sorted_tasks] ) instance["n_opt"] = n_opt keys += ["n_opt"] all_modes = [ ( act, mode, self.problem.mode_details[act][mode], self.problem.partial_preemption_data[act][mode], ) for act in sorted_tasks for mode in sorted(self.problem.mode_details[act]) ] self.modeindex_map = { i + 1: {"task": all_modes[i][0], "original_mode_index": all_modes[i][1]} for i in range(len(all_modes)) } modes = [set() for t in sorted_tasks] for j in self.modeindex_map: modes[self.problem.index_task[self.modeindex_map[j]["task"]]].add(j) self.mode_dict_task_mode_to_index_minizinc = {} for ind in self.modeindex_map: task = self.modeindex_map[ind]["task"] mode = self.modeindex_map[ind]["original_mode_index"] self.mode_dict_task_mode_to_index_minizinc[(task, mode)] = ind dur = [x[2]["duration"] for x in all_modes] instance["modes"] = modes keys += ["modes"] instance["dur"] = dur keys += ["dur"] skills_set = sorted(list(self.problem.skills_set)) nb_units = len(self.problem.employees) skill_required = [ [int(all_modes[i][2].get(s, 0)) for i in range(len(all_modes))] for s in skills_set ] if "max_time" in args: instance["max_time"] = args["max_time"] else: instance["max_time"] = self.problem.horizon keys += ["max_time"] dict_to_add = add_fake_task_cp_data( rcpsp_problem=self.problem, ignore_fake_task=not fake_tasks, max_time_to_consider=instance["max_time"], ) for key in dict_to_add: instance[key] = dict_to_add[key] keys += [key] instance["nb_skill"] = len(self.problem.skills_set) instance["skillreq"] = skill_required instance["nb_units"] = nb_units keys += ["nb_skill", "skillreq", "nb_units"] skillunits = [ [ int(math.floor(self.problem.employees[j].dict_skill[s].skill_value)) if s in self.problem.employees[j].dict_skill else 0 for s in skills_set ] for j in self.problem.employees_list ] self.employees_position = self.problem.employees_list self.index_employees_in_minizinc = { self.problem.employees_list[i]: i + 1 for i in range(len(self.problem.employees_list)) } instance["skillunits"] = skillunits keys += ["skillunits"] logger.debug(f"Employee position CP {self.employees_position}") rreq = [ [all_modes[i][2].get(res, 0) for i in range(len(all_modes))] for res in resources_list ] instance["rreq"] = rreq keys += ["rreq"] # New input data for the partially preemptive use case. is_releasable = [ [1 if all_modes[i][3].get(res, True) else 0 for i in range(len(all_modes))] for res in resources_list ] instance["is_releasable"] = is_releasable keys += ["is_releasable"] rcap = [ int(max(self.problem.resources_availability[x])) for x in resources_list ] instance["rc"] = rcap keys += ["rc"] rtype = [ 2 if res in self.problem.non_renewable_resources else 1 for res in resources_list ] instance["rtype"] = rtype keys += ["rtype"] succ = [ set( [ self.problem.return_index_task(x, offset=1) for x in self.problem.successors.get(task, []) ] ) for task in sorted_tasks ] instance["suc"] = succ keys += ["suc"] instance["add_objective_makespan"] = args.get("add_objective_makespan", True) instance["ignore_sec_objective"] = args.get("ignore_sec_objective", True) self.instance = instance p_s: Optional[PartialSolution] = args.get("partial_solution", None) self.index_in_minizinc = { task: self.problem.return_index_task(task, offset=1) for task in self.problem.tasks_list } add_partial_solution_hard_constraint = args.get( "add_partial_solution_hard_constraint", True ) self.second_objectives = {"weights": [], "name_penalty": []} if add_partial_solution_hard_constraint: strings = add_hard_special_constraints(p_s, self) for s in strings: self.instance.add_string(s) else: strings, name_penalty = add_soft_special_constraints(p_s, self) for s in strings: self.instance.add_string(s) self.second_objectives = { "weights": [100] * len(name_penalty), "name_penalty": name_penalty, } if len(name_penalty) > 0: strings = define_second_part_objective( [100] * len(name_penalty), name_penalty, equal=False ) for s in strings: self.instance.add_string(s)
[docs] def stick_to_solution(solution: RcpspSolution, cp_solver: CpMultiskillRcpspSolver): list_strings = [] for task in solution.rcpsp_schedule: start = solution.rcpsp_schedule[task]["start_time"] list_strings += [ cp_solver.constraint_start_time_string( task=task, start_time=start, sign=SignEnum.EQUAL ) ] return list_strings
[docs] def stick_to_solution_preemptive( solution: PreemptiveRcpspSolution, cp_solver: CpPreemptiveMultiskillRcpspSolver ): list_strings = [] for task in solution.rcpsp_schedule: starts = solution.rcpsp_schedule[task]["starts"] ends = solution.rcpsp_schedule[task]["ends"] is_paused = len(starts) > 1 list_strings += [cp_solver.constraint_is_paused(task=task, is_paused=is_paused)] for i in range(len(starts)): list_strings += [ cp_solver.constraint_start_time_string_preemptive_i( task, start_time=starts[i], part_id=i + 1, sign=SignEnum.EQUAL ) ] list_strings += [ cp_solver.constraint_duration_string_preemptive_i( task, duration=ends[i] - starts[i], part_id=i + 1, sign=SignEnum.EQUAL, ) ] for k in range(len(starts), cp_solver.nb_preemptive): list_strings += [ cp_solver.constraint_start_time_string_preemptive_i( task, start_time=ends[-1], part_id=k + 1, sign=SignEnum.EQUAL ) ] list_strings += [ cp_solver.constraint_duration_string_preemptive_i( task, duration=0, part_id=k + 1, sign=SignEnum.EQUAL ) ] return list_strings
[docs] def hard_start_times( dict_start_times: dict[Hashable, int], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): constraint_strings = [] for task in dict_start_times: string = ( "constraint s[" + str(cp_solver.index_in_minizinc[task]) + "] == " + str(dict_start_times[task]) + ";\n" ) constraint_strings += [string] return constraint_strings
[docs] def soft_start_times( dict_start_times: dict[Hashable, int], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): list_task = list(dict_start_times.keys()) s = ( """ var 0..max_time*nb_start_times: penalty_start_times;\n int: nb_start_times=""" + str(len(list_task)) + """;\n""" + """ array[1..nb_start_times] of Tasks: st1_0=""" + str([cp_solver.index_in_minizinc[t1] for t1 in list_task]) + """;\n""" + """ array[1..nb_start_times] of 0..max_time: array_start_0=""" + str([dict_start_times[t1] for t1 in list_task]) + """;\n""" + """ constraint sum(i in 1..nb_start_times)(abs(s[st1_0[i]]-array_start_0[i]))==penalty_start_times;\n""" ) return [s], ["penalty_start_times"]
[docs] def soft_start_together( list_start_together: list[tuple[Hashable, Hashable]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): s = ( """ var 0..max_time*nb_start_together: penalty_start_together;\n int: nb_start_together=""" + str(len(list_start_together)) + """;\n""" + """ array[1..nb_start_together] of Tasks: st1_3=""" + str([cp_solver.index_in_minizinc[t1] for t1, t2 in list_start_together]) + """;\n""" + """ array[1..nb_start_together] of Tasks: st2_3=""" + str([cp_solver.index_in_minizinc[t2] for t1, t2 in list_start_together]) + """;\n""" + """ constraint sum(i in 1..nb_start_together)(abs(s[st1_3[i]]-s[st2_3[i]]))==penalty_start_together;\n """ ) return [s], ["penalty_start_together"]
[docs] def hard_start_together( list_start_together: list[tuple[Hashable, Hashable]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): constraint_strings = [] for t1, t2 in list_start_together: string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t1]) + "] == s[" + str(cp_solver.index_in_minizinc[t2]) + "];\n" ) constraint_strings += [string] return constraint_strings
[docs] def hard_start_after_nunit( list_start_after_nunit: list[tuple[Hashable, Hashable, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): constraint_strings = [] for t1, t2, delta in list_start_after_nunit: string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t2]) + "] >= s[" + str(cp_solver.index_in_minizinc[t1]) + "]+" + str(delta) + ";\n" ) constraint_strings += [string] return constraint_strings
[docs] def soft_start_after_nunit( list_start_after_nunit: list[tuple[Hashable, Hashable, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): s = ( """ var 0..max_time*nb_start_after_nunit: penalty_start_after_nunit;\n int: nb_start_after_nunit=""" + str(len(list_start_after_nunit)) + """;\n""" + """ array[1..nb_start_after_nunit] of Tasks: st1_4=""" + str( [ cp_solver.index_in_minizinc[t1] for t1, t2, delta in list_start_after_nunit ] ) + """;\n""" + """ array[1..nb_start_after_nunit] of Tasks: st2_4=""" + str( [ cp_solver.index_in_minizinc[t2] for t1, t2, delta in list_start_after_nunit ] ) + """;\n""" + """ array[1..nb_start_after_nunit] of int: nunits_4=""" + str([delta for t1, t2, delta in list_start_after_nunit]) + """;\n""" + """ constraint sum(i in 1..nb_start_after_nunit)(max([0, s[st1_4[i]]+nunits_4[i]-s[st2_4[i]]]))==penalty_start_after_nunit;\n """ ) return [s], ["penalty_start_after_nunit"]
[docs] def hard_start_at_end_plus_offset( list_start_at_end_plus_offset, cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): constraint_strings = [] for t1, t2, delta in list_start_at_end_plus_offset: if isinstance(cp_solver, CpMultiskillRcpspSolver): string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t2]) + "] >= s[" + str(cp_solver.index_in_minizinc[t1]) + "]+adur[" + str(cp_solver.index_in_minizinc[t1]) + "]+" + str(delta) + ";\n" ) else: string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t2]) + "] >= s_preemptive[" + str(cp_solver.index_in_minizinc[t1]) + ", nb_preemptive]+" + str(delta) + ";\n" ) constraint_strings += [string] return constraint_strings
[docs] def soft_start_at_end_plus_offset( list_start_at_end_plus_offset: list[tuple[Hashable, Hashable, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): s = ( """ var 0..max_time*nb_start_at_end_plus_offset: penalty_start_at_end_plus_offset;\n int: nb_start_at_end_plus_offset=""" + str(len(list_start_at_end_plus_offset)) + """;\n""" + """ array[1..nb_start_at_end_plus_offset] of Tasks: st1_7=""" + str( [ cp_solver.index_in_minizinc[t1] for t1, t2, delta in list_start_at_end_plus_offset ] ) + """;\n""" + """ array[1..nb_start_at_end_plus_offset] of Tasks: st2_7=""" + str( [ cp_solver.index_in_minizinc[t2] for t1, t2, delta in list_start_at_end_plus_offset ] ) + """;\n""" + """ array[1..nb_start_at_end_plus_offset] of int: nunits=""" + str([delta for t1, t2, delta in list_start_at_end_plus_offset]) + """;\n""" ) if isinstance(cp_solver, CpMultiskillRcpspSolver): s += """ constraint sum(i in 1..nb_start_at_end_plus_offset)(max([0, s[st1_7[i]]+adur[st1_7[i]]+nunits[i]-s[st2_7[i]]]))==penalty_start_at_end_plus_offset;\n """ else: s += """ constraint sum(i in 1..nb_start_at_end_plus_offset)(max([0, s_preemptive[st1_7[i], nb_preemptive]+nunits[i]-s[st2_7[i]]]))==penalty_start_at_end_plus_offset;\n """ return [s], ["penalty_start_at_end_plus_offset"]
[docs] def hard_start_at_end( list_start_at_end: list[tuple[Hashable, Hashable]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): constraint_strings = [] for t1, t2 in list_start_at_end: if isinstance(cp_solver, CpMultiskillRcpspSolver): string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t2]) + "] == s[" + str(cp_solver.index_in_minizinc[t1]) + "]+adur[" + str(cp_solver.index_in_minizinc[t1]) + "];\n" ) else: string = ( "constraint s_preemptive[" + str(cp_solver.index_in_minizinc[t2]) + ", 1] == s_preemptive[" + str(cp_solver.index_in_minizinc[t1]) + ", nb_preemptive];\n" ) constraint_strings += [string] return constraint_strings
[docs] def soft_start_at_end( list_start_at_end: list[tuple[Hashable, Hashable]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): s = ( """ var 0..max_time*nb_start_at_end: penalty_start_at_end;\n int: nb_start_at_end=""" + str(len(list_start_at_end)) + """;\n""" + """ array[1..nb_start_at_end] of Tasks: st1_9=""" + str([cp_solver.index_in_minizinc[t1] for t1, t2 in list_start_at_end]) + """;\n""" + """ array[1..nb_start_at_end] of Tasks: st2_9=""" + str([cp_solver.index_in_minizinc[t2] for t1, t2 in list_start_at_end]) + """;\n""" ) if isinstance(cp_solver, CpPreemptiveMultiskillRcpspSolver): s += """ %constraint forall(i in 1..nb_start_at_end)(s[st2_9[i]]-s_preemptive[st1_9[i], nb_preemptive]>=0);\n constraint sum(i in 1..nb_start_at_end)(abs(s[st2_9[i]]-s_preemptive[st1_9[i], nb_preemptive]))==penalty_start_at_end;\n """ else: s += """ %constraint forall(i in 1..nb_start_at_end)(s[st2_9[i]]-s[st1_9[i]]-adur[st1_9[i]]>=0);\n constraint sum(i in 1..nb_start_at_end)(abs(s[st2_9[i]]-s[st1_9[i]]-adur[st1_9[i]]))==penalty_start_at_end;\n """ return [s], ["penalty_start_at_end"]
[docs] def soft_start_window( start_times_window: dict[Hashable, tuple[int, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): l_low = [ (t, start_times_window[t][0]) for t in start_times_window if start_times_window[t][0] is not None ] l_up = [ (t, start_times_window[t][1]) for t in start_times_window if start_times_window[t][1] is not None ] max_t_start = max( [max([x[1] for x in l_low], default=0), max([x[1] for x in l_up], default=0)] ) s = ( """ var 0..max_time*nb_start_window_low: penalty_start_low;\n int: nb_start_window_low=""" + str(len(l_low)) + """;\n var 0..max_time*nb_start_window_up: penalty_start_up;\n int: nb_start_window_up=""" + str(len(l_up)) + """;\n array[1..nb_start_window_low] of Tasks: task_id_low_start=""" + str([cp_solver.index_in_minizinc[x[0]] for x in l_low]) + """;\n array[1..nb_start_window_up] of Tasks: task_id_up_start=""" + str([cp_solver.index_in_minizinc[x[0]] for x in l_up]) + """;\n int: max_time_start=""" + str(max_t_start) + """;\n array[1..nb_start_window_low] of 0..max_time_start: times_low_start=""" + str([int(x[1]) for x in l_low]) + """;\n array[1..nb_start_window_up] of 0..max_time_start: times_up_start=""" + str([int(x[1]) for x in l_up]) + """;\n""" ) s += """ constraint sum(i in 1..nb_start_window_low)(max([times_low_start[i]-s[task_id_low_start[i]], 0]))==penalty_start_low;\n constraint sum(i in 1..nb_start_window_up)(max([-times_up_start[i]+s[task_id_up_start[i]], 0]))==penalty_start_up;\n """ return [s], ["penalty_start_low", "penalty_start_up"]
[docs] def soft_end_window( end_times_window: dict[Hashable, tuple[int, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): l_low = [ (t, end_times_window[t][0]) for t in end_times_window if end_times_window[t][0] is not None ] l_up = [ (t, end_times_window[t][1]) for t in end_times_window if end_times_window[t][1] is not None ] max_t_end = max( [max([x[1] for x in l_low], default=0), max([x[1] for x in l_up], default=0)] ) s = ( """ var 0..max_time*nb_end_window_low: penalty_end_low;\n int: nb_end_window_low=""" + str(len(l_low)) + """;\n var 0..max_time*nb_end_window_up: penalty_end_up;\n int: nb_end_window_up=""" + str(len(l_up)) + """;\n array[1..nb_end_window_low] of Tasks: task_id_low_end=""" + str([cp_solver.index_in_minizinc[x[0]] for x in l_low]) + """;\n array[1..nb_end_window_up] of Tasks: task_id_up_end=""" + str([cp_solver.index_in_minizinc[x[0]] for x in l_up]) + """;\n int: max_time_end=""" + str(max_t_end) + """;\n array[1..nb_end_window_low] of 0..max_time_end: times_low_end=""" + str([int(x[1]) for x in l_low]) + """;\n array[1..nb_end_window_up] of 0..max_time_end: times_up_end=""" + str([int(x[1]) for x in l_up]) + """;\n""" ) if isinstance(cp_solver, CpPreemptiveMultiskillRcpspSolver): s += """ constraint sum(i in 1..nb_end_window_low)(max([times_low_end[i]-s_preemptive[task_id_low_end[i], nb_preemptive], 0]))==penalty_end_low;\n constraint sum(i in 1..nb_end_window_up)(max([-times_up_end[i]+s_preemptive[task_id_up_end[i], nb_preemptive], 0]))==penalty_end_up;\n """ else: s += """ constraint sum(i in 1..nb_end_window_low)(max([times_low_end[i]-s[task_id_low_end[i]]+adur[task_id_low_end[i]], 0]))==penalty_end_low;\n constraint sum(i in 1..nb_end_window_up)(max([-times_up_end[i]+s[task_id_up_end[i]]+adur[task_id_up_end[i]], 0]))==penalty_end_up;\n """ return [s], ["penalty_end_low", "penalty_end_up"]
[docs] def hard_start_window( start_times_window: dict[Hashable, tuple[int, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): l = [] for task in start_times_window: if start_times_window[task][0] is not None: l += [ cp_solver.constraint_start_time_string( task=task, start_time=start_times_window[task][0], sign=SignEnum.UEQ ) ] if start_times_window[task][1] is not None: l += [ cp_solver.constraint_start_time_string( task=task, start_time=start_times_window[task][1], sign=SignEnum.LEQ ) ] return l
[docs] def hard_end_window( end_times_window: dict[Hashable, tuple[int, int]], cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): l = [] for task in end_times_window: if end_times_window[task][0] is not None: l += [ cp_solver.constraint_end_time_string( task=task, end_time=end_times_window[task][0], sign=SignEnum.UEQ ) ] if end_times_window[task][1] is not None: l += [ cp_solver.constraint_end_time_string( task=task, end_time=end_times_window[task][1], sign=SignEnum.LEQ ) ] return l
[docs] def add_hard_special_constraints( partial_solution: PartialSolution, cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): if partial_solution is None: return [] constraint_strings = [] if partial_solution.start_times is not None: constraint_strings += hard_start_times( dict_start_times=partial_solution.start_times, cp_solver=cp_solver ) if partial_solution.partial_permutation is not None: for t1, t2 in zip( partial_solution.partial_permutation[:-1], partial_solution.partial_permutation[1:], ): string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t1]) + "] " "<= s[" + str(cp_solver.index_in_minizinc[t2]) + "];\n" ) constraint_strings += [string] if partial_solution.list_partial_order is not None: for l in partial_solution.list_partial_order: for t1, t2 in zip(l[:-1], l[1:]): string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t1]) + "] " "<= s[" + str(cp_solver.index_in_minizinc[t2]) + "];\n" ) constraint_strings += [string] if partial_solution.start_together is not None: constraint_strings += hard_start_together( list_start_together=partial_solution.start_together, cp_solver=cp_solver ) if partial_solution.start_after_nunit is not None: constraint_strings += hard_start_after_nunit( list_start_after_nunit=partial_solution.start_after_nunit, cp_solver=cp_solver, ) if partial_solution.start_at_end_plus_offset is not None: constraint_strings += hard_start_at_end_plus_offset( list_start_at_end_plus_offset=partial_solution.start_at_end_plus_offset, cp_solver=cp_solver, ) if partial_solution.start_at_end is not None: constraint_strings += hard_start_at_end( list_start_at_end=partial_solution.start_at_end, cp_solver=cp_solver ) if partial_solution.start_times_window is not None: constraint_strings += hard_start_window( start_times_window=partial_solution.start_times_window, cp_solver=cp_solver ) if partial_solution.end_times_window is not None: constraint_strings += hard_end_window( end_times_window=partial_solution.end_times_window, cp_solver=cp_solver ) return constraint_strings
[docs] def add_soft_special_constraints( partial_solution: PartialSolution, cp_solver: Union[CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver], ): if partial_solution is None: return [], [] constraint_strings = [] name_penalty = [] if partial_solution.start_times is not None: c, n = soft_start_times( dict_start_times=partial_solution.start_times, cp_solver=cp_solver ) constraint_strings += c name_penalty += n if partial_solution.partial_permutation is not None: for t1, t2 in zip( partial_solution.partial_permutation[:-1], partial_solution.partial_permutation[1:], ): string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t1]) + "] " + "<= s[" + str(cp_solver.index_in_minizinc[t2]) + "];\n" ) constraint_strings += [string] if partial_solution.list_partial_order is not None: for l in partial_solution.list_partial_order: for t1, t2 in zip(l[:-1], l[1:]): string = ( "constraint s[" + str(cp_solver.index_in_minizinc[t1]) + "] " "<= s[" + str(cp_solver.index_in_minizinc[t2]) + "];\n" ) constraint_strings += [string] if partial_solution.start_together is not None: c, n = soft_start_together( list_start_together=partial_solution.start_together, cp_solver=cp_solver ) constraint_strings += c name_penalty += n if partial_solution.start_after_nunit is not None: c, n = soft_start_after_nunit( list_start_after_nunit=partial_solution.start_after_nunit, cp_solver=cp_solver, ) constraint_strings += c name_penalty += n if partial_solution.start_at_end_plus_offset is not None: c, n = soft_start_at_end_plus_offset( list_start_at_end_plus_offset=partial_solution.start_at_end_plus_offset, cp_solver=cp_solver, ) constraint_strings += c name_penalty += n if partial_solution.start_at_end is not None: c, n = soft_start_at_end( list_start_at_end=partial_solution.start_at_end, cp_solver=cp_solver ) constraint_strings += c name_penalty += n if partial_solution.start_times_window is not None: c, n = soft_start_window( partial_solution.start_times_window, cp_solver=cp_solver ) constraint_strings += c constraint_strings += ["constraint " + str(n[0]) + "==0;\n"] name_penalty += n if partial_solution.end_times_window is not None: c, n = soft_end_window(partial_solution.end_times_window, cp_solver=cp_solver) constraint_strings += c name_penalty += n return constraint_strings, name_penalty
[docs] def define_second_part_objective(weights, name_penalty, equal=False): sum_string = "+".join( ["0"] + [str(weights[i]) + "*" + name_penalty[i] for i in range(len(weights))] ) if equal: s = "constraint sec_objective==" + sum_string + ";\n" else: s = "constraint sec_objective>=" + sum_string + ";\n" return [s]
[docs] def add_constraints_string(child_instance, list_of_strings): for s in list_of_strings: child_instance.add_string(s)
[docs] class SolutionPrecomputeEmployeesForTasks: def __init__( self, unit_used, worker_type_used, mode_dict, overskill_unit, overskill_type ): self.unit_used = unit_used self.worker_type_used = worker_type_used self.mode_dict = mode_dict self.overskill_unit = overskill_unit self.overskill_type = overskill_type def __str__(self): return "-".join( [str(s) + " : " + str(getattr(self, s)) for s in self.__dict__.keys()] )
[docs] class PrecomputeEmployeesForTasks: def __init__( self, ms_rcpsp_problem: MultiskillRcpspProblem, cp_solver_name: CpSolverName = CpSolverName.CHUFFED, ): self.ms_rcpsp_problem = ms_rcpsp_problem self.cp_solver_name = cp_solver_name self.instance: Instance = None ( self.skills_representation_str, self.skills_dict, ) = cluster_employees_to_resource_types(self.ms_rcpsp_problem)
[docs] def init_model(self, **kwargs): solver = Solver.lookup(find_right_minizinc_solver_name(self.cp_solver_name)) model = Model(files_mzn["compute_worker_for_tasks"]) instance = Instance(solver, model) instance["one_ressource_per_task"] = kwargs.get("one_ressource_per_task", False) instance["one_worker_type_per_task"] = kwargs.get( "one_worker_type_per_task", False ) instance["exact_skills_need"] = kwargs.get("exact_skills_need", False) instance["consider_worker_type"] = kwargs.get("consider_worker_type", True) instance["consider_units"] = kwargs.get("consider_units", False) tasks_of_interest = kwargs.get( "tasks_of_interest", [self.ms_rcpsp_problem.tasks_list[0]] ) instance["n_tasks"] = len(tasks_of_interest) n_opt = sum( [ len(list(self.ms_rcpsp_problem.mode_details[key].keys())) for key in tasks_of_interest ] ) instance["n_opt"] = n_opt all_modes = [ (act, mode, self.ms_rcpsp_problem.mode_details[act][mode]) for act in tasks_of_interest for mode in sorted(self.ms_rcpsp_problem.mode_details[act]) ] self.modeindex_map = { i + 1: {"task": all_modes[i][0], "original_mode_index": all_modes[i][1]} for i in range(len(all_modes)) } modes = [set() for t in tasks_of_interest] for j in self.modeindex_map: modes[tasks_of_interest.index(self.modeindex_map[j]["task"])].add(j) dur = [x[2]["duration"] for x in all_modes] instance["modes"] = modes instance["dur"] = dur rreq = [ [all_modes[i][2].get(res, 0) for i in range(len(all_modes))] for res in self.ms_rcpsp_problem.resources_list ] instance["rreq"] = rreq instance["nb_res"] = len(self.ms_rcpsp_problem.resources_list) rcap = [ int(max(self.ms_rcpsp_problem.resources_availability[x])) for x in self.ms_rcpsp_problem.resources_list ] instance["rcap"] = rcap rtype = [ 2 if res in self.ms_rcpsp_problem.non_renewable_resources else 1 for res in self.ms_rcpsp_problem.resources_list ] instance["rtype"] = rtype skills_set = self.ms_rcpsp_problem.skills_list skill_required = [ [int(all_modes[i][2].get(s, 0)) for i in range(len(all_modes))] for s in skills_set ] instance["nb_skill"] = len(self.ms_rcpsp_problem.skills_set) instance["skillreq"] = skill_required instance["nb_units"] = len(self.ms_rcpsp_problem.employees_list) skillunits = [ [ int( math.floor( self.ms_rcpsp_problem.employees[j].dict_skill[s].skill_value ) ) if s in self.ms_rcpsp_problem.employees[j].dict_skill else 0 for s in skills_set ] for j in self.ms_rcpsp_problem.employees_list ] self.employees_position = self.ms_rcpsp_problem.employees_list instance["skillunits"] = skillunits worker_type_list = sorted(self.skills_dict) instance["nb_worker_type"] = len(worker_type_list) instance["skills_worker_type"] = [ [ self.skills_dict[wt][s].skill_value if s in self.skills_dict[wt] else 0 for s in skills_set ] for wt in worker_type_list ] instance["capacity_worker_type"] = [ len(self.skills_representation_str[wt]) for wt in worker_type_list ] self.instance = instance
[docs] def retrieve_solutions(self, result, parameters_cp: ParametersCp): intermediate_solutions = parameters_cp.intermediate_solution units_used = [] workers_type_used = [] overskills_units = [] overskills_types = [] mruns = [] modes_dict = [] if intermediate_solutions: for i in range(len(result)): mruns += [result[i, "mrun"]] overskills_types += [result[i, "overskill_type"]] overskills_units += [result[i, "overskill_unit"]] units_used += [result[i, "unit_used"]] workers_type_used += [result[i, "worker_type_used"]] else: mruns += [result["mrun"]] overskills_types += [result["overskill_type"]] overskills_units += [result["overskill_unit"]] units_used += [result["unit_used"]] workers_type_used += [result["worker_type_used"]] for k in range(len(mruns)): mode_dict = {} for i in range(len(mruns[k])): if mruns[k][i]: mode_dict[self.modeindex_map[i + 1]["task"]] = self.modeindex_map[ i + 1 ]["original_mode_index"] modes_dict += [mode_dict] return [ SolutionPrecomputeEmployeesForTasks( unit_used=u, worker_type_used=w, mode_dict=m, overskill_unit=ou, overskill_type=ow, ) for u, w, m, ou, ow in zip( units_used, workers_type_used, modes_dict, overskills_units, overskills_types, ) ]
[docs] def solve( self, parameters_cp: Optional[ParametersCp] = None, time_limit: Optional[float] = 100.0, nr_solutions: int = 100, all_solutions: bool = False, **args, ): """Solve the CP problem with minizinc Args: parameters_cp: parameters specific to CP solvers time_limit: the solve process stops after this time limit (in seconds). If None, no time limit is applied. nr_solutions: of not `all_solutions`, the solve stops after finding `nr_solutions` all_solutions: if True, do not stop when reaching `nr_solutions` **args: passed to init_model() Returns: """ if self.instance is None: self.init_model(**args) if parameters_cp is None: parameters_cp = ParametersCp.default() intermediate_solutions = parameters_cp.intermediate_solution if time_limit is None: timeout = None else: timeout = timedelta(seconds=time_limit) result = self.instance.solve( timeout=timeout, intermediate_solutions=intermediate_solutions, nr_solutions=nr_solutions, all_solutions=all_solutions, ) logger.debug(result.status) return self.retrieve_solutions(result=result, parameters_cp=parameters_cp)