Source code for discrete_optimization.top.solvers.ortools
# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import logging
from typing import Any, Optional
import numpy as np
from ortools.constraint_solver import (
pywrapcp,
routing_parameters_pb2,
)
from ortools.util.optional_boolean_pb2 import BOOL_FALSE, BOOL_TRUE
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.do_solver import ResultStorage, SolverDO
from discrete_optimization.generic_tools.exceptions import SolveEarlyStop
# Attempt to import Enums from the existing GPDP ortools_routing module
# to maintain consistency as requested.
from discrete_optimization.gpdp.solvers.ortools_routing import (
FirstSolutionStrategy,
LocalSearchMetaheuristic,
)
from discrete_optimization.top.problem import TeamOrienteeringProblem
from discrete_optimization.vrp.problem import VrpSolution
from discrete_optimization.vrp.utils import compute_length_matrix
logger = logging.getLogger(__name__)
[docs]
class OrtoolsTopSolver(SolverDO):
problem: TeamOrienteeringProblem
manager: Optional[pywrapcp.RoutingIndexManager] = None
routing: Optional[pywrapcp.RoutingModel] = None
def __init__(
self,
problem: TeamOrienteeringProblem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(problem, params_objective_function, **kwargs)
# Precompute distance matrix (float)
_, self.distance = compute_length_matrix(self.problem)
self.search_parameters = None
[docs]
def init_model(self, scaling: float = 1, **kwargs: Any) -> None:
num_locations = len(self.problem.customers)
num_vehicles = self.problem.vehicle_count
self.manager = pywrapcp.RoutingIndexManager(
num_locations,
num_vehicles,
self.problem.start_indexes,
self.problem.end_indexes,
)
self.routing = pywrapcp.RoutingModel(self.manager)
# Pre-scale distance matrix to integers
matrix_distance_int = np.array(scaling * self.distance, dtype=np.int64)
def distance_callback(from_index: int, to_index: int) -> int:
from_node = self.manager.IndexToNode(from_index)
to_node = self.manager.IndexToNode(to_index)
return matrix_distance_int[from_node, to_node]
transit_callback_index = self.routing.RegisterTransitCallback(distance_callback)
self.routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
# 5. Add Distance Dimension (Max Length Constraint)
dimension_name = "Distance"
self.routing.AddDimension(
transit_callback_index,
0, # no slack
int(self.problem.max_length_tours * scaling), # vehicle max travel distance
True, # start cumul to zero
dimension_name,
)
# 6. Define Rewards (Disjunctions) for TOP
# In TOP, nodes can be skipped. We use disjunctions.
# Penalty should be high enough to encourage visiting.
penalty_scaler = kwargs.get("penalty_scaler", 1000)
for node_idx in range(num_locations):
# Skip Start/End depots
if (
node_idx in self.problem.start_indexes
or node_idx in self.problem.end_indexes
):
continue
reward = self.problem.customers[node_idx].reward
if reward > 0:
# We penalize skipping a node based on its reward.
penalty = int(reward * scaling * penalty_scaler)
self.routing.AddDisjunction(
[self.manager.NodeToIndex(node_idx)], penalty
)
# 7. Build Search Parameters
self.search_parameters = self.build_search_parameters(**kwargs)
logger.info("Initialized OR-Tools model for TOP.")
[docs]
def build_search_parameters(
self, **kwargs: Any
) -> routing_parameters_pb2.RoutingSearchParameters:
# Retrieve strategies from kwargs or defaults
first_solution_strategy = kwargs.get(
"first_solution_strategy", FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
local_search_metaheuristic = kwargs.get(
"local_search_metaheuristic", LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
)
time_limit = kwargs.get("time_limit", 30)
use_lns = kwargs.get("use_lns", True)
use_cp = kwargs.get("use_cp", True)
use_cp_sat = kwargs.get("use_cp_sat", False)
verbose = kwargs.get("verbose", False)
# Convert Enums to integers if necessary
if hasattr(first_solution_strategy, "value"):
first_solution_strategy = first_solution_strategy.value
if hasattr(local_search_metaheuristic, "value"):
local_search_metaheuristic = local_search_metaheuristic.value
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.log_search = verbose
search_parameters.first_solution_strategy = first_solution_strategy
search_parameters.local_search_metaheuristic = local_search_metaheuristic
search_parameters.time_limit.seconds = time_limit
# Configure LNS and CP settings
if use_lns:
search_parameters.local_search_operators.use_path_lns = BOOL_TRUE
search_parameters.local_search_operators.use_inactive_lns = BOOL_TRUE
search_parameters.use_cp = BOOL_TRUE if use_cp else BOOL_FALSE
search_parameters.use_cp_sat = BOOL_TRUE if use_cp_sat else BOOL_FALSE
return search_parameters
[docs]
def solve(
self,
callbacks: Optional[list[Callback]] = None,
time_limit: int = 100,
**kwargs: Any,
) -> ResultStorage:
if self.manager is None:
self.init_model(**kwargs)
self.search_parameters.time_limit.seconds = time_limit
# 1. Setup Callback/Monitor
callbacks_list = CallbackList(callbacks=callbacks)
monitor = TopRoutingMonitor(self, callback=callbacks_list)
self.routing.AddSearchMonitor(monitor)
# 2. Solve
try:
self.routing.SolveWithParameters(self.search_parameters)
except SolveEarlyStop as e:
logger.info(e)
return monitor.res
[docs]
class TopRoutingMonitor(pywrapcp.SearchMonitor):
"""
Monitor to retrieve intermediate solutions and handle callbacks
(logging, early stopping) for the Team Orienteering Problem.
"""
def __init__(self, do_solver: OrtoolsTopSolver, callback: Callback):
super().__init__(do_solver.routing.solver())
self.do_solver = do_solver
self.model = do_solver.routing
self.callback = callback
self.res = do_solver.create_result_storage()
self.nb_solutions = 0
self._best_objective = float("inf")
[docs]
def AtSolution(self) -> bool:
"""Called by OR-Tools whenever a valid solution is found."""
current_obj = self.model.CostVar().Max()
logger.info(f"Solution found: Objective {current_obj}")
# Update best objective tracking
if current_obj < self._best_objective:
self._best_objective = current_obj
self.retrieve_current_solution()
else:
# Optional: Retrieve periodically even if not strictly better
# to capture diversity or if using different criteria
if self.nb_solutions % 100 == 0:
self.retrieve_current_solution()
self.nb_solutions += 1
# Check user defined callbacks (e.g. timeout, user interruption)
stopping = self.callback.on_step_end(
step=self.nb_solutions, res=self.res, solver=self.do_solver
)
if stopping:
raise SolveEarlyStop("Solver stopped by user callback.")
return not stopping
[docs]
def retrieve_current_solution(self) -> None:
"""Converts the current OR-Tools assignment to a VrpSolution."""
vehicle_count = self.do_solver.problem.vehicle_count
vehicle_tours: list[list[int]] = []
for vehicle_id in range(vehicle_count):
path = []
index = self.model.Start(vehicle_id)
# Note: VrpSolution expects the path strictly BETWEEN start and end.
# We skip the start node for the list, but we must traverse it.
# Move to next
index = self.model.NextVar(index).Value()
while not self.model.IsEnd(index):
node_index = self.do_solver.manager.IndexToNode(index)
path.append(node_index)
index = self.model.NextVar(index).Value()
vehicle_tours.append(path)
# Create VrpSolution (which is compatible with TOP evaluation)
# Note: TOP problem usually defines start/end indexes in the problem class
variable_vrp = VrpSolution(
problem=self.do_solver.problem,
list_start_index=self.do_solver.problem.start_indexes,
list_end_index=self.do_solver.problem.end_indexes,
list_paths=vehicle_tours,
length=None,
lengths=None,
capacities=None,
)
# Evaluate to get the fitness (fitness is usually MINIMIZATION in DO)
# For TOP, fitness might be -reward or similar aggregation
fit = self.do_solver.aggreg_from_sol(variable_vrp)
self.res.append((variable_vrp, fit))