Source code for discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.solution_repair

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
from collections.abc import Hashable, Iterable
from typing import Any, Optional, Union

import numpy as np
from minizinc import Instance

from discrete_optimization.generic_rcpsp_tools.graph_tools import (
    GraphRcpsp,
    GraphSpecialConstraintsRcpsp,
)
from discrete_optimization.generic_rcpsp_tools.solvers.lns_cp.neighbor_tools import (
    ParamsConstraintBuilder,
    constraint_unit_used_subset_employees_preemptive,
    constraint_unit_used_to_tasks_preemptive,
)
from discrete_optimization.generic_tools.cp_tools import SignEnum
from discrete_optimization.generic_tools.lns_cp import MznConstraintHandler
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.rcpsp.problem import RcpspProblem
from discrete_optimization.rcpsp.problem_preemptive import (
    PreemptiveRcpspProblem,
    PreemptiveRcpspSolution,
)
from discrete_optimization.rcpsp.problem_specialized_constraints import (
    SpecialConstraintsPreemptiveRcpspProblem,
    SpecialPreemptiveRcpspSolution,
)
from discrete_optimization.rcpsp.solvers.cp_mzn import (
    CpMultimodePreemptiveRcpspSolver,
    CpPreemptiveRcpspSolver,
)
from discrete_optimization.rcpsp.utils import create_fake_tasks
from discrete_optimization.rcpsp_multiskill.problem import (
    MultiskillRcpspProblem,
    PreemptiveMultiskillRcpspSolution,
    VariantMultiskillRcpspProblem,
    create_fake_tasks_multiskills,
    employee_usage,
)
from discrete_optimization.rcpsp_multiskill.solvers.cp_mzn import (
    CpMultiskillRcpspSolver,
    CpPreemptiveMultiskillRcpspSolver,
)

logger = logging.getLogger(__name__)


ANY_RCPSP = Union[
    RcpspProblem,
    PreemptiveRcpspProblem,
    SpecialConstraintsPreemptiveRcpspProblem,
    MultiskillRcpspProblem,
    VariantMultiskillRcpspProblem,
]


