# Copyright (c) 2025 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from typing import Dict, List, Optional, Tuple
import numpy as np
from discrete_optimization.generic_tasks_tools.allocation import (
AllocationProblem,
AllocationSolution,
)
from discrete_optimization.generic_tasks_tools.scheduling import (
SchedulingProblem,
SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
Solution,
TypeObjective,
)
# A task is identified by an integer node index; VRPTWProblem uses its
# customer nodes (every node except the depot) as the task list.
Task = int
# A unary resource is identified by an integer vehicle index, which is also
# the position of that vehicle's route in VRPTWSolution.routes.
UnaryResource = int
[docs]
class VRPTWSolution(SchedulingSolution[Task], AllocationSolution[Task, UnaryResource]):
    """
    Solution class for the VRPTW problem.

    Attributes:
        problem (VRPTWProblem): The problem instance.
        routes (List[List[int]]):
            List of routes. Each route is a list of customer node indices.
            The depot (start/end) is implicit and not included in these lists.
        scaling (float):
            Factor applied to times before they are truncated to integers by
            get_end_time / get_start_time.
        arrival_times (Dict[int, List[float]]):
            Maps vehicle index to a list of arrival times at customer nodes in its route.
        start_service_times (Dict[int, List[float]]):
            Maps vehicle index to a list of service start times at customer nodes.
        route_loads (List[float]): Total demand for each route.
        route_distances (List[float]): Total distance for each route.

        # Evaluated metrics
        total_distance (float): Sum of distances of all routes.
        nb_vehicles_used (int): Number of routes used.
        tw_violation (float): Total violation of time windows (sum of lateness).
        capacity_violation (float): Total violation of vehicle capacities.
    """

    problem: "VRPTWProblem"

    def __init__(
        self,
        problem: "VRPTWProblem",
        routes: Optional[List[List[int]]] = None,
        scaling: float = 1.0,
    ):
        """
        Create a solution; the evaluated fields start empty / zero.

        Args:
            problem: The problem instance this solution belongs to.
            routes: One list of customer node indices per vehicle. ``None``
                yields a solution without any route.
            scaling: Multiplier applied to times in get_end_time.
        """
        self.problem = problem
        self.routes = routes if routes is not None else []
        self.scaling = scaling

        # Detailed schedule (filled by evaluation)
        self.arrival_times: Dict[int, List[float]] = {}
        self.start_service_times: Dict[int, List[float]] = {}
        self.route_loads: List[float] = []
        self.route_distances: List[float] = []

        # Main objective and penalty values (filled by evaluation)
        self.total_distance: float = 0.0
        self.nb_vehicles_used: int = 0
        self.tw_violation: float = 0.0
        self.capacity_violation: float = 0.0

    def is_allocated(self, task: Task, unary_resource: UnaryResource) -> bool:
        """
        Tell whether ``task`` is served by the route of vehicle ``unary_resource``.

        A vehicle index without a matching entry in ``routes`` (the solution
        may hold fewer routes than the problem has vehicles) has nothing
        allocated, so False is returned instead of raising IndexError.
        Negative indices are rejected too, so they cannot wrap around to
        another vehicle's route.
        """
        if not 0 <= unary_resource < len(self.routes):
            return False
        return task in self.routes[unary_resource]

    def copy(self) -> "VRPTWSolution":
        """
        Return an independent copy of the routes and of all evaluated data.

        ``scaling`` is forwarded so that the copy reports the same start/end
        times as the original.
        """
        sol = VRPTWSolution(
            problem=self.problem,
            routes=[list(route) for route in self.routes],
            scaling=self.scaling,
        )
        # Copy evaluated data
        sol.arrival_times = {
            vehicle: list(times) for vehicle, times in self.arrival_times.items()
        }
        sol.start_service_times = {
            vehicle: list(times)
            for vehicle, times in self.start_service_times.items()
        }
        sol.route_loads = list(self.route_loads)
        sol.route_distances = list(self.route_distances)
        sol.total_distance = self.total_distance
        sol.nb_vehicles_used = self.nb_vehicles_used
        sol.tw_violation = self.tw_violation
        sol.capacity_violation = self.capacity_violation
        return sol

    def lazy_copy(self) -> "VRPTWSolution":
        """Return a copy; identical to copy() for this class."""
        # Routes are mutable, so we must copy them.
        return self.copy()

    def change_problem(self, new_problem: Problem) -> None:
        """
        Attach the solution to another problem and reset all evaluated data.

        Raises:
            ValueError: If ``new_problem`` is not a VRPTWProblem.
        """
        if not isinstance(new_problem, VRPTWProblem):
            raise ValueError("new_problem must be a VRPTWProblem instance.")
        self.problem = new_problem
        # Invalidate evaluated metrics
        self.arrival_times = {}
        self.start_service_times = {}
        self.route_loads = []
        self.route_distances = []
        self.total_distance = 0.0
        self.nb_vehicles_used = 0
        self.tw_violation = 0.0
        self.capacity_violation = 0.0

    def __str__(self) -> str:
        """Return a multi-line summary of the metrics followed by the routes."""
        route_str = "\n".join(
            f"  Vehicle {i + 1}: Depot -> {' -> '.join(map(str, route))} -> Depot"
            for i, route in enumerate(self.routes)
        )
        return (
            f"VRPTW Solution:\n"
            f"Total Distance: {self.total_distance:.2f}\n"
            f"Vehicles Used: {self.nb_vehicles_used}\n"
            f"TW Violation: {self.tw_violation:.2f}\n"
            f"Capacity Violation: {self.capacity_violation:.2f}\n"
            f"Routes:\n{route_str}"
        )

    def get_end_time(self, task: Task) -> int:
        """
        Return the integer end time of ``task``.

        Lookup order:
        1. If a ``_times`` attribute has been set on the solution, its entry
           for ``task`` is returned unchanged.
        2. Otherwise the solution is evaluated when no arrival times are
           stored yet, and the result is the scaled, truncated service start
           time plus the scaled, truncated service duration of the task.
        3. The depot node yields 0.

        A task that is in no route and is not the depot yields None.
        """
        times = getattr(self, "_times", None)
        if times is not None:
            return times[task]
        if not self.arrival_times:
            self.problem.evaluate(self)
        for vehicle, route in enumerate(self.routes):
            for position, node in enumerate(route):
                if node == task:
                    # Each term is truncated separately, as in the schedule
                    # consumers expect integer start and duration values.
                    start = int(self.scaling * self.start_service_times[vehicle][position])
                    duration = int(self.scaling * self.problem.service_times[task])
                    return start + duration
        if task == self.problem.depot_node:
            return 0
        return None

    def get_start_time(self, task: Task) -> int:
        """
        Return the same value as get_end_time(task).

        NOTE(review): start and end are reported as one time point per task;
        confirm this is intended rather than the service start time.
        """
        return self.get_end_time(task)
