# Source code for discrete_optimization.generic_rcpsp_tools.solvers.gphh

#  Copyright (c) 2022-2023 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import operator
import random
from enum import Enum
from typing import Optional

import numpy as np
from deap import algorithms, creator, gp, tools
from deap.base import Fitness, Toolbox
from deap.gp import PrimitiveSet, PrimitiveTree, genHalfAndHalf

from discrete_optimization.generic_rcpsp_tools.solvers import GenericRcpspSolver
from discrete_optimization.generic_rcpsp_tools.typing import ANY_RCPSP
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Problem,
)
from discrete_optimization.rcpsp.solution import RcpspSolution
from discrete_optimization.rcpsp.solvers.cpm import CpmRcpspSolver
from discrete_optimization.rcpsp_multiskill.problem import (
    MultiskillRcpspProblem,
    VariantMultiskillRcpspSolution,
)

logger = logging.getLogger(__name__)


def if_then_else(input1, output1, output2):
    """Conditional primitive for GP trees: yield output1 when input1 is truthy, else output2."""
    return output1 if input1 else output2
def protected_div(left, right):
    """Division primitive that is safe against a zero divisor (returns 1.0 then)."""
    if right == 0.0:
        return 1.0
    return left / right
def max_operator(left, right):
    """Binary maximum, exposed as a named function so deap primitive sets can use it."""
    return max((left, right))
def min_operator(left, right):
    """Binary minimum, exposed as a named function so deap primitive sets can use it."""
    return min((left, right))
