Source code for discrete_optimization.top.solvers.dp

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import math
from typing import Any

import didppy as dp
import numpy as np

from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
)
from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver
from discrete_optimization.top.problem import TeamOrienteeringProblem, VrpSolution
from discrete_optimization.vrp.utils import compute_length_matrix


[docs] class DpTopSolver(DpSolver): problem: TeamOrienteeringProblem def __init__( self, problem: TeamOrienteeringProblem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs, ): super().__init__(problem, params_objective_function, **kwargs) _, self.distance = compute_length_matrix(self.problem) self.distance[self.problem.start_indexes[0], self.problem.end_indexes[0]] = 0 self.distance[self.problem.end_indexes[0], self.problem.start_indexes[0]] = 0 self.transitions_dict = {}
[docs] def init_model(self, scaling: float = 1, **kwargs: Any) -> None: self.model = dp.Model(maximize=True) distance = (np.ceil(scaling * self.distance)).astype(int) reward = [int(c.reward) for c in self.problem.customers] reward_table = self.model.add_int_table(reward) nodes = self.model.add_object_type(self.problem.customer_count) vehicle = self.model.add_object_type(self.problem.vehicle_count) nodes_to_visit = [ i for i in range(self.problem.customer_count) if i not in self.problem.start_indexes and i not in self.problem.end_indexes ] distance_to_end = [ [ distance[k, self.problem.end_indexes[j]] for k in range(self.problem.customer_count) ] for j in range(self.problem.vehicle_count) ] distance_to_end = [ distance[k, self.problem.end_indexes[0]] for k in range(self.problem.customer_count) ] distance_to_end_table = self.model.add_int_table(distance_to_end) distance_from_start = [ distance[self.problem.start_indexes[0], k] for k in range(self.problem.customer_count) ] distance_from_start_table = self.model.add_int_table(distance_from_start) distance_to_and_back = [ [ distance[i, j] + distance[j, self.problem.end_indexes[0]] for j in range(self.problem.customer_count) ] for i in range(self.problem.customer_count) ] distance_to_and_back_table = self.model.add_int_table(distance_to_and_back) unvisited = self.model.add_set_var(object_type=nodes, target=nodes_to_visit) cur_node = self.model.add_element_var( object_type=nodes, target=self.problem.start_indexes[0] ) cur_vehicle = self.model.add_element_resource_var( object_type=vehicle, target=0, less_is_better=True ) distance_table = self.model.add_int_table( [ [distance[i, j] for j in range(distance.shape[1])] for i in range(distance.shape[0]) ] ) time = self.model.add_int_resource_var(target=0, less_is_better=True) max_time = int(math.floor(scaling * self.problem.max_length_tours)) self.model.add_state_constr(time + distance_to_end_table[cur_node] <= max_time) # self.model.add_state_constr(time<=max_time) for i in nodes_to_visit: tr = dp.Transition( name=f"visit_{i}_same_vehicle", cost=100 * reward[i] - distance_table[cur_node, i] + dp.IntExpr.state_cost(), effects=[ (unvisited, unvisited.remove(i)), (cur_node, i), (time, time + distance_table[cur_node, i]), ], preconditions=[ unvisited.contains(i), time + distance_to_and_back_table[cur_node, i] <= max_time, ], ) self.transitions_dict[f"visit_{i}_same_vehicle"] = (i, "same") self.model.add_transition(tr) # tr = dp.Transition(name=f"visit_{i}_change_vehicle", # cost=1000*reward[i]-distance_to_end_table[cur_node] # -distance_from_start[i] # + dp.IntExpr.state_cost(), # effects=[(unvisited, unvisited.remove(i)), # (cur_node, i), # (time, distance_from_start[i]), # (cur_vehicle, cur_vehicle+1)], # preconditions=[unvisited.contains(i), # cur_vehicle < self.problem.vehicle_count-1, # # distance_from_start_table[i]<=max_time # ]) # self.model.add_transition(tr) # self.transitions_dict[f"visit_{i}_change_vehicle"] = (i, "next") start_nodes = self.model.add_element_table(self.problem.start_indexes) tr_change_vehicle = dp.Transition( name=f"next_vehicle", cost=-distance_to_end_table[cur_node] + dp.IntExpr.state_cost(), effects=[ (time, 0), (cur_node, start_nodes[cur_vehicle + 1]), (cur_vehicle, cur_vehicle + 1), ], preconditions=[ cur_vehicle < self.problem.vehicle_count - 1, time + distance_to_and_back_table.min(cur_node, unvisited) > max_time, ], ) self.model.add_transition(tr_change_vehicle, forced=True) self.transitions_dict[f"next_vehicle"] = (None, "next-vehicle") finish_ = self.model.add_int_var(0) finish = dp.Transition( name=f"finish", cost=dp.IntExpr.state_cost(), effects=[(finish_, 1)], preconditions=[ (cur_vehicle == self.problem.vehicle_count - 1), (time + distance_to_and_back_table.min(cur_node, unvisited) > max_time) | unvisited.is_empty(), ], ) self.transitions_dict[f"finish"] = (None, "finish") self.model.add_transition(finish, forced=True) self.model.add_base_case([(finish_ == 1)])
# self.model.add_dual_bound(1000*reward_table[unvisited])
[docs] def retrieve_solution(self, sol: dp.Solution) -> VrpSolution: paths = [] cur_path = [] for tr in sol.transitions: name = tr.name descr = self.transitions_dict[name] if descr[1] == "same": cur_path.append(descr[0]) if descr[1] == "next": paths.append(cur_path) cur_path = [descr[0]] if descr[1] == "next-vehicle": paths.append(cur_path) cur_path = [] if descr[1] == "finish": paths.append(cur_path) break if len(paths) < self.problem.vehicle_count: paths.extend([[] for _ in range(self.problem.vehicle_count - len(paths))]) return VrpSolution( problem=self.problem, list_paths=paths, list_start_index=self.problem.start_indexes, list_end_index=self.problem.end_indexes, ) pass
# # Copyright (c) 2026 AIRBUS and its affiliates. # # This source code is licensed under the MIT license found in the # # LICENSE file in the root directory of this source tree. # # import math # from typing import Any, List, Tuple # import didppy as dp # import numpy as np # # from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver # from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction # from discrete_optimization.top.problem import TeamOrienteeringProblem, VrpSolution # from discrete_optimization.vrp.utils import compute_length_matrix # # # class DpTopSolver(DpSolver): # problem: TeamOrienteeringProblem # # def __init__(self, problem: TeamOrienteeringProblem, # params_objective_function: ParamsObjectiveFunction | None = None, # **kwargs): # super().__init__(problem, params_objective_function, **kwargs) # # Compute distance matrix and ensure depot-to-depot distance is handled if needed # _, self.distance = compute_length_matrix(self.problem) # # Map transition names to logic for reconstruction # self.transitions_mapping = {} # # def init_model(self, scaling: float = 1, **kwargs: Any) -> None: # # 1. Initialize Model (Maximize Reward) # self.model = dp.Model(maximize=True, float_cost=False) # # # 2. Preprocessing Data # # Scale distances to integers for DP efficiency # dist_matrix = (np.ceil(scaling * self.distance)).astype(int) # rewards = [int(c.reward) for c in self.problem.customers] # max_time = int(math.floor(scaling * self.problem.max_length_tours)) # # num_customers = self.problem.customer_count # num_vehicles = self.problem.vehicle_count # # # Identify customer nodes (excluding start/end depots) # # Note: We assume start/end indexes are depots and shouldn't be in the 'unvisited' set # # that we try to collect rewards from. # depots = set(self.problem.start_indexes + self.problem.end_indexes) # customer_indices = [i for i in range(num_customers) if i not in depots] # # # 3. Define State Variables # # Set of unvisited customers # node_obj = self.model.add_object_type(num_customers) # unvisited = self.model.add_set_var(object_type=node_obj, target=customer_indices) # # # Current location (node index) # current_node = self.model.add_element_var(object_type=node_obj, # target=self.problem.start_indexes[0]) # # # Current Vehicle Index # vehicle_obj = self.model.add_object_type(num_vehicles + 1) # +1 to represent "Done" state # current_vehicle = self.model.add_element_var(object_type=vehicle_obj, target=0) # # # Time consumed by current vehicle # current_time = self.model.add_int_resource_var(target=0, less_is_better=True) # # # 4. Define Tables for DIDPPY access # dist_table = self.model.add_int_table(dist_matrix) # reward_table = self.model.add_int_table(rewards) # # # Table for End Depots per vehicle (to check return feasibility) # # If all vehicles share the same end depot logic, this simplifies, # # but we support specific end depots per vehicle. # end_depots_list = self.problem.end_indexes # # If the number of end_indexes < num_vehicles (unlikely in this struct), handle it. # # We create a table: vehicle_index -> end_depot_node_index # end_depot_table = self.model.add_int_table(end_depots_list + [end_depots_list[-1]]) # Pad for safety # # start_depots_list = self.problem.start_indexes # start_depot_table = self.model.add_int_table(start_depots_list + [start_depots_list[-1]]) # # # 5. Transitions # # # A. Visit Customer Transition # # Try to visit every customer i from the current node # for i in customer_indices: # name = f"visit_{i}" # # # Cost to add to objective = Reward of customer i # cost_expr = reward_table[i] + dp.IntExpr.state_cost() # # # Distance from current node to i # dist_to_i = dist_table[current_node, i] # # Distance from i to the current vehicle's end depot # dist_return = dist_table[i, end_depot_table[current_vehicle]] # # # Preconditions: # # 1. Customer must be unvisited # # 2. We must have enough time to reach i AND return to the depot afterwards # preconditions = [ # unvisited.contains(i), # current_time + dist_to_i + dist_return <= max_time # ] # # # Effects: # effects = [ # (unvisited, unvisited.remove(i)), # (current_node, i), # (current_time, current_time + dist_to_i) # ] # # transition = dp.Transition( # name=name, # cost=cost_expr, # effects=effects, # preconditions=preconditions # ) # self.model.add_transition(transition) # self.transitions_mapping[name] = {"type": "visit", "node": i} # # # B. Next Vehicle / Finish Transition # # Move to the next vehicle (or finish if it's the last one) # # This represents "Returning to depot" and "Starting next vehicle" # next_veh_name = "next_vehicle" # # # Preconditions: # # Only allowed if we haven't exhausted all vehicles # preconditions_next = [current_vehicle < num_vehicles] # # # Effects: # # Increment vehicle index # # Reset time to 0 # # Set location to the start depot of the *next* vehicle # effects_next = [ # (current_vehicle, current_vehicle + 1), # (current_time, 0), # (current_node, start_depot_table[current_vehicle + 1]) # ] # # transition_next = dp.Transition( # name=next_veh_name, # cost=dp.IntExpr.state_cost(), # No reward gained for switching # effects=effects_next, # preconditions=preconditions_next # ) # self.model.add_transition(transition_next) # self.transitions_mapping[next_veh_name] = {"type": "next_vehicle"} # # # 6. Base Case # # The problem is "solved" (traversal complete) when we have used all vehicles. # # current_vehicle reaches num_vehicles # self.model.add_base_case([current_vehicle == num_vehicles]) # # # 7. Dual Bound (Heuristic) # # An upper bound on the remaining reward is the sum of rewards of all unvisited nodes. # # This ignores the time constraint but provides a valid admissible heuristic for pruning. # self.model.add_dual_bound(reward_table[unvisited]) # # def retrieve_solution(self, sol: dp.Solution) -> VrpSolution: # paths: List[List[int]] = [[] for _ in range(self.problem.vehicle_count)] # current_vehicle_idx = 0 # # for transition in sol.transitions: # t_info = self.transitions_mapping.get(transition.name) # if not t_info: # continue # # if t_info["type"] == "visit": # if current_vehicle_idx < self.problem.vehicle_count: # paths[current_vehicle_idx].append(t_info["node"]) # # elif t_info["type"] == "next_vehicle": # current_vehicle_idx += 1 # # return VrpSolution( # problem=self.problem, # list_paths=paths, # list_start_index=self.problem.start_indexes, # list_end_index=self.problem.end_indexes # )