Source code for discrete_optimization.tsp.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

import logging
from enum import Enum
from typing import Any

import numpy as np

from discrete_optimization.generic_tasks_tools.base import Task
from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    SchedulingOptalSolver,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    EnumHyperparameter,
)
from discrete_optimization.tsp.problem import Node, TspProblem, TspSolution
from discrete_optimization.tsp.utils import build_matrice_distance

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True


[docs] class ModelingTspEnum(Enum): V0 = "V0" V1 = "V1"
logger = logging.getLogger(__name__)
[docs] class OptalTspSolver(SchedulingOptalSolver[Node]): problem: TspProblem hyperparameters = [ EnumHyperparameter( name="modeling", enum=ModelingTspEnum, default=ModelingTspEnum.V0 ) ] def __init__( self, problem: TspProblem, params_objective_function: ParamsObjectiveFunction = None, **kwargs, ): super().__init__(problem, params_objective_function, **kwargs) self.distance_matrix = build_matrice_distance( self.problem.node_count, method=self.problem.evaluate_function_indexes, ) self.distance_matrix[self.problem.end_index, self.problem.start_index] = 0 self.modeling: ModelingTspEnum = None self.variables = {}
[docs] def init_model(self, **args: Any) -> None: args = self.complete_with_default_hyperparameters(args) self.modeling = args["modeling"] if self.modeling == ModelingTspEnum.V0: self.init_model_v0(**args) if self.modeling == ModelingTspEnum.V1: self.init_model_v1(**args)
[docs] def init_model_v0(self, **args: Any) -> None: """Model from gpd""" self.cp_model = cp.Model() upper_bound = int(sum(self.distance_matrix.max(axis=1))) visits = [ self.cp_model.interval_var( start=(0, None), end=(0, upper_bound), length=0, optional=False, name=f"visit_{i}", ) for i in range(self.problem.node_count) ] self.variables["visits"] = visits seq = self.cp_model.sequence_var(visits) self.cp_model.enforce( self.cp_model.start(visits[self.problem.start_index]) == 0 ) self.cp_model.no_overlap( seq, [ [ int(self.distance_matrix[i][j]) for j in range(self.problem.node_count) ] for i in range(self.problem.node_count) ], ) if self.problem.start_index == self.problem.end_index: come_back_base = self.cp_model.interval_var( start=(0, None), end=(0, upper_bound), length=0, optional=False, name=f"come_back_base", ) self.variables["come_back_base"] = come_back_base for i in range(self.problem.node_count): self.cp_model.end_before_start( visits[i], come_back_base, int(self.distance_matrix[i, self.problem.start_index]), ) self.cp_model.minimize(self.cp_model.end(come_back_base)) else: for i in range(self.problem.node_count): if i != self.problem.end_index: self.cp_model.end_before_start( visits[i], visits[self.problem.end_index] ) self.cp_model.minimize(self.cp_model.end(visits[self.problem.end_index]))
[docs] def init_model_v1(self, scaling: float = 100.0, **kwargs: Any): """Builds the OptalCP model for the TSP problem, model proposed by @thtran97""" self.cp_model = cp.Model() node_count = self.problem.node_count start_index = self.problem.start_index end_index = self.problem.end_index # Check if the distance matrix is already computed, otherwise compute it distance_matrix = ( (scaling * np.asarray(self.distance_matrix)).astype(int).tolist() ) # Create interval variables for each node (except start and end) visit_intervals = [ self.cp_model.interval_var(length=0, name=f"Visit_{i}") for i in range(node_count) ] tour_end = self.cp_model.interval_var(length=0, name="TourEnd") # Define the sequence of visits, excluding the start and end nodes nodes_in_sequence = [ i for i in range(node_count) if i != start_index and i != end_index ] sequence_intervals = [visit_intervals[i] for i in nodes_in_sequence] # Create a sequence variable that will enforce the order of visits sequence = self.cp_model.sequence_var(sequence_intervals, nodes_in_sequence) # Constraints to ensure the tour starts at start_index and ends at end_index self.cp_model.enforce(self.cp_model.start(visit_intervals[start_index]) == 0) self.cp_model.no_overlap(sequence, distance_matrix) # Add constraints to ensure the correct travel times between nodes based on the distance matrix for ni in nodes_in_sequence: # the visit of a node "ni" should be after the visit of the first node + delta time. self.cp_model.end_before_start( visit_intervals[start_index], visit_intervals[ni], distance_matrix[start_index][ni], ) # Specific additional constraint for the tour_end self.cp_model.end_before_start( visit_intervals[ni], tour_end, distance_matrix[ni][end_index] ) if not nodes_in_sequence and start_index != end_index: # case with only 2 nodes ??! self.cp_model.end_before_start( visit_intervals[start_index], tour_end, distance_matrix[start_index][end_index], ) # Objective: minimize the total tour length (end time of the tour) self.cp_model.minimize(tour_end.start()) self.variables["visits"] = visit_intervals
[docs] def get_task_interval_variable(self, task: Task) -> cp.IntervalVar: return self.variables["visits"][task]
[docs] def retrieve_solution(self, result: cp.SolveResult) -> TspSolution: logger.info(f"Current obj {result.solution.get_objective()}") starts = [ result.solution.get_start(self.get_task_interval_variable(i)) for i in range(self.problem.node_count) ] ordered = sorted(range(len(starts)), key=lambda i: starts[i]) solution = TspSolution( problem=self.problem, start_index=self.problem.start_index, end_index=self.problem.end_index, permutation=[ o for o in ordered if o not in {self.problem.start_index, self.problem.end_index} ], ) eval_ = self.problem.evaluate(solution) logger.info(f"{eval_}") return solution