Source code for discrete_optimization.generic_rcpsp_tools.mutation

#  Copyright (c) 2022-2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

import random
from typing import Any, Optional

import numpy as np

from discrete_optimization.generic_rcpsp_tools.attribute_type import PermutationRcpsp
from discrete_optimization.generic_rcpsp_tools.typing import (
    ANY_RCPSP_FOR_MUTATION,
    ANY_SOLUTION_FOR_MUTATION,
    is_instance_any_rcpsp_solution,
)
from discrete_optimization.generic_tools.do_mutation import (
    LocalMove,
    SingleAttributeMutation,
)
from discrete_optimization.generic_tools.do_problem import Problem
from discrete_optimization.generic_tools.mutations.mutation_permutation import (
    ShuffleMove,
    SwapsLocalMove,
)


[docs] class RcpspMutation(SingleAttributeMutation): """Wrapper around other mutations which makes recompute the schedule.""" attribute_type_cls = PermutationRcpsp attribute_type: PermutationRcpsp problem: ANY_RCPSP_FOR_MUTATION
[docs] @classmethod def build( cls, problem: Problem, other_mutation_cls: type[SingleAttributeMutation], **kwargs: Any, ) -> RcpspMutation: built_other_mutation = other_mutation_cls.build(problem, **kwargs) return cls(problem, other_mutation=built_other_mutation)
def __init__( self, problem: ANY_RCPSP_FOR_MUTATION, other_mutation: SingleAttributeMutation, **kwargs: Any, ): attribute = other_mutation.attribute super().__init__(problem=problem, attribute=attribute, **kwargs) self.other_mutation = other_mutation
[docs] def mutate( self, solution: ANY_SOLUTION_FOR_MUTATION ) -> tuple[ANY_SOLUTION_FOR_MUTATION, LocalMove]: # type: ignore s: ANY_SOLUTION_FOR_MUTATION s, lm = self.other_mutation.mutate(solution) # type: ignore _recompute_schedule_n_standardised_permutation(s) return s, lm
[docs] def mutate_and_compute_obj( # type: ignore self, solution: ANY_SOLUTION_FOR_MUTATION ) -> tuple[ANY_SOLUTION_FOR_MUTATION, LocalMove, dict[str, float]]: s: ANY_SOLUTION_FOR_MUTATION s, lm, fit = self.other_mutation.mutate_and_compute_obj(solution) # type: ignore _recompute_schedule_n_standardised_permutation(s) return s, lm, fit
def __repr__(self): return f"{type(self).__name__}(other_mutation='{self.other_mutation}')"
def _recompute_schedule_n_standardised_permutation(solution: ANY_SOLUTION_FOR_MUTATION): try: solution._schedule_to_recompute = True solution.standardised_permutation = ( solution.generate_permutation_from_schedule() ) # type: ignore except: solution._schedule_to_recompute = True
[docs] class DeadlineRcpspMutation(SingleAttributeMutation): attribute_type_cls = PermutationRcpsp attribute_type: PermutationRcpsp problem: ANY_RCPSP_FOR_MUTATION def __init__( self, problem: ANY_RCPSP_FOR_MUTATION, attribute: Optional[str] = None, nb_swap: int = 1, **kwargs: Any, ): super().__init__(problem=problem, attribute=attribute, **kwargs) self.nb_swap = nb_swap self.length = self.attribute_type.length try: self.full_predecessors = self.problem.graph.ancestors_map() # type: ignore except: pass
[docs] def mutate( self, solution: ANY_SOLUTION_FOR_MUTATION ) -> tuple[ANY_SOLUTION_FOR_MUTATION, LocalMove]: # type: ignore if not is_instance_any_rcpsp_solution(solution): raise ValueError("solution must be an rcsp solution (of any kind)") if "special_constraints" in self.problem.__dict__.keys(): ls = [ ( t, solution.get_end_time(t) - self.problem.special_constraints.end_times_window[t][1], ) for t in self.problem.special_constraints.end_times_window if self.problem.special_constraints.end_times_window[t][1] is not None and solution.get_end_time(t) > self.problem.special_constraints.end_times_window[t][1] ] if len(ls) > 0: x = random.choice(ls) t = x[0] pred = [tt for tt in self.full_predecessors[t]] + [t] previous = list(getattr(solution, self.attribute)) new = [ self.problem.index_task_non_dummy[tt] for tt in pred if tt in self.problem.index_task_non_dummy ] for x in previous: if x not in new: new += [x] sol = solution.lazy_copy() setattr(sol, self.attribute, new) return ( sol, ShuffleMove( self.attribute, new_permutation=new, prev_permutation=previous, ), ) swaps = np.random.randint(low=0, high=self.length - 1, size=(1, 2)) move = SwapsLocalMove( self.attribute, [(int(swaps[i, 0]), int(swaps[i, 1])) for i in range(1)] ) next_sol = move.apply_local_move(solution) return next_sol, move