Source code for discrete_optimization.vrptw.solvers.optal

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

import logging
from typing import Any, Dict, Optional

from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    SchedulingOptalSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.vrptw.problem import (
    Task,
    VRPTWProblem,
    VRPTWSolution,
)

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True


logger = logging.getLogger(__name__)


[docs] class OptalVRPTWSolver(SchedulingOptalSolver[Task]): """ Optal solver for the Vehicle Routing Problem with Time Windows (VRPTW). """
[docs] def get_task_interval_variable(self, task: Task) -> cp.IntervalVar: return self.variables["intervals_per_node"][task]
problem: VRPTWProblem def __init__( self, problem: VRPTWProblem, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.variables: Dict[str, Any] = {} self.scaling_factor = None # Scale distances and times to use integers self.distance_matrix = self.problem.distance_matrix
[docs] def init_model(self, scaling: int, cost_per_vehicle: int, **kwargs: Any) -> None: """Initialise the CP-SAT model.""" self.cp_model = cp.Model() self.scaling_factor = scaling self.distance_matrix = [ [ int(self.scaling_factor * self.distance_matrix[i, j]) for j in range(self.distance_matrix.shape[1]) ] for i in range(self.distance_matrix.shape[0]) ] k = self.problem.nb_vehicles depot = self.problem.depot_node intervals_per_vehicle = {} intervals_distance_per_vehicle = {} intervals_distance_come_back = {} intervals_come_back = {} intervals_per_node = {} used_vehicle = {} for vehicle in range(k): intervals_per_vehicle[vehicle] = {} intervals_distance_per_vehicle[vehicle] = {} for node in self.problem.customers: lb = max( int(self.scaling_factor * self.problem.time_windows[node][0]), self.distance_matrix[depot][node], ) ub = max( int(self.scaling_factor * self.problem.time_windows[node][1]), self.distance_matrix[depot][node], ) service_time = int( self.scaling_factor * self.problem.service_times[node] ) intervals_per_vehicle[vehicle][node] = self.cp_model.interval_var( start=(lb, ub), end=(lb + service_time, ub + service_time), length=service_time, optional=True, name=f"visit_{node}_{vehicle}", ) intervals_distance_per_vehicle[vehicle][node] = ( self.cp_model.interval_var( start=(self.distance_matrix[depot][node], None), end=(self.distance_matrix[depot][node], None), length=0, optional=True, name=f"visitd_{node}_{vehicle}", ) ) self.cp_model.enforce( self.cp_model.presence(intervals_per_vehicle[vehicle][node]) == self.cp_model.presence( intervals_distance_per_vehicle[vehicle][node] ) ) lb = int(self.scaling_factor * self.problem.time_windows[depot][0]) ub = int(self.scaling_factor * self.problem.time_windows[depot][1]) intervals_come_back[vehicle] = self.cp_model.interval_var( start=(lb, ub), end=(lb, ub), length=0, optional=True ) intervals_distance_come_back[vehicle] = self.cp_model.interval_var( start=(0, None), end=(0, None), length=0, optional=True ) used_vehicle[vehicle] = self.cp_model.max( [ self.cp_model.presence(intervals_per_vehicle[vehicle][node]) for node in intervals_per_vehicle[vehicle] ] ) self.cp_model.enforce( self.cp_model.presence(intervals_come_back[vehicle]) == used_vehicle[vehicle] ) self.cp_model.enforce( self.cp_model.presence(intervals_distance_come_back[vehicle]) == used_vehicle[vehicle] ) for node in intervals_per_vehicle[vehicle]: self.cp_model.end_before_start( intervals_per_vehicle[vehicle][node], intervals_come_back[vehicle] ) self.cp_model.end_before_start( intervals_distance_per_vehicle[vehicle][node], intervals_distance_come_back[vehicle], ) for node in self.problem.customers: lb = int(self.scaling_factor * self.problem.time_windows[node][0]) ub = int(self.scaling_factor * self.problem.time_windows[node][1]) service_time = int(self.scaling_factor * self.problem.service_times[node]) intervals_per_node[node] = self.cp_model.interval_var( start=(lb, ub), end=(lb, ub), length=service_time, optional=False, name=f"visit_{node}", ) self.cp_model.alternative( intervals_per_node[node], [intervals_per_vehicle[v][node] for v in intervals_per_vehicle], ) for v in range(self.problem.nb_vehicles): seq = self.cp_model.sequence_var( [ self.cp_model.interval_var( start=0, end=0, length=0, optional=False, name=f"start_{v}" ) ] + [intervals_per_vehicle[v][node] for node in self.problem.customers] + [intervals_come_back[v]], types=[depot] + self.problem.customers + [depot], ) seq_2 = self.cp_model.sequence_var( [ self.cp_model.interval_var( start=0, end=0, length=0, optional=False, name=f"start_dist_{v}" ) ] + [ intervals_distance_per_vehicle[v][node] for node in self.problem.customers ] + [intervals_distance_come_back[v]], types=[depot] + self.problem.customers + [depot], ) self.cp_model._same_sequence(seq, seq_2) self.cp_model.no_overlap(seq, self.distance_matrix) self.cp_model.no_overlap(seq_2, self.distance_matrix) loads = self.cp_model.sum( [ int(self.problem.demands[n]) * self.cp_model.presence(intervals_per_vehicle[v][n]) for n in self.problem.customers ] ) self.cp_model.enforce(loads <= int(self.problem.vehicle_capacity)) self.variables["nb_vehicles"] = self.cp_model.sum( [used_vehicle[v] for v in used_vehicle] ) self.variables["total_distance"] = self.cp_model.sum( [ self.cp_model.guard( self.cp_model.end(intervals_distance_come_back[v]), 0 ) for v in used_vehicle ] ) self.variables["intervals_per_vehicle"] = intervals_per_vehicle self.variables["intervals_per_node"] = intervals_per_node self.cp_model.minimize( self.variables["nb_vehicles"] * cost_per_vehicle + self.variables["total_distance"] )
[docs] def retrieve_solution(self, result: cp.SolveResult) -> Solution: routes = [] for v in range(self.problem.nb_vehicles): all_intervals = [] for n in self.problem.customers: if result.solution.is_present( self.variables["intervals_per_vehicle"][v][n] ): st, end = result.solution.get_value( self.variables["intervals_per_vehicle"][v][n] ) all_intervals.append((st, end, n)) sorted_intervals = sorted(all_intervals, key=lambda x: x[0]) routes.append([x[2] for x in sorted_intervals]) # cumul_time = 0 # current_node = self.problem.depot_node # for n in routes[-1]: # cumul_time += self.distance_matrix[current_node][n] # current_node = n sol = VRPTWSolution(problem=self.problem, routes=routes) return sol