Source code for discrete_optimization.top.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import math
from typing import Any

import numpy as np

from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.hub_solver.optal.optalcp_tools import (
    OptalCpSolver,
)

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True
from discrete_optimization.top.problem import TeamOrienteeringProblem, VrpSolution
from discrete_optimization.top.solvers import TopSolver


[docs] class OptalTopSolver(TopSolver, OptalCpSolver): def __init__( self, problem: TeamOrienteeringProblem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs, ) -> None: super().__init__(problem, params_objective_function, **kwargs) self.variables = {}
[docs] def init_model(self, scaling: float, **args: Any) -> None: distance = (np.ceil(scaling * self.distance)).astype(int).tolist() max_length = int(math.floor(scaling * self.problem.max_length_tours)) self.cp_model = cp.Model() intervals_leaving_per_vehicle = {} intervals_back_per_vehicle = {} interval_per_customer = {} intervals_per_vehicle = {} for v in range(self.problem.vehicle_count): intervals_per_vehicle[v] = {} st = self.problem.start_indexes[v] end = self.problem.end_indexes[v] for n in range(self.problem.customer_count): if n not in {st, end}: intervals_per_vehicle[v][n] = self.cp_model.interval_var( start=(0, max_length), end=(None, max_length), length=0, optional=True, name=f"visit_{n}_{v}", ) intervals_leaving_per_vehicle[v] = self.cp_model.interval_var( start=(0, max_length), end=(None, max_length), length=0, optional=True, name=f"leaving_{v}", ) intervals_back_per_vehicle[v] = self.cp_model.interval_var( start=(0, max_length), end=(None, max_length), length=0, optional=True, name=f"back_{v}", ) for n in intervals_per_vehicle[v]: self.cp_model.end_before_start( intervals_leaving_per_vehicle[v], intervals_per_vehicle[v][n] ) self.cp_model.end_before_start( intervals_per_vehicle[v][n], intervals_back_per_vehicle[v] ) self.cp_model.enforce( self.cp_model.presence(intervals_leaving_per_vehicle[v]) >= self.cp_model.presence(intervals_per_vehicle[v][n]) ) self.cp_model.enforce( self.cp_model.presence(intervals_leaving_per_vehicle[v]) == self.cp_model.presence(intervals_back_per_vehicle[v]) ) for v in range(self.problem.vehicle_count): seq = self.cp_model.sequence_var( [intervals_leaving_per_vehicle[v]] + [intervals_per_vehicle[v][n] for n in intervals_per_vehicle[v]] + [intervals_back_per_vehicle[v]], types=[self.problem.start_indexes[v]] + [n for n in intervals_per_vehicle[v]] + [self.problem.end_indexes[v]], ) self.cp_model.no_overlap(seq, distance) for n in range(self.problem.customer_count): if ( n not in self.problem.start_indexes and n not in self.problem.end_indexes ): interval_per_customer[n] = self.cp_model.interval_var( end=(0, max_length), length=0, optional=True, name=f"visit_{n}" ) self.cp_model.alternative( interval_per_customer[n], [ intervals_per_vehicle[v][n] for v in intervals_per_vehicle if n in intervals_per_vehicle[v] ], ) reward = self.cp_model.sum( [ self.cp_model.presence(interval_per_customer[n]) * int(self.problem.customers[n].reward) for n in interval_per_customer ] ) sum_end = self.cp_model.sum( [ self.cp_model.end(intervals_back_per_vehicle[v]) for v in intervals_back_per_vehicle ] ) self.variables["objs"] = {"reward": -reward, "sum_length": sum_end} self.cp_model.minimize(self.variables["objs"]["reward"]) self.variables["intervals"] = intervals_per_vehicle
[docs] def retrieve_solution(self, result: "cp.SolveResult") -> VrpSolution: # Same as the vrp one. paths = [] for v in range(self.problem.vehicle_count): path = [] for n in self.variables["intervals"][v]: if result.solution.is_present(self.variables["intervals"][v][n]): st, end = result.solution.get_value( self.variables["intervals"][v][n] ) path.append((st, end, n)) path = sorted(path, key=lambda x: x[0]) paths.append([p[2] for p in path]) sol = VrpSolution( problem=self.problem, list_start_index=self.problem.start_indexes, list_end_index=self.problem.end_indexes, list_paths=paths, ) return sol