Source code for discrete_optimization.vrptw.solvers.cpsat

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
from typing import Any, Dict, List, Optional

from ortools.sat.python.cp_model import CpSolverSolutionCallback, LinearExprT

from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.solvers.cpsat import (
    AllocationCpSatSolver,
    SchedulingCpSatSolver,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.do_solver import StatusSolver, WarmstartMixin
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver
from discrete_optimization.vrptw.problem import (
    Task,
    UnaryResource,
    VRPTWProblem,
    VRPTWSolution,
)

logger = logging.getLogger(__name__)


[docs] class CpSatVRPTWSolver(SchedulingCpSatSolver[Task], WarmstartMixin): """ CP-SAT solver for the Vehicle Routing Problem with Time Windows (VRPTW). This solver uses a path-based formulation with time and load dimensions to find an optimal solution. It aims to minimize the number of vehicles used, and then the total distance. Attributes: problem (VRPTWProblem): The VRPTW problem instance. variables (Dict[str, Any]): Stores CP-SAT model variables. """
[docs] def get_task_start_or_end_variable( self, task: Task, start_or_end: StartOrEnd ) -> LinearExprT: # if start_or_end == StartOrEnd.START: return self.variables["t_time"][task]
# if start_or_end == StartOrEnd.END: # return self.variables["t_time"][task]+int(self.scaling_factor*self.problem.service_times[task]) # def get_task_unary_resource_is_present_variable(self, task: Task, unary_resource: UnaryResource) -> LinearExprT: # This is not present in the model unfortunately. # pass problem: VRPTWProblem def __init__( self, problem: VRPTWProblem, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.variables: Dict[str, Any] = {} self.scaling_factor = None # Scale distances and times to use integers
[docs] def init_model(self, scaling: int, cost_per_vehicle: int, **kwargs: Any) -> None: """Initialise the CP-SAT model.""" super().init_model(**kwargs) self.scaling_factor = scaling if self.cp_model is None: raise RuntimeError( "self.cp_model must not be None after super().init_model()." ) n = self.problem.nb_nodes k = self.problem.nb_vehicles depot = self.problem.depot_node # --- Create variables --- # x_arc[i, j]: BoolVar, true if arc (i, j) is used by any vehicle x_arc = { (i, j): self.cp_model.NewBoolVar(f"x_{i},{j}") for i in range(n) for j in range(n) if i != j } # Time dimension variables: t_time[i] is the time service *starts* at node i # We must scale times to use integers t_time = [ self.cp_model.NewIntVar( lb=int(self.scaling_factor * self.problem.time_windows[i][0]), ub=int(self.scaling_factor * self.problem.time_windows[i][1]), name=f"t_{i}", ) for i in range(n) ] # Load dimension variables: load[i] is the cumulative load *after* service at node i load = [ self.cp_model.NewIntVar( lb=int(self.problem.demands[i]), ub=int(self.problem.vehicle_capacity), name=f"load_{i}", ) for i in range(n) ] self.variables = { "x_arc": x_arc, "t_time": t_time, "load": load, } # --- Add constraints --- # 1. Degree constraints # Each customer is visited exactly once for i in self.problem.customers: # One successor self.cp_model.Add(sum(x_arc[i, j] for j in range(n) if i != j) == 1) # One predecessor self.cp_model.Add(sum(x_arc[j, i] for j in range(n) if i != j) == 1) # 2. Depot constraints # Number of routes leaving depot <= k self.cp_model.Add(sum(x_arc[depot, j] for j in self.problem.customers) <= k) # Number of routes returning to depot <= k self.cp_model.Add(sum(x_arc[i, depot] for i in self.problem.customers) <= k) # Same number of routes leave and return self.cp_model.Add( sum(x_arc[depot, j] for j in self.problem.customers) == sum(x_arc[i, depot] for i in self.problem.customers) ) # Link to vehicle_used variables nb_vehicles = self.cp_model.NewIntVar(0, k, "nb_vehicles") self.cp_model.Add( sum(x_arc[depot, j] for j in self.problem.customers) == nb_vehicles ) # No arc from depot to depot # self.cp_model.Add(x_arc[depot, depot] == 0) # 3. Time dimension constraints self.cp_model.Add( t_time[depot] == int(self.scaling_factor * self.problem.time_windows[depot][0]) ) t_time_return_depot = self.cp_model.NewIntVar( lb=int(self.scaling_factor * self.problem.time_windows[depot][0]), ub=int(self.scaling_factor * self.problem.time_windows[depot][1]), name="time_return_depot", ) self.variables["time_return_depot"] = t_time_return_depot for i in range(n): for j in range(n): if i == j: continue if j == depot: var_ = t_time_return_depot else: var_ = t_time[j] # Scaled travel time and service time travel_time = int( self.scaling_factor * self.problem.distance_matrix[i, j] ) service_time = int(self.scaling_factor * self.problem.service_times[i]) # If arc (i,j) is taken, time at j must be after time at i + service + travel self.cp_model.Add( var_ >= t_time[i] + service_time + travel_time ).OnlyEnforceIf(x_arc[i, j]) # 4. Load dimension constraints self.cp_model.Add(load[depot] == 0) for i in range(n): for j in self.problem.customers: # Customers only if i == j: continue demand_j = int(self.problem.demands[j]) # If arc (i,j) is taken, load at j = load at i + demand at j # (load[i] is load *after* service at i) self.cp_model.Add(load[j] == load[i] + demand_j).OnlyEnforceIf( x_arc[i, j] ) # Special case: arc from depot (i=depot) if i == depot: self.cp_model.Add(load[j] == demand_j).OnlyEnforceIf( x_arc[depot, j] ) scaled_distances = { (i, j): int(self.scaling_factor * self.problem.distance_matrix[i, j]) for i in range(n) for j in range(n) } # Total distance total_distance = self.cp_model.NewIntVar(0, 10**10, "total_distance") self.cp_model.Add( total_distance == sum(x_arc[i, j] * scaled_distances[i, j] for i, j in x_arc) ) # Number of vehicles self.variables["nb_vehicles"] = nb_vehicles self.variables["total_distance"] = total_distance # Lexicographic objective: 1. Min vehicles, 2. Min distance # We use a large constant to separate the objectives self.cp_model.Minimize(nb_vehicles * cost_per_vehicle + total_distance) logger.info("CP-SAT model initialized.")
[docs] def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> VRPTWSolution: """ Build a VRPTWSolution from the CP-SAT solver's callback. """ x_arc = self.variables["x_arc"] logger.info(f"Obj : {cpsolvercb.objective_value}") # for i,j in x_arc: # val = cpsolvercb.Value(x_arc[(i, j)]) # if val == 1: # print("Flow ", (i, j)) depot = self.problem.depot_node n = self.problem.nb_nodes routes = [] # Find all successors of the depot starters = [] for j in self.problem.customers: if cpsolvercb.Value(x_arc[depot, j]): starters.append(j) visited = set() # Reconstruct each route for start_node in starters: if start_node in visited: continue route = [start_node] visited.add(start_node) current_node = start_node while True: # Find the successor successor_found = False for j in range(n): if current_node == j: continue # print("Value :", cpsolvercb.Value(x_arc[current_node, j]), j) if (current_node, j) in x_arc and cpsolvercb.Value( x_arc[current_node, j] ): if j == depot: # Route finished current_node = depot successor_found = True break else: # Next customer route.append(j) visited.add(j) current_node = j successor_found = True break # print(route, current_node, depot) if not successor_found or current_node == depot: break # Should not happen unless route ends at depot routes.append(route) sol = VRPTWSolution( problem=self.problem, routes=routes, scaling=self.scaling_factor ) sol._times = [ cpsolvercb.Value(self.variables["t_time"][i]) for i in range(len(self.variables["t_time"])) ] sol._time_return = cpsolvercb.Value(self.variables["time_return_depot"]) sol._total_distance = cpsolvercb.Value(self.variables["total_distance"]) return sol
[docs] def set_warm_start(self, solution: VRPTWSolution) -> None: """ Provides a warm start hint to the CP-SAT solver from an existing solution. """ if self.cp_model is None: raise RuntimeError( "self.cp_model must not be None after self.init_model()." ) self.cp_model.ClearHints() logger.info("Setting warm start from solution.") x_arc = self.variables["x_arc"] load_var = self.variables["load"] self.cp_model.AddHint(load_var[self.problem.depot_node], 0) depot = self.problem.depot_node # Hint arc variables arcs = {key: 0 for key in x_arc} total_distance = 0 nb_vehicle = 0 for route in solution.routes: if not route: continue nb_vehicle += 1 arcs[(depot, route[0])] = 1 # Arcs between customers cumul_load = int(self.problem.demands[route[0]]) total_distance += int( self.scaling_factor * self.problem.distance_matrix[depot, route[0]] ) self.cp_model.add_hint(load_var[route[0]], cumul_load) for i in range(len(route) - 1): u, v = route[i], route[i + 1] cumul_load += int(self.problem.demands[v]) total_distance += int( self.scaling_factor * self.problem.distance_matrix[u, v] ) self.cp_model.add_hint(load_var[v], cumul_load) arcs[(u, v)] = 1 total_distance += int( self.scaling_factor * self.problem.distance_matrix[route[-1], depot] ) # Arc back to depot arcs[(route[-1], depot)] = 1 for key in arcs: self.cp_model.AddHint(x_arc[key], arcs[key]) self.cp_model.add_hint( self.variables["time_return_depot"], solution._time_return ) self.cp_model.add_hint(self.variables["nb_vehicles"], nb_vehicle) print(solution._total_distance, " vs ", total_distance) self.cp_model.add_hint( self.variables["total_distance"], solution._total_distance ) if getattr(solution, "_times", None) is not None: ts = getattr(solution, "_times") t_time = self.variables["t_time"] for i in range(len(ts)): self.cp_model.AddHint(t_time[i], ts[i]) elif solution.start_service_times: t_time = self.variables["t_time"] for v_idx, route in enumerate(solution.routes): starts = solution.start_service_times[v_idx] for cust_idx, node_id in enumerate(route): self.cp_model.AddHint( t_time[node_id], int(self.scaling_factor * starts[cust_idx]) )