Source code for discrete_optimization.top.problem

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

from collections import defaultdict
from collections.abc import Sequence
from typing import Union

from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    TypeObjective,
)
from discrete_optimization.vrp.problem import (
    BasicCustomer,
    VrpProblem,
    VrpSolution,
    length,
)


[docs] class CustomerTop(BasicCustomer): def __init__(self, name: Union[str, int], reward: float): super().__init__(name, demand=0) self.reward = reward
[docs] class TeamOrienteeringProblem(VrpProblem): customers: list[CustomerTop] max_length_tours: float def __init__( self, vehicle_count: int, max_length_tours: float, customer_count: int, customers: Sequence[BasicCustomer], start_indexes: list[int], end_indexes: list[int], ): super().__init__( vehicle_count, [float("inf")] * vehicle_count, customer_count, customers, start_indexes, end_indexes, ) self.max_length_tours = max_length_tours
[docs] def evaluate(self, solution: VrpSolution) -> dict[str, float]: vals_per_vehicle = self.evaluate_function(solution) penalty_length = sum( [max(0, x[1] - self.max_length_tours) for x in vals_per_vehicle] ) reward = sum(x[0] for x in vals_per_vehicle) count_multiple = self.count_multiple_visits(solution) return { "reward": reward, "penalty_length": penalty_length, "penalty_multiple": count_multiple, }
[docs] def evaluate_function( self, vrp_sol: VrpSolution ) -> tuple[list[list[float]], list[float], float, list[float]]: vals_per_vehicle = [] for k in range(self.vehicle_count): path = vrp_sol.list_paths[k] start = vrp_sol.list_start_index[k] end = vrp_sol.list_end_index[k] full_path = [start] + path + [end] length = sum( self.evaluate_function_indexes(xi, xi1) for xi, xi1 in zip(full_path[:-1], full_path[1:]) ) reward = sum(self.customers[xi].reward for xi in path) vals_per_vehicle.append((reward, length)) return vals_per_vehicle
[docs] def satisfy(self, variable: VrpSolution) -> bool: kpis = self.evaluate(variable) return kpis["penalty_length"] == 0 and kpis["penalty_multiple"] == 0
[docs] def count_multiple_visits(self, vrp_sol: VrpSolution) -> bool: count = defaultdict(lambda: 0) for k in range(len(vrp_sol.list_paths)): for node in vrp_sol.list_paths[k]: count[node] += 1 return len([n for n in count if count[n] > 1])
[docs] def get_objective_register(self) -> ObjectiveRegister: return ObjectiveRegister( objective_sense=ModeOptim.MINIMIZATION, objective_handling=ObjectiveHandling.AGGREGATE, dict_objective_to_doc={ "reward": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1), "penalty_length": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=-100 ), "penalty_multiple": ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=-100 ), }, )
[docs] class CustomerTop2D(CustomerTop): def __init__(self, name: Union[str, int], reward: float, x: float, y: float): super().__init__(name, reward) self.x = x self.y = y
[docs] class TeamOrienteeringProblem2D(TeamOrienteeringProblem): customers: list[CustomerTop] max_length_tours: float
[docs] def evaluate_function_indexes(self, index_1: int, index_2: int) -> float: return length(self.customers[index_1], self.customers[index_2])