[docs] def compute_shift_extremities(model: ANY_RCPSP): ressource_names = model.get_resource_names() ressource_arrays = { r: model.get_resource_availability_array(r) for r in ressource_names } if isinstance(model, MultiskillRcpspProblem): fake_tasks, fake_tasks_unit = create_fake_tasks_multiskills(model) f = fake_tasks + fake_tasks_unit else: fake_tasks = create_fake_tasks(model) f = fake_tasks return f
[docs] def return_pauses_and_active_times( model: ANY_RCPSP, solution: Union[ PreemptiveMultiskillRcpspSolution, PreemptiveRcpspSolution, SpecialPreemptiveRcpspSolution, ], ): tasks = model.get_tasks_list() dictionnary = {} for task in tasks: starts = solution.get_start_times_list(task) ends = solution.get_end_times_list(task) durations = [-starts[i] + ends[i] for i in range(len(starts))] dictionnary[task] = {"starts": starts, "ends": ends, "durations": durations} total_duration = sum(durations) dictionnary[task]["total_duration"] = total_duration if len(starts) > 1: diff = [starts[i + 1] - ends[i] for i in range(len(starts) - 1)] dictionnary[task]["diff_start_end"] = diff return dictionnary
[docs] def find_possible_problems_preemptive( model: ANY_RCPSP, solution: Union[ PreemptiveMultiskillRcpspSolution, PreemptiveRcpspSolution, SpecialPreemptiveRcpspSolution, ], ): dictionnary = return_pauses_and_active_times(model, solution) problems = {} for task in dictionnary: diff = dictionnary[task].get("diff_start_end", []) durations = dictionnary[task]["durations"] total_duration = dictionnary[task]["total_duration"] for j in range(len(diff)): if diff[j] < int(durations[j] / 10) or diff[j] == 1: if task not in problems: problems[task] = [] problems[task] += [ ( "end_start_problem", j, j + 1, { "prev_end": dictionnary[task]["ends"][j], "next_start": dictionnary[task]["starts"][j + 1], }, ) ] for j in range(len(durations)): if total_duration == 0: continue if (total_duration > 10 and durations[j] == 1) or ( durations[j] <= 0.05 * total_duration ): if task not in problems: problems[task] = [] problems[task] += [ ( "duration_problem", j, {"dur": durations[j], "tot": total_duration}, ) ] # starts[j+1] should be higher than end[j]+delta... # typical case : starts = [0, 50], [49, 53] return problems
[docs] def problem_constraints( current_solution: Union[PreemptiveRcpspSolution, PreemptiveMultiskillRcpspSolution], problems_output: dict[Hashable, list], minus_delta: int, plus_delta: int, cp_solver: Union[ CpPreemptiveRcpspSolver, CpMultimodePreemptiveRcpspSolver, CpPreemptiveMultiskillRcpspSolver, ], constraint_max_time=False, minus_delta_2=0, plus_delta_2=0, ): max_time = current_solution.get_end_time(task=current_solution.problem.sink_task) multimode = isinstance( cp_solver, (CpMultimodePreemptiveRcpspSolver, CpMultiskillRcpspSolver) ) if multimode: modes_dict = current_solution.problem.build_mode_dict( current_solution.rcpsp_modes ) subtasks = set(problems_output.keys()) if len(subtasks) >= 0.2 * current_solution.problem.n_jobs: subtasks = set( random.sample(list(subtasks), int(0.2 * current_solution.problem.n_jobs)) ) else: subtasks = subtasks.union( set( random.sample( current_solution.problem.get_tasks_list(), int(0.2 * current_solution.problem.n_jobs) - len(subtasks), ) ) ) jobs_to_fix = [ j for j in current_solution.problem.get_tasks_list() if j not in subtasks ] list_strings = [] subtasks_2 = current_solution.problem.get_tasks_list() string = ( """ int: nb_task_problems=""" + str(len(subtasks_2)) + """;\n array[1..nb_task_problems] of Tasks: task_list_problems=""" + str([cp_solver.index_in_minizinc[task] for task in subtasks_2]) + """;\n var int: nb_preemption_subtasks;\n var int: nb_small_tasks;\n constraint nb_preemption_subtasks=sum(i in 1..nb_task_problems)(sum(j in 2..nb_preemptive)(bool2int(d_preemptive[task_list_problems[i], j]>0)));\n constraint nb_small_tasks=sum(i in 1..nb_task_problems)(sum(j in 1..nb_preemptive)(bool2int(d_preemptive[task_list_problems[i], j]>0 /\ ((d_preemptive[task_list_problems[i], j]<=adur[task_list_problems[i]] div 20) \/ (d_preemptive[task_list_problems[i], j]==1 /\ adur[task_list_problems[i]]>10)))));\n """ ) list_strings += [string] weights = cp_solver.second_objectives["weights"] name_penalty = cp_solver.second_objectives["name_penalty"] def define_second_part_objective(weights_, name_penalty_): sum_string = "+".join( ["0"] + [str(weights_[i]) + "*" + name_penalty_[i] for i in range(len(weights_))] ) s = "constraint sec_objective==" + sum_string + ";\n" return [s] s = define_second_part_objective( weights_=weights + [1000, 1000], name_penalty_=name_penalty + ["nb_preemption_subtasks", "nb_small_tasks"], ) list_strings += s for job in subtasks: list_starts = current_solution.get_start_times_list(job) list_ends = current_solution.get_end_times_list(job) for j in range(len(list_starts)): start_time_j = list_starts[j] end_time_j = list_ends[j] duration_j = end_time_j - start_time_j string1_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=max(0, start_time_j - minus_delta), sign=SignEnum.UEQ, part_id=j + 1, ) string2_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=min(max_time, start_time_j + plus_delta) if constraint_max_time else start_time_j + plus_delta, sign=SignEnum.LEQ, part_id=j + 1, ) string1_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=max(duration_j - 20, 0), # 0 for index 1 sign=SignEnum.UEQ, part_id=j + 1, ) string2_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=duration_j + 20, # +50 for index 1 sign=SignEnum.LEQ, part_id=j + 1, ) list_strings += [string1_dur, string1_start, string2_dur, string2_start] for k in range(len(list_starts), cp_solver.nb_preemptive): if isinstance( cp_solver, (CpMultiskillRcpspSolver, CpPreemptiveMultiskillRcpspSolver) ): string1_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=0, sign=SignEnum.LEQ, part_id=k + 1 ) else: string1_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=0, sign=SignEnum.LEQ, part_id=k + 1 ) list_strings += [string1_dur] for job in jobs_to_fix: is_paused = len(current_solution.get_start_times_list(job)) > 1 is_paused_str = "true" if is_paused else "false" list_strings += [ "constraint is_paused[" + str(cp_solver.index_in_minizinc[job]) + "]==" + is_paused_str + ";\n" ] for job in jobs_to_fix: if multimode: list_strings += cp_solver.constraint_task_to_mode( task_id=job, mode=modes_dict[job] ) if job in subtasks: continue list_starts = current_solution.get_start_times_list(job) list_ends = current_solution.get_end_times_list(job) for j in range(len(list_starts)): start_time_j = list_starts[j] end_time_j = list_ends[j] duration_j = end_time_j - start_time_j if minus_delta_2 == 0 and plus_delta_2 == 0: string1_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=start_time_j, sign=SignEnum.EQUAL, part_id=j + 1, ) string1_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=duration_j, sign=SignEnum.EQUAL, part_id=j + 1 ) list_strings += [string1_dur, string1_start] else: string1_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=max(0, start_time_j - minus_delta_2), sign=SignEnum.UEQ, part_id=j + 1, ) string2_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=min(max_time, start_time_j + plus_delta_2) if constraint_max_time else start_time_j + plus_delta_2, sign=SignEnum.LEQ, part_id=j + 1, ) string1_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=max(duration_j - 5, 0), sign=SignEnum.UEQ, part_id=j + 1, ) string2_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=duration_j + 5, sign=SignEnum.LEQ, part_id=j + 1 ) list_strings += [string1_dur, string1_start, string2_dur, string2_start] for k in range(len(list_starts), cp_solver.nb_preemptive): if minus_delta_2 == 0 and plus_delta_2 == 0: string1_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=current_solution.get_end_time(job), sign=SignEnum.EQUAL, part_id=k + 1, ) list_strings += [string1_start] else: string1_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=current_solution.get_end_time(job) - minus_delta_2, sign=SignEnum.UEQ, part_id=k + 1, ) string2_start = cp_solver.constraint_start_time_string_preemptive_i( task=job, start_time=min( max_time, current_solution.get_end_time(job) + plus_delta_2 ) if constraint_max_time else current_solution.get_end_time(job) + plus_delta_2, sign=SignEnum.LEQ, part_id=k + 1, ) list_strings += [string2_start, string1_start] string1_dur = cp_solver.constraint_duration_string_preemptive_i( task=job, duration=0, sign=SignEnum.EQUAL, part_id=k + 1 ) list_strings += [string1_dur] exceptions = [] for task in problems_output: for l in problems_output[task]: if l[0] == "end_start_problem": exceptions += [(task, l[1]), (task, l[2])] else: exceptions += [(task, l[1])] if random.random() < 0.99: exceptions = random.sample(exceptions, min(len(exceptions), 10)) list_strings += constraint_unit_used_to_tasks_preemptive( tasks_set=set( random.sample( current_solution.problem.get_tasks_list(), min( int(0.99 * current_solution.problem.n_jobs), current_solution.problem.n_jobs - 1, ), ) ), current_solution=current_solution, cp_solver=cp_solver, exceptions=exceptions, ) else: employee_usage_matrix, sum_usage, employees_usage_dict = employee_usage( solution=current_solution, problem=current_solution.problem ) sorted_employee = np.argsort(sum_usage) set_employees_to_fix = set( random.sample( current_solution.problem.employees_list, int(len(current_solution.problem.employees_list) - 2), ) ) list_strings += constraint_unit_used_subset_employees_preemptive( employees_set=set_employees_to_fix, current_solution=current_solution, cp_solver=cp_solver, employees_usage_dict=employees_usage_dict, exceptions=exceptions, ) return list_strings
[docs] def post_process_solution(model, solution): problem = find_possible_problems_preemptive(model, solution)
[docs] class NeighborRepairProblems(MznConstraintHandler): def __init__( self, problem: Union[ RcpspProblem, PreemptiveRcpspProblem, PreemptiveRcpspSolution, SpecialConstraintsPreemptiveRcpspProblem, MultiskillRcpspProblem, ], params_list: list[ParamsConstraintBuilder] = None, ): self.problem = problem if isinstance(self.problem, SpecialConstraintsPreemptiveRcpspProblem,) or ( isinstance(self.problem, RcpspProblem) and self.problem.do_special_constraints ): self.graph_rcpsp = GraphSpecialConstraintsRcpsp(problem=self.problem) self.special_constraints = True else: self.graph_rcpsp = GraphRcpsp(problem=self.problem) self.special_constraints = False params = ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=1, plus_delta_secondary=1, ) params_2 = ParamsConstraintBuilder( minus_delta_primary=6000, plus_delta_primary=6000, minus_delta_secondary=300, plus_delta_secondary=300, ) if params_list is None: self.params_list = [params, params_2] else: self.params_list = params_list
[docs] def adding_constraint_from_results_store( self, solver: Union[ CpPreemptiveRcpspSolver, CpPreemptiveMultiskillRcpspSolver, CpMultimodePreemptiveRcpspSolver, ], child_instance: Instance, result_storage: ResultStorage, last_result_store: Optional[ResultStorage] = None, **kwargs: Any, ) -> Iterable[Any]: if last_result_store is not None: current_solution, fit = next( ( last_result_store[j] for j in range(len(last_result_store)) if "opti_from_cp" in last_result_store[j][0].__dict__.keys() ), (None, None), ) else: current_solution, fit = next( ( result_storage[j] for j in range(len(result_storage)) if "opti_from_cp" in result_storage[j][0].__dict__.keys() ), (None, None), ) if current_solution is None: current_solution, fit = result_storage.get_last_best_solution() current_solution: PreemptiveRcpspSolution = current_solution logger.debug(f"{current_solution.get_nb_task_preemption()} task preempted") logger.debug(f"{current_solution.get_max_preempted()} most preempted task") logger.debug(f"{current_solution.total_number_of_cut()} total number of cut") evaluation = self.problem.evaluate(current_solution) logger.debug(f"Current Eval : {evaluation}") if evaluation.get("constraint_penalty", 0) == 0: p = self.params_list[1] else: p = self.params_list[1] problems = find_possible_problems_preemptive( model=self.problem, solution=current_solution ) logger.debug(f"{sum([len(problems[t]) for t in problems])} problems ?") logger.debug(f"Problems : {problems}") list_strings = problem_constraints( current_solution=current_solution, problems_output=problems, minus_delta=p.minus_delta_primary, plus_delta=p.plus_delta_primary, cp_solver=solver, constraint_max_time=p.constraint_max_time_to_current_solution, minus_delta_2=p.minus_delta_secondary, plus_delta_2=p.plus_delta_secondary, ) for s in list_strings: child_instance.add_string(s) if evaluation.get("constraint_penalty", 0) > 0: child_instance.add_string( "constraint objective=" + str(evaluation["makespan"]) + ";\n" ) else: string = solver.constraint_start_time_string( task=self.problem.sink_task, start_time=current_solution.get_start_time(self.problem.sink_task) + 200, sign=SignEnum.LEQ, ) child_instance.add_string(string) string = solver.constraint_start_time_string_preemptive_i( task=self.problem.sink_task, start_time=current_solution.get_end_time(self.problem.sink_task) + 200, part_id=solver.nb_preemptive, sign=SignEnum.LEQ, ) child_instance.add_string(string) weights = solver.second_objectives["weights"] name_penalty = solver.second_objectives["name_penalty"] sum_string = "+".join( ["0"] + [str(weights[i]) + "*" + name_penalty[i] for i in range(len(weights))] ) child_instance.add_string( "constraint " + sum_string + "<=" + str(int(1.01 * 100 * evaluation.get("constraint_penalty", 0))) + ";\n" ) if evaluation.get("constraint_penalty", 0) > 0: strings = [] else: strings = solver.constraint_objective_equal_makespan(self.problem.sink_task) for s in strings: child_instance.add_string(s) list_strings += [s] return list_strings