# Copyright (c) 2025 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import re
from typing import Any, List
import didppy as dp
from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
)
from discrete_optimization.vrptw.problem import VRPTWProblem, VRPTWSolution
logger = logging.getLogger(__name__)
[docs]
class DpVrptwSolver(DpSolver):
"""
Dynamic Programming solver for the VRPTW.
This model combines the state variables from the VRP (load, vehicle index)
and the TSPTW (time) to solve the VRPTW.
State variables:
- unvisited (Set): Set of customers not yet served.
- location (Element): The current location of the active vehicle.
- load (FloatResource): The current load of the active vehicle.
- time (FloatResource): The current time (service start time) at the current location.
- vehicles_ (Element): The index of the active vehicle (from 0 to m-1).
"""
hyperparameters = [
CategoricalHyperparameter(
name="resource_var_load", choices=[True, False], default=False
),
CategoricalHyperparameter(
name="resource_var_time", choices=[True, False], default=False
),
]
problem: VRPTWProblem
transitions: dict
scaling: int
[docs]
def init_model(
self, scaling: int = 1000, cost_per_vehicle: int = 10**6, **kwargs: Any
) -> None:
"""
DP model for VRPTW.
"""
kwargs = self.complete_with_default_hyperparameters(kwargs)
self.scaling = scaling
n = self.problem.nb_nodes
m = self.problem.nb_vehicles
depot = self.problem.depot_node
vehicle_capacity = self.problem.vehicle_capacity
# We need a large cost to penalize using a new vehicle
# This should be larger than the max possible total distance
cost_per_vehicle = cost_per_vehicle
model = dp.Model(float_cost=False)
# --- Object Types ---
customer = model.add_object_type(number=n)
vehicle_obj = model.add_object_type(number=m)
# --- Data Tables ---
demand = model.add_int_table([int(d) for d in self.problem.demands])
distance = model.add_int_table(
(scaling * self.problem.distance_matrix).astype(int)
)
service = model.add_int_table(
[int(scaling * x) for x in self.problem.service_times]
)
release = model.add_int_table(
[int(scaling * x[0]) for x in self.problem.time_windows]
)
deadline = model.add_int_table(
[int(scaling * x[1]) for x in self.problem.time_windows]
)
# --- State Variables ---
unvisited = model.add_set_var(
object_type=customer, target=self.problem.customers
)
location = model.add_element_var(object_type=customer, target=depot)
if kwargs["resource_var_load"]:
load = model.add_int_resource_var(target=0, less_is_better=False)
else:
load = model.add_int_var(target=0)
if kwargs["resource_var_time"]:
time = model.add_int_resource_var(
target=int(scaling * self.problem.time_windows[depot][0]),
less_is_better=True,
)
else:
time = model.add_int_var(
target=int(scaling * self.problem.time_windows[depot][0]),
)
vehicles_ = model.add_element_var(object_type=vehicle_obj, target=0)
self.transitions = {}
# --- Transitions ---
for j in self.problem.customers:
# --- Transition 1: Visit customer j with the *current* vehicle ---
# Time logic:
# arrival_j = time_at_i + service_at_i + travel_i_j
# start_j = max(arrival_j, release_j)
start_j = dp.max(
time + service[location] + distance[location, j], release[j]
)
visit_j = dp.Transition(
name=f"visit {j}",
cost=distance[location, j] + dp.IntExpr.state_cost(),
effects=[
(unvisited, unvisited.remove(j)),
(location, j),
(load, load + demand[j]),
(time, start_j),
],
preconditions=[
unvisited.contains(j),
load + demand[j] <= vehicle_capacity,
start_j <= deadline[j],
],
)
model.add_transition(visit_j)
self.transitions[("visit", "current", j)] = visit_j
# --- Transition 2: Visit customer j with a *new* vehicle ---
# Time logic for a new vehicle starting from depot:
# arrival_j = time_at_depot + service_at_depot + travel_depot_j
# (service_at_depot is 0)
start_j_new_vehicle = dp.max(
release[depot] + distance[depot, j], release[j]
)
visit_j_new_vehicle = dp.Transition(
name=f"visit {j} with a new vehicle",
cost=(
distance[location, depot] # Cost to return to depot
+ distance[depot, j] # Cost to go to j
+ cost_per_vehicle # Penalty for new vehicle
+ dp.IntExpr.state_cost()
),
effects=[
(unvisited, unvisited.remove(j)),
(location, j),
(load, demand[j]), # Reset load for new vehicle
(time, start_j_new_vehicle), # Reset time for new vehicle
(vehicles_, vehicles_ + 1), # Increment vehicle index
],
preconditions=[
unvisited.contains(j),
vehicles_ < m - 1, # Must have a new vehicle available
demand[j] <= vehicle_capacity,
start_j_new_vehicle <= deadline[j],
],
)
model.add_transition(visit_j_new_vehicle)
self.transitions[("visit", "next_vehicle", j)] = visit_j_new_vehicle
# --- Transition 3: Return to depot (end of all routes) ---
# Time logic:
# arrival_at_depot = time_at_last_cust + service_at_last_cust + travel_last_cust_depot
arrival_depot = time + service[location] + distance[location, depot]
return_to_depot = dp.Transition(
name="return",
cost=distance[location, depot] + cost_per_vehicle + dp.IntExpr.state_cost(),
# Add the initial vehicle.
effects=[(location, depot), (time, arrival_depot)],
preconditions=[
unvisited.is_empty(),
location != depot,
# arrival_depot <= deadline[depot]
# Check final depot TW
],
)
model.add_transition(return_to_depot)
self.transitions["return"] = return_to_depot
# --- Base Case ---
model.add_base_case([unvisited.is_empty(), location == depot])
# --- State Constraints (Pruning) ---
# 1. Capacity Pruning:
# Total remaining demand must be <= capacity left on this vehicle
# + capacity of all remaining unused vehicles.
# remaining_capacity_current_vehicle = vehicle_capacity - load
# (m - 1) is max index, vehicles_ is current. (m - 1 - vehicles_) is remaining.
# remaining_capacity_other_vehicles = (m - 1 - vehicles_) * vehicle_capacity
# total_remaining_capacity = (
# remaining_capacity_current_vehicle + remaining_capacity_other_vehicles
# )
# model.add_state_constr(total_remaining_capacity >= demand[unvisited])
# 2. Time Window Pruning:
# For every unvisited customer j, it must still be possible
# to visit them from the current location.
# if False:
# for j in self.problem.customers:
# model.add_state_constr(
# ~unvisited.contains(j)
# | (
# dp.max(time + service[location] + distance[location, j], release[j])
# <= deadline[j]
# )
# )
# --- Dual Bounds (Heuristics) ---
needed_vehicle = model.add_int_state_fun(
demand[unvisited] // self.problem.vehicle_capacity + 1
)
# 1. Min distance to all unvisited nodes
min_distance_to = model.add_int_table(
[
min(
int(scaling * self.problem.distance_matrix[k, j])
for k in range(n)
if k != j
)
if j != depot
else 0
for j in range(n)
]
)
model.add_dual_bound(
needed_vehicle * cost_per_vehicle
+ min_distance_to[unvisited]
+ (location != depot).if_then_else(min_distance_to[depot], 0)
)
# 2. Min distance from all unvisited nodes
min_distance_from = model.add_int_table(
[
min(
int(scaling * self.problem.distance_matrix[j, k])
for k in range(n)
if k != j
)
if j != depot
else 0
for j in range(n)
]
)
model.add_dual_bound(
needed_vehicle * vehicle_capacity
+ min_distance_from[unvisited]
+ (location != depot).if_then_else(min_distance_from[location], 0)
)
self.variables = {"time": time}
self.model = model
[docs]
def retrieve_solution(self, sol: dp.Solution) -> VRPTWSolution:
"""
Reconstructs the VRPTWSolution from the sequence of DP transitions.
This logic is adapted from DpVrpSolver.
"""
list_paths: List[List[int]] = [[] for _ in range(self.problem.nb_vehicles)]
def extract_visit_number(text):
match = re.search(r"visit\s(\d+)", text, re.IGNORECASE)
if match:
return int(match.group(1))
return None
cur_vehicle_index = 0
for t in sol.transitions:
name = t.name
if "with a new vehicle" in name:
cur_vehicle_index += 1
if "return" in name:
break
customer_node = extract_visit_number(name)
if customer_node is not None:
list_paths[cur_vehicle_index].append(customer_node)
logger.info(f"DP Solution Cost (includes vehicle penalties): {sol.cost}")
# Filter out empty routes
final_routes = [route for route in list_paths if route]
return VRPTWSolution(
problem=self.problem,
routes=final_routes,
)
[docs]
def set_warm_start(self, solution: VRPTWSolution) -> None:
"""
Provides a warm start hint to the DP solver from an existing solution.
Converts a VRPTWSolution into a list of didppy.Transition objects.
"""
if self.model is None:
self.init_model() # Ensure transitions are populated
initial_solution = []
# Filter out any potential empty routes from the warmstart solution
non_empty_routes = [route for route in solution.routes if route]
state = self.model.target_state
for route_idx, route in enumerate(non_empty_routes):
is_first_customer_of_route = True
prev_customer = None
for customer_node in route:
if is_first_customer_of_route:
is_first_customer_of_route = False
if route_idx == 0:
# First customer of the first route: use "current" vehicle
# (The DP state starts with vehicle 0)
transition_key = ("visit", "current", customer_node)
else:
# First customer of a new route: use "next_vehicle"
transition_key = ("visit", "next_vehicle", customer_node)
else:
# Subsequent customers in a route: use "current" vehicle
transition_key = ("visit", "current", customer_node)
if transition_key in self.transitions:
initial_solution.append(self.transitions[transition_key])
applicable = initial_solution[-1].is_applicable(state, self.model)
if not applicable:
print("Time, : ", state[self.variables["time"]])
if prev_customer is not None:
print(
state[self.variables["time"]]
+ int(
self.scaling
* self.problem.service_times[prev_customer]
)
+ int(
self.scaling
* self.problem.distance_matrix[
prev_customer, customer_node
]
)
)
print("vs ", self.problem.time_windows[customer_node])
print(transition_key, "not feasible")
state = initial_solution[-1].apply(state, self.model)
else:
logger.warning(
f"Warmstart: Transition {transition_key} not found. Skipping."
)
prev_customer = customer_node
if "return" in self.transitions:
initial_solution.append(self.transitions["return"])
applicable = initial_solution[-1].is_applicable(state, self.model)
if not applicable:
print("return", "not feasible")
state = initial_solution[-1].apply(state, self.model)
else:
logger.warning(
f"Warmstart: 'return' transition not found. Solution may be incomplete."
)
self.initial_solution = initial_solution