[docs]
class VRPTWProblem(SchedulingProblem[Task], AllocationProblem[Task, UnaryResource]):
    """
    Vehicle Routing Problem with Time Windows (VRPTW) Problem class.

    This model includes:
    - Multiple vehicles with a shared capacity.
    - A single depot.
    - Customers with demand.
    - Customers with time windows (ready time, due date).
    - Customers with service times.
    - Objectives: 1) Minimize number of vehicles, 2) Minimize total distance.
    """

    def get_makespan_upper_bound(self) -> int:
        """Return 1000 times the depot due date, rounded to an integer."""
        # A product, not a power: ``1000 ** due`` explodes for any realistic
        # due date (huge int, or OverflowError with a float).
        return round(1000 * self.time_windows[self.depot_node][1])

    def __init__(
        self,
        nb_vehicles: int,
        vehicle_capacity: float,
        nb_nodes: int,
        distance_matrix: np.ndarray,
        time_windows: List[Tuple[int, int]],
        service_times: List[float],
        demands: List[float],
        depot_node: int = 0,
    ):
        """
        Store the instance data and derive the customer / task / vehicle lists.

        Args:
            nb_vehicles: Number of available vehicles.
            vehicle_capacity: Capacity shared by every vehicle.
            nb_nodes: Number of nodes, depot included.
            distance_matrix: Indexed as ``distance_matrix[i, j]``; the value
                is also added to the clock as the travel time from i to j.
            time_windows: ``(ready, due)`` pair per node, depot included.
            service_times: Service duration per node.
            demands: Demand per node.
            depot_node: Index of the depot node.
        """
        self.nb_vehicles = nb_vehicles
        self.vehicle_capacity = vehicle_capacity
        self.nb_nodes = nb_nodes
        self.distance_matrix = distance_matrix
        self.time_windows = time_windows
        self.service_times = service_times
        self.demands = demands
        self.depot_node = depot_node
        # range() already yields increasing indices, so the list is sorted.
        self.customers = [
            node for node in range(self.nb_nodes) if node != self.depot_node
        ]
        self.nb_customers = len(self.customers)
        # For scheduling and allocation mixins.
        self.tasks_list = self.customers
        self.unary_resources_list = list(range(self.nb_vehicles))

    def get_attribute_register(self) -> EncodingRegister:
        """Return a register that declares only the ``routes`` attribute."""
        # VRPTW is complex to encode with simple registers.
        # We'll rely on the Solution object itself for evaluation.
        return EncodingRegister(
            {
                "routes": {
                    "name": "routes",
                    "type": [],
                }
            }
        )

    def get_objective_register(self) -> ObjectiveRegister:
        """
        Return the aggregated minimization objective.

        The vehicle count dominates the distance through its weight. Both
        penalties have negative weights, matching the negated violation
        values reported by evaluate().
        """
        return ObjectiveRegister(
            objective_sense=ModeOptim.MINIMIZATION,
            objective_handling=ObjectiveHandling.AGGREGATE,
            dict_objective_to_doc={
                # Primary objective
                "nb_vehicles_used": ObjectiveDoc(
                    type=TypeObjective.OBJECTIVE, default_weight=100000.0
                ),
                # Secondary objective
                "total_distance": ObjectiveDoc(
                    type=TypeObjective.OBJECTIVE, default_weight=1.0
                ),
                # Penalties
                "tw_violation": ObjectiveDoc(
                    type=TypeObjective.PENALTY, default_weight=-1000.0
                ),
                "capacity_violation": ObjectiveDoc(
                    type=TypeObjective.PENALTY, default_weight=-1000.0
                ),
            },
        )

    def evaluate(self, solution: VRPTWSolution) -> Dict[str, float]:
        """
        Evaluates a VRPTWSolution.

        Calculates distances, time window violations, and capacity violations,
        and stores them (with the per-route schedule) on ``solution``.

        Each non-empty route starts at the depot ready time. A vehicle that
        arrives before a customer's ready time waits; starting service after
        the due date, or returning to the depot after the depot due date,
        adds the lateness to ``tw_violation``. Empty routes are skipped: they
        are not counted in ``nb_vehicles_used`` and get no entry in
        ``arrival_times``, ``start_service_times``, ``route_loads`` or
        ``route_distances``.

        Returns:
            The vehicle count and total distance, plus both violations with
            a negative sign.
        """
        solution.total_distance = 0.0
        solution.tw_violation = 0.0
        solution.capacity_violation = 0.0
        solution.arrival_times = {}
        solution.start_service_times = {}
        solution.route_loads = []
        solution.route_distances = []
        # Only routes that actually serve a customer use a vehicle.
        solution.nb_vehicles_used = sum(1 for route in solution.routes if route)

        depot_ready = self.time_windows[self.depot_node][0]
        depot_due = self.time_windows[self.depot_node][1]

        for v_idx, route in enumerate(solution.routes):
            if not route:
                continue

            clock = depot_ready
            load = 0.0
            travelled = 0.0
            previous = self.depot_node
            arrivals = []
            starts = []

            # Visit every customer in order, starting from the depot.
            for customer in route:
                leg = self.distance_matrix[previous, customer]
                travelled += leg
                arrival = clock + leg

                ready, due = self.time_windows[customer]
                start_service = max(arrival, ready)
                clock = start_service + self.service_times[customer]
                load += self.demands[customer]
                solution.tw_violation += max(0, start_service - due)

                arrivals.append(arrival)
                starts.append(start_service)
                previous = customer

            # Travel back to depot
            leg_home = self.distance_matrix[previous, self.depot_node]
            travelled += leg_home
            solution.tw_violation += max(0, clock + leg_home - depot_due)

            # Store route-level stats
            solution.capacity_violation += max(0, load - self.vehicle_capacity)
            solution.total_distance += travelled
            solution.arrival_times[v_idx] = arrivals
            solution.start_service_times[v_idx] = starts
            solution.route_loads.append(load)
            solution.route_distances.append(travelled)

        return {
            "nb_vehicles_used": solution.nb_vehicles_used,
            "total_distance": solution.total_distance,
            "tw_violation": -solution.tw_violation,
            "capacity_violation": -solution.capacity_violation,
        }

    def satisfy(self, solution: VRPTWSolution) -> bool:
        """
        Tell whether the solution respects time windows, capacity and fleet size.

        The solution is always re-evaluated first, so the answer reflects its
        current routes rather than metrics left over from an earlier state.

        NOTE(review): customer coverage is not checked here; a solution that
        leaves customers unvisited can still pass.
        """
        self.evaluate(solution)
        return (
            solution.tw_violation == 0
            and solution.capacity_violation == 0
            and solution.nb_vehicles_used <= self.nb_vehicles
        )

    def get_dummy_solution(self) -> VRPTWSolution:
        """Returns a dummy solution (one vehicle per customer)."""
        single_stop_routes = [[customer] for customer in self.customers]
        return VRPTWSolution(problem=self, routes=single_stop_routes)

    def get_solution_type(self) -> type:
        """Return the solution class associated with this problem."""
        return VRPTWSolution