def feature_task_duration(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Raw duration of the task in mode 1 (not normalized)."""
    mode_1 = problem.mode_details[task_id][1]
    return mode_1["duration"]
def feature_total_n_res(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Total resource consumption of the task in mode 1, summed over all resources.

    The "duration" entry of the mode dict is not a resource and is skipped.
    """
    mode_consumption = problem.mode_details[task_id][1]
    return sum(
        amount for key, amount in mode_consumption.items() if key != "duration"
    )
def feature_n_successors(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Number of direct successors of the task, normalized by the total job count."""
    n_direct = len(problem.successors[task_id])
    return n_direct / problem.n_jobs
def feature_n_predecessors(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Number of direct predecessors of the task, normalized by the total job count."""
    n_direct = len(problem.graph.get_predecessors(task_id))
    return n_direct / problem.n_jobs
def get_resource_requirements_across_duration(
    problem: ANY_RCPSP, task_id: int, **kwargs
):
    """Per-resource requirement of the task (mode 1), normalized twice.

    For each resource in problem.resources_list, returns
    (consumption / max capacity) / duration. A task with zero duration
    contributes a single 0.0.
    """
    mode_consumption = problem.mode_details[task_id][1]
    duration = mode_consumption["duration"]
    if duration <= 0:
        return [0.0]
    return [
        (mode_consumption.get(res, 0) / problem.get_max_resource_capacity(res))
        / duration
        for res in problem.resources_list
    ]
def feature_average_resource_requirements(
    problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs
):
    """Mean of the normalized per-resource requirements of the task (0 when empty)."""
    requirements = get_resource_requirements_across_duration(
        problem=problem, task_id=task_id
    )
    if not requirements:
        return 0
    return np.mean(requirements)
def feature_minimum_resource_requirements(
    problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs
):
    """Smallest normalized per-resource requirement of the task (0 when empty)."""
    requirements = get_resource_requirements_across_duration(
        problem=problem, task_id=task_id
    )
    if not requirements:
        return 0
    return np.min(requirements)
def feature_non_zero_minimum_resource_requirements(
    problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs
):
    """Smallest strictly-positive normalized requirement of the task.

    Falls back to the plain minimum when the requirements do not sum to a
    positive value, and to 0 when there are no requirements at all.
    """
    requirements = get_resource_requirements_across_duration(
        problem=problem, task_id=task_id
    )
    if not requirements:
        return 0
    # Keep the original sum-based guard: only look at positive entries when the
    # overall consumption is positive.
    if np.sum(requirements) > 0.0:
        return np.min([x for x in requirements if x > 0.0])
    return np.min(requirements)
def feature_maximum_resource_requirements(
    problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs
):
    """Largest normalized per-resource requirement of the task (0 when empty)."""
    requirements = get_resource_requirements_across_duration(
        problem=problem, task_id=task_id
    )
    if not requirements:
        return 0
    return np.max(requirements)
def feature_resource_requirements(
    problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs
):
    """Fraction of resources actually used by the task (share of non-zero requirements)."""
    requirements = get_resource_requirements_across_duration(
        problem=problem, task_id=task_id
    )
    if not requirements:
        return 0
    n_used = len([x for x in requirements if x > 0.0])
    return n_used / len(requirements)
def feature_all_descendants(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Number of transitive successors (all descendants), normalized by job count."""
    descendants = problem.graph.full_successors[task_id]
    return len(descendants) / problem.n_jobs
def compute_cpm(problem: ANY_RCPSP):
    """Run the critical-path method on the problem.

    Returns the CPM node map and the earliest start date of the last node on
    the critical path; the latter is used by feature functions as a
    normalization factor.
    """
    solver = CpmRcpspSolver(problem)
    critical_path = solver.run_classic_cpm()
    node_map = solver.map_node
    # ESD of the last node of the critical path: used to normalize date features.
    sink_esd = node_map[critical_path[-1]]._ESD
    return node_map, sink_esd
def feature_esd(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Earliest start date of the task, normalized by the sink ESD.

    Relies on precomputed CPM results being passed in (see compute_cpm).
    """
    node = cpm[task_id]
    return node._ESD / cpm_esd
def feature_lsd(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Latest start date of the task, normalized by the sink ESD.

    Relies on precomputed CPM results being passed in (see compute_cpm).
    """
    node = cpm[task_id]
    return node._LSD / cpm_esd
def feature_efd(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Earliest finish date of the task, normalized by the sink ESD.

    Relies on precomputed CPM results being passed in (see compute_cpm).
    """
    node = cpm[task_id]
    return node._EFD / cpm_esd
def feature_lfd(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, **kwargs):
    """Latest finish date of the task, normalized by the sink ESD.

    Relies on precomputed CPM results being passed in (see compute_cpm).
    """
    node = cpm[task_id]
    return node._LFD / cpm_esd
def get_dummy(problem: ANY_RCPSP, cpm, cpm_esd, task_id: int, increase: int, **kwargs):
    """Dummy feature: simply echoes the caller-supplied ``increase`` value."""
    dummy_value = increase
    return dummy_value
class FeatureEnum(Enum):
    """Task features usable as terminals of the GP heuristic trees.

    The string values become the argument names of the deap primitive set
    (see GphhGenericRcpspSolver.init_primitives), so evolved expressions stay
    human readable.
    """

    TASK_DURATION = "task_duration"
    RESSOURCE_TOTAL = "total_nres"
    N_SUCCESSORS = "n_successors"
    N_PREDECESSORS = "n_predecessors"
    RESSOURCE_REQUIRED = "res_requ"
    RESSOURCE_AVG = "avg_res_requ"
    RESSOURCE_MIN = "min_res_requ"
    RESSOURCE_NZ_MIN = "nz_min_res_requ"
    RESSOURCE_MAX = "max_res_requ"
    ALL_DESCENDANTS = "all_descendants"
    # CPM-derived date features (require precomputed CPM results).
    EARLIEST_START_DATE = "ESD"
    LATEST_START_DATE = "LSD"
    EARLIEST_FINISH_DATE = "EFD"
    LATEST_FINISH_DATE = "LFD"
    DUMMY = "DUMMY"
# Maps each feature to the function that computes it.
# All functions share the signature f(problem, cpm, cpm_esd, task_id, **kwargs).
feature_function_map = {
    FeatureEnum.TASK_DURATION: feature_task_duration,
    FeatureEnum.RESSOURCE_TOTAL: feature_total_n_res,
    FeatureEnum.N_SUCCESSORS: feature_n_successors,
    FeatureEnum.N_PREDECESSORS: feature_n_predecessors,
    FeatureEnum.RESSOURCE_REQUIRED: feature_resource_requirements,
    FeatureEnum.RESSOURCE_AVG: feature_average_resource_requirements,
    FeatureEnum.RESSOURCE_MIN: feature_minimum_resource_requirements,
    FeatureEnum.RESSOURCE_NZ_MIN: feature_non_zero_minimum_resource_requirements,
    FeatureEnum.RESSOURCE_MAX: feature_maximum_resource_requirements,
    FeatureEnum.ALL_DESCENDANTS: feature_all_descendants,
    FeatureEnum.EARLIEST_START_DATE: feature_esd,
    FeatureEnum.EARLIEST_FINISH_DATE: feature_efd,
    FeatureEnum.LATEST_START_DATE: feature_lsd,
    FeatureEnum.LATEST_FINISH_DATE: feature_lfd,
    FeatureEnum.DUMMY: get_dummy,
}
# Flags each feature as "static". Presumably True means the value depends only
# on the problem (computable once), while DUMMY (False) echoes the runtime
# ``increase`` argument — TODO confirm how consumers use this map.
feature_static_map = {
    FeatureEnum.TASK_DURATION: True,
    FeatureEnum.RESSOURCE_TOTAL: True,
    FeatureEnum.N_SUCCESSORS: True,
    FeatureEnum.N_PREDECESSORS: True,
    FeatureEnum.RESSOURCE_REQUIRED: True,
    FeatureEnum.RESSOURCE_AVG: True,
    FeatureEnum.RESSOURCE_MIN: True,
    FeatureEnum.RESSOURCE_NZ_MIN: True,
    FeatureEnum.RESSOURCE_MAX: True,
    FeatureEnum.ALL_DESCENDANTS: True,
    FeatureEnum.EARLIEST_START_DATE: True,
    FeatureEnum.EARLIEST_FINISH_DATE: True,
    FeatureEnum.LATEST_START_DATE: True,
    FeatureEnum.LATEST_FINISH_DATE: True,
    FeatureEnum.DUMMY: False,
}
class EvaluationGphh(Enum):
    """How a GP individual is scored during evolution."""

    # Build a schedule with the heuristic and score its makespan.
    SGS = 0
    # Compare the heuristic's task ordering to reference permutations.
    PERMUTATION_DISTANCE = 1
class PermutationDistance(Enum):
    """Distance measure between permutations for PERMUTATION_DISTANCE evaluation."""

    # Kendall tau distance.
    KTD = 0
    # Hamming distance.
    HAMMING = 1
    # Combination of Kendall tau and Hamming distances.
    KTD_HAMMING = 2
class ParametersGphh:
    """Hyper-parameter bundle for the GPHH solver.

    Groups the GP search configuration: the feature set used as terminals, the
    deap primitive set, population size/generations, tree-depth bounds,
    variation rates and the evaluation strategy.

    Notes:
        - Class-level attributes are type declarations with ``None``
          placeholders; every field is assigned in ``__init__``.
        - ``set_primitves`` [sic] keeps its historical misspelling because
          callers pass it as a keyword argument.
    """

    set_feature: set[FeatureEnum] = None
    set_primitves: PrimitiveSet = None
    tournament_ratio: float = None
    pop_size: int = None
    n_gen: int = None
    min_tree_depth: int = None
    max_tree_depth: int = None
    crossover_rate: float = None
    mutation_rate: float = None
    deap_verbose: bool = None
    evaluation: EvaluationGphh = None
    permutation_distance = PermutationDistance.KTD

    def __init__(
        self,
        set_feature,
        set_primitves,
        tournament_ratio,
        pop_size,
        n_gen,
        min_tree_depth,
        max_tree_depth,
        crossover_rate,
        mutation_rate,
        deap_verbose,
        evaluation,
        permutation_distance,
    ):
        self.set_feature = set_feature
        self.set_primitves = set_primitves
        self.tournament_ratio = tournament_ratio
        self.pop_size = pop_size
        self.n_gen = n_gen
        self.min_tree_depth = min_tree_depth
        self.max_tree_depth = max_tree_depth
        self.crossover_rate = crossover_rate
        self.mutation_rate = mutation_rate
        self.deap_verbose = deap_verbose
        self.evaluation = evaluation
        self.permutation_distance = permutation_distance

    @staticmethod
    def default():
        """Default single-mode RCPSP setup: SGS evaluation, small search (pop 10, 2 gens)."""
        set_feature = {
            FeatureEnum.EARLIEST_FINISH_DATE,
            FeatureEnum.EARLIEST_START_DATE,
            FeatureEnum.LATEST_FINISH_DATE,
            FeatureEnum.LATEST_START_DATE,
            FeatureEnum.N_PREDECESSORS,
            FeatureEnum.N_SUCCESSORS,
            FeatureEnum.ALL_DESCENDANTS,
            FeatureEnum.RESSOURCE_REQUIRED,
            FeatureEnum.RESSOURCE_AVG,
            FeatureEnum.RESSOURCE_MAX,
            FeatureEnum.RESSOURCE_NZ_MIN,
        }
        pset = PrimitiveSet("main", len(set_feature))
        pset.addPrimitive(operator.add, 2)
        pset.addPrimitive(operator.sub, 2)
        pset.addPrimitive(operator.mul, 2)
        pset.addPrimitive(protected_div, 2)
        pset.addPrimitive(max_operator, 2)
        pset.addPrimitive(min_operator, 2)
        pset.addPrimitive(operator.neg, 1)
        return ParametersGphh(
            set_feature=set_feature,
            set_primitves=pset,
            tournament_ratio=0.1,
            pop_size=10,
            n_gen=2,
            min_tree_depth=1,
            max_tree_depth=4,
            crossover_rate=0.7,
            mutation_rate=0.3,
            deap_verbose=True,
            evaluation=EvaluationGphh.SGS,
            permutation_distance=PermutationDistance.KTD,
        )

    @staticmethod
    def fast_test():
        """Like default(), but the primitive set also gets constant terminals."""
        set_feature = {
            FeatureEnum.EARLIEST_FINISH_DATE,
            FeatureEnum.EARLIEST_START_DATE,
            FeatureEnum.LATEST_FINISH_DATE,
            FeatureEnum.LATEST_START_DATE,
            FeatureEnum.N_PREDECESSORS,
            FeatureEnum.N_SUCCESSORS,
            FeatureEnum.ALL_DESCENDANTS,
            FeatureEnum.RESSOURCE_REQUIRED,
            FeatureEnum.RESSOURCE_AVG,
            FeatureEnum.RESSOURCE_MAX,
            FeatureEnum.RESSOURCE_NZ_MIN,
        }
        pset = PrimitiveSet("main", len(set_feature))
        pset.addPrimitive(operator.add, 2)
        pset.addPrimitive(operator.sub, 2)
        pset.addPrimitive(operator.mul, 2)
        pset.addPrimitive(protected_div, 2)
        pset.addPrimitive(max_operator, 2)
        pset.addPrimitive(min_operator, 2)
        pset.addPrimitive(operator.neg, 1)
        # NOTE(review): the two identical ephemeral-constant registrations below
        # look accidental; kept as-is to preserve behavior. Also verify the
        # argument order against the installed deap version — recent deap expects
        # a *name* as the first argument of addEphemeralConstant.
        pset.addEphemeralConstant(lambda: random.uniform(-1, 1), float)
        pset.addEphemeralConstant(lambda: random.uniform(-1, 1), float)
        pset.addTerminal(1.0, float)
        pset.addTerminal(1, bool)
        pset.addTerminal(0, bool)
        return ParametersGphh(
            set_feature=set_feature,
            set_primitves=pset,
            tournament_ratio=0.1,
            pop_size=10,
            n_gen=2,
            min_tree_depth=1,
            max_tree_depth=4,
            crossover_rate=0.7,
            mutation_rate=0.3,
            deap_verbose=True,
            evaluation=EvaluationGphh.SGS,
            permutation_distance=PermutationDistance.KTD,
        )

    @staticmethod
    def ms_fast():
        """Quick multiskill setup: precedence/date features only, tiny search (pop 4, 2 gens)."""
        set_feature = {
            FeatureEnum.EARLIEST_FINISH_DATE,
            FeatureEnum.EARLIEST_START_DATE,
            FeatureEnum.LATEST_FINISH_DATE,
            FeatureEnum.LATEST_START_DATE,
            FeatureEnum.N_PREDECESSORS,
            FeatureEnum.N_SUCCESSORS,
            FeatureEnum.ALL_DESCENDANTS,
        }
        pset = PrimitiveSet("main", len(set_feature))
        pset.addPrimitive(operator.add, 2)
        pset.addPrimitive(operator.sub, 2)
        pset.addPrimitive(operator.mul, 2)
        pset.addPrimitive(protected_div, 2)
        pset.addPrimitive(max_operator, 2)
        pset.addPrimitive(min_operator, 2)
        pset.addPrimitive(operator.neg, 1)
        return ParametersGphh(
            set_feature=set_feature,
            set_primitves=pset,
            tournament_ratio=0.1,
            pop_size=4,
            n_gen=2,
            min_tree_depth=1,
            max_tree_depth=4,
            crossover_rate=0.7,
            mutation_rate=0.3,
            deap_verbose=True,
            evaluation=EvaluationGphh.SGS,
            permutation_distance=PermutationDistance.KTD,
        )

    @staticmethod
    def ms_default():
        """Multiskill setup with a real search budget (pop 40, 100 generations)."""
        set_feature = {
            FeatureEnum.EARLIEST_FINISH_DATE,
            FeatureEnum.EARLIEST_START_DATE,
            FeatureEnum.LATEST_FINISH_DATE,
            FeatureEnum.LATEST_START_DATE,
            FeatureEnum.N_PREDECESSORS,
            FeatureEnum.N_SUCCESSORS,
            FeatureEnum.ALL_DESCENDANTS,
        }
        pset = PrimitiveSet("main", len(set_feature))
        pset.addPrimitive(operator.add, 2)
        pset.addPrimitive(operator.sub, 2)
        pset.addPrimitive(operator.mul, 2)
        pset.addPrimitive(protected_div, 2)
        pset.addPrimitive(max_operator, 2)
        pset.addPrimitive(min_operator, 2)
        pset.addPrimitive(operator.neg, 1)
        return ParametersGphh(
            set_feature=set_feature,
            set_primitves=pset,
            tournament_ratio=0.1,
            pop_size=40,
            n_gen=100,
            min_tree_depth=1,
            max_tree_depth=4,
            crossover_rate=0.7,
            mutation_rate=0.3,
            deap_verbose=True,
            evaluation=EvaluationGphh.SGS,
            permutation_distance=PermutationDistance.KTD,
        )

    @staticmethod
    def default_for_set_features(set_feature: set[FeatureEnum]):
        """Build parameters for a caller-chosen feature set.

        Uses permutation-distance evaluation; note the primitive set here has no
        protected_div, unlike the other presets.
        """
        pset = PrimitiveSet("main", len(set_feature))
        pset.addPrimitive(operator.add, 2)
        pset.addPrimitive(operator.sub, 2)
        pset.addPrimitive(operator.mul, 2)
        pset.addPrimitive(max_operator, 2)
        pset.addPrimitive(min_operator, 2)
        pset.addPrimitive(operator.neg, 1)
        return ParametersGphh(
            set_feature=set_feature,
            set_primitves=pset,
            tournament_ratio=0.25,
            pop_size=20,
            n_gen=20,
            min_tree_depth=1,
            max_tree_depth=4,
            crossover_rate=0.7,
            mutation_rate=0.1,
            deap_verbose=True,
            evaluation=EvaluationGphh.PERMUTATION_DISTANCE,
            permutation_distance=PermutationDistance.KTD,
        )
class GphhGenericRcpspSolver(GenericRcpspSolver):
    """Genetic-Programming Hyper-Heuristic solver for (multiskill) RCPSP.

    Evolves priority-dispatch rules — expression trees over task features —
    with deap, then builds a schedule from the best evolved heuristic.
    """

    training_domains: list[Problem]
    weight: int
    pset: PrimitiveSet
    toolbox: Toolbox
    params_gphh: ParametersGphh
    evaluation_method: EvaluationGphh
    reference_permutations: dict
    permutation_distance: PermutationDistance

    def __init__(
        self,
        problem: Problem,
        training_domains: Optional[list[Problem]] = None,
        weight: int = 1,
        params_gphh: ParametersGphh = None,
        params_objective_function: ParamsObjectiveFunction = None,
        **kwargs,
    ):
        """
        Args:
            problem: problem to solve (also the default training domain).
            training_domains: problems used to evaluate evolved heuristics;
                defaults to [problem].
            weight: deap fitness weight (sign selects maximize/minimize).
            params_gphh: GP hyper-parameters; ParametersGphh.default() if None.
            params_objective_function: objective aggregation parameters,
                forwarded to the base solver.
        """
        super().__init__(
            problem=problem, params_objective_function=params_objective_function
        )
        # Train on the given domains, or on the problem itself by default.
        if training_domains is None:
            self.training_domains = [problem]
        else:
            self.training_domains = training_domains
        self.params_gphh = params_gphh
        if self.params_gphh is None:
            self.params_gphh = ParametersGphh.default()
        self.set_feature = self.params_gphh.set_feature
        self.list_feature = list(self.set_feature)
        self.list_feature_names = [value.value for value in list(self.list_feature)]
        self.pset = self.init_primitives(self.params_gphh.set_primitves)
        self.weight = weight
        self.evaluation_method = self.params_gphh.evaluation
        model = self.problem
        # Best effort: precompute the transitive predecessor/successor maps used
        # by some features; not every problem graph exposes these attributes.
        # Was a bare `except:` — narrowed to Exception so KeyboardInterrupt and
        # SystemExit are no longer swallowed; best-effort behavior is preserved.
        try:
            if model.graph.full_successors is None:
                model.graph.full_predecessors = model.graph.ancestors_map()
                model.graph.full_successors = model.graph.descendants_map()
        except Exception:
            logger.debug(
                "Could not precompute ancestors/descendants maps", exc_info=True
            )
        self.initialize_cpm_data_for_training()
        self.graphs = {}
        self.toolbox = None
[docs] def init_model(self): tournament_ratio = self.params_gphh.tournament_ratio pop_size = self.params_gphh.pop_size min_tree_depth = self.params_gphh.min_tree_depth max_tree_depth = self.params_gphh.max_tree_depth creator.create("FitnessMin", Fitness, weights=(self.weight,)) creator.create("Individual", PrimitiveTree, fitness=creator.FitnessMin) self.toolbox = Toolbox() self.toolbox.register( "expr", genHalfAndHalf, pset=self.pset, min_=min_tree_depth, max_=max_tree_depth, ) self.toolbox.register( "individual", tools.initIterate, creator.Individual, self.toolbox.expr ) self.toolbox.register( "population", tools.initRepeat, list, self.toolbox.individual ) self.toolbox.register("compile", gp.compile, pset=self.pset) if self.evaluation_method == EvaluationGphh.SGS: self.toolbox.register( "evaluate", self.evaluate_heuristic, domains=self.training_domains ) elif self.evaluation_method == EvaluationGphh.PERMUTATION_DISTANCE: self.toolbox.register( "evaluate", self.evaluate_heuristic_permutation, domains=self.training_domains, ) self.toolbox.register( "select", tools.selTournament, tournsize=int(tournament_ratio * pop_size) ) self.toolbox.register("mate", gp.cxOnePoint) self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=max_tree_depth) self.toolbox.register( "mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset ) self.toolbox.decorate( "mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17) ) self.toolbox.decorate( "mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17) ) stats_fit = tools.Statistics(lambda ind: ind.fitness.values) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) mstats.register("avg", np.mean) mstats.register("std", np.std) mstats.register("min", np.min) mstats.register("max", np.max)
[docs] def solve(self, **kwargs): if self.toolbox is None: self.init_model() stats_fit = tools.Statistics(lambda ind: ind.fitness.values) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) mstats.register("avg", np.mean) mstats.register("std", np.std) mstats.register("min", np.min) mstats.register("max", np.max) pop = self.toolbox.population(n=self.params_gphh.pop_size) hof = tools.HallOfFame(1000) self.hof = hof pop, log = algorithms.eaSimple( pop, self.toolbox, cxpb=self.params_gphh.crossover_rate, mutpb=self.params_gphh.mutation_rate, ngen=self.params_gphh.n_gen, stats=mstats, halloffame=hof, verbose=True, ) self.best_heuristic = hof[0] logger.debug(f"best_heuristic: {self.best_heuristic}") self.final_pop = pop self.func_heuristic = self.toolbox.compile(expr=self.best_heuristic) solution = self.build_solution( domain=self.problem, func_heuristic=self.func_heuristic ) return self.create_result_storage( [(solution, self.aggreg_from_sol(solution))], )
[docs] def init_primitives(self, pset) -> PrimitiveSet: for i in range(len(self.list_feature)): pset.renameArguments(**{"ARG" + str(i): self.list_feature[i].value}) return pset
    def build_solution(self, domain, individual=None, func_heuristic=None):
        """Build a schedule for ``domain`` by ranking tasks with a heuristic.

        Either a compiled ``func_heuristic`` or a deap ``individual`` (compiled
        here) must be provided. Tasks are sorted by ascending heuristic value
        (lower value = higher priority), dummy tasks are filtered out, and a
        VariantMultiskillRcpspSolution or RcpspSolution (mode 1 everywhere,
        all workers allowed on every task for multiskill) is returned.
        """
        if func_heuristic is None:
            func_heuristic = self.toolbox.compile(expr=individual)
        d: ANY_RCPSP = domain
        # Every non-dummy task is scheduled in mode 1.
        modes = [1 for j in range(d.n_jobs_non_dummy)]
        # CPM results precomputed in initialize_cpm_data_for_training().
        cpm = self.cpm_data[domain]["cpm"]
        cpm_esd = self.cpm_data[domain]["cpm_esd"]
        raw_values = []
        for task_id in d.tasks_list:
            # Evaluate each configured feature for this task; increase=1 is only
            # consumed by the DUMMY feature.
            input_features = [
                feature_function_map[lf](
                    problem=domain,
                    cpm=cpm,
                    cpm_esd=cpm_esd,
                    task_id=task_id,
                    increase=1,
                )
                for lf in self.list_feature
            ]
            output_value = func_heuristic(*input_features)
            raw_values.append(output_value)
        # Argsort: task positions ordered by ascending heuristic value.
        normalized_values = sorted(
            range(len(raw_values)), key=lambda k: raw_values[k], reverse=False
        )
        # Keep only non-dummy tasks, mapped to their non-dummy indices.
        normalized_values_for_do = [
            d.index_task_non_dummy[d.tasks_list[t]]
            for t in normalized_values
            if d.tasks_list[t] in d.index_task_non_dummy
        ]
        if isinstance(domain, MultiskillRcpspProblem):
            solution = VariantMultiskillRcpspSolution(
                problem=d,
                priority_list_task=normalized_values_for_do,
                # All employees allowed on every task, in list order.
                priority_worker_per_task=[
                    [w for w in d.employees_list] for i in range(d.n_jobs_non_dummy)
                ],
                modes_vector=modes,
            )
        else:
            solution = RcpspSolution(
                problem=d, rcpsp_permutation=normalized_values_for_do, rcpsp_modes=modes
            )
        return solution
    def evaluate_heuristic(self, individual, domains) -> list[float]:
        """Fitness of a GP individual: mean negated makespan over ``domains``.

        The makespan (end time of each domain's sink task) is negated so that a
        larger fitness means a shorter schedule. Returns a one-element list, as
        deap expects a sequence for fitness values (the original ``-> float``
        annotation did not match the actual return).
        """
        vals = []
        func_heuristic = self.toolbox.compile(expr=individual)
        for domain in domains:
            solution = self.build_solution(
                individual=individual, domain=domain, func_heuristic=func_heuristic
            )
            # Negated makespan of this training domain.
            do_makespan = -solution.get_end_time(domain.sink_task)
            vals.append(do_makespan)
        fitness = [np.mean(vals)]
        return fitness
[docs] def initialize_cpm_data_for_training(self): self.cpm_data = {} for domain in self.training_domains: cpm, cpm_esd = compute_cpm(domain) self.cpm_data[domain] = {"cpm": cpm, "cpm_esd": cpm_esd}