# Source code for discrete_optimization.gpdp.transformations.from_vrp
# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Transformation from VRP to GPDP (General Pickup and Delivery Problem)."""
from typing import Optional
from discrete_optimization.generic_tools.transformation.problem_transformation import (
ProblemTransformation,
)
from discrete_optimization.generic_tools.transformation.transformation_metadata import (
TransformationMetadata,
subset_transformation,
)
from discrete_optimization.gpdp.problem import GpdpProblem, GpdpSolution, Node
from discrete_optimization.vrp.problem import Customer2DVrpProblem, VrpSolution
# (Sphinx "[docs]" link marker -- page-extraction residue, not code.)
class VrpToGpdpTransformation(
    ProblemTransformation[Customer2DVrpProblem, VrpSolution, GpdpProblem, GpdpSolution]
):
    """Transform VRP to GPDP.

    Mapping:
    - VRP vehicles → GPDP vehicles
    - VRP customers → GPDP transportation nodes
    - VRP start/end depots → GPDP origin/target nodes (virtual)
    - VRP capacity → GPDP resource flow (demand)
    - VRP distances → GPDP distance matrix

    VRP is a special case of GPDP where:
    - No pickup-delivery pairs
    - Single resource (demand/capacity)
    - No time windows (use VRPTW → GPDP for time windows)

    Node numbering of the generated GPDP problem (``n`` non-depot customers,
    ``V`` vehicles):
    - ``0 .. n-1``: non-depot VRP customers, in ascending VRP index order
    - ``n .. n+V-1``: one virtual origin node per vehicle
    - ``n+V .. n+2V-1``: one virtual target node per vehicle
    """

    def __init__(self, compute_graph: bool = False):
        """Initialize transformation.

        Args:
            compute_graph: Forwarded as ``compute_graph`` to the ``GpdpProblem``
                constructor in :meth:`transform_problem` (default: False)
        """
        self.compute_graph = compute_graph

    @staticmethod
    def _customer_vrp_indices(problem: Customer2DVrpProblem) -> list[int]:
        """Return the VRP indices of non-depot customers, in ascending order.

        The position of an index in the returned list is the GPDP node id of
        that customer. All three transformation methods rely on this single
        definition so that forward and backward mappings always agree.

        Args:
            problem: VRP problem instance

        Returns:
            Indices ``i`` of ``problem.customers`` that appear neither in
            ``problem.start_indexes`` nor in ``problem.end_indexes``
        """
        depot_indexes = set(problem.start_indexes) | set(problem.end_indexes)
        return [i for i in range(len(problem.customers)) if i not in depot_indexes]

    def get_forward_metadata(self) -> TransformationMetadata:
        """Metadata for forward transformation (VRP → GPDP)."""
        return subset_transformation(
            use_cases=[
                "Use GPDP solvers for VRP problems",
                "VRP is a special case of GPDP (no pickup-delivery pairs)",
                "Benchmark GPDP algorithms on VRP instances",
            ],
            assumptions=[
                "No pickup-delivery constraints",
                "Single resource dimension (demand/capacity)",
                "No time windows",
            ],
        )

    def transform_problem(self, source_problem: Customer2DVrpProblem) -> GpdpProblem:
        """Transform VRP to GPDP.

        Args:
            source_problem: VRP problem instance

        Returns:
            Equivalent GPDP problem
        """
        nb_vehicle = source_problem.vehicle_count
        customer_indices = self._customer_vrp_indices(source_problem)
        nb_clients = len(customer_indices)

        # Virtual nodes: one origin and one target per vehicle, numbered right
        # after the customer nodes.
        virtual_origin_nodes = [nb_clients + k for k in range(nb_vehicle)]
        virtual_target_nodes = [nb_clients + nb_vehicle + k for k in range(nb_vehicle)]

        # Single lookup table: GPDP node → VRP index (customer or depot).
        # Customer ids and virtual ids are disjoint, so one dict is enough.
        node_to_vrp: dict[Node, int] = dict(enumerate(customer_indices))
        for k in range(nb_vehicle):
            node_to_vrp[virtual_origin_nodes[k]] = source_problem.start_indexes[k]
        for k in range(nb_vehicle):
            node_to_vrp[virtual_target_nodes[k]] = source_problem.end_indexes[k]

        # Vehicle origin and target assignments
        origin_vehicle: dict[int, Node] = dict(enumerate(virtual_origin_nodes))
        target_vehicle: dict[int, Node] = dict(enumerate(virtual_target_nodes))

        # All GPDP nodes, in the order customers / origins / targets
        all_gpdp_nodes = (
            list(range(nb_clients)) + virtual_origin_nodes + virtual_target_nodes
        )

        # Build distance and time matrices
        distance_delta: dict[Node, dict[Node, float]] = {}
        time_delta: dict[Node, dict[Node, float]] = {}
        for node1 in all_gpdp_nodes:
            vrp_idx1 = node_to_vrp[node1]  # resolved once per row
            row: dict[Node, float] = {}
            for node2 in all_gpdp_nodes:
                row[node2] = source_problem.evaluate_function_indexes(
                    vrp_idx1, node_to_vrp[node2]
                )
            distance_delta[node1] = row
            time_delta[node1] = dict(row)  # Assume time = distance

        # Coordinates and per-node resource flow
        resources_flow_node: dict[Node, dict[str, float]] = {}
        coordinates: dict[Node, tuple[float, float]] = {}
        for node in all_gpdp_nodes:
            customer = source_problem.customers[node_to_vrp[node]]
            coordinates[node] = (customer.x, customer.y)
            # Negative demand for every node; origins are overwritten below
            resources_flow_node[node] = {"demand": -customer.demand}
        # Vehicle origins have positive capacity
        for v in range(nb_vehicle):
            resources_flow_node[origin_vehicle[v]]["demand"] = (
                source_problem.vehicle_capacities[v]
            )

        # Resource and capacity definitions
        resources_set = {"demand"}
        capacities = {
            v: {"demand": (0.0, source_problem.vehicle_capacities[v])}
            for v in range(nb_vehicle)
        }

        # Edge resource flow (all zero for VRP)
        resources_flow_edges = {
            (x, y): {"demand": 0.0} for x in distance_delta for y in distance_delta[x]
        }

        # No pickup-delivery pairs in VRP
        list_pickup_deliverable = []

        return GpdpProblem(
            number_vehicle=nb_vehicle,
            nodes_transportation=set(range(nb_clients)),
            nodes_origin=set(virtual_origin_nodes),
            nodes_target=set(virtual_target_nodes),
            list_pickup_deliverable=list_pickup_deliverable,
            origin_vehicle=origin_vehicle,
            target_vehicle=target_vehicle,
            resources_set=resources_set,
            capacities=capacities,
            resources_flow_node=resources_flow_node,
            resources_flow_edges=resources_flow_edges,
            distance_delta=distance_delta,
            time_delta=time_delta,
            coordinates_2d=coordinates,
            compute_graph=self.compute_graph,
        )

    def back_transform_solution(
        self, solution: GpdpSolution, source_problem: Customer2DVrpProblem
    ) -> VrpSolution:
        """Transform GPDP solution back to VRP solution.

        Nodes of a trajectory that are not customer nodes (the virtual
        origin/target nodes) are dropped; a vehicle without a trajectory gets
        an empty path.

        Args:
            solution: GPDP solution
            source_problem: Original VRP problem

        Returns:
            Equivalent VRP solution
        """
        # GPDP customer node → VRP customer index, rebuilt with the same
        # numbering rule as transform_problem.
        gpdp_to_vrp = dict(enumerate(self._customer_vrp_indices(source_problem)))

        # GPDP trajectories: dict[vehicle_id, list[Node]]
        # VRP list_paths: list[list[int]] (customer indices)
        list_paths = [
            [
                gpdp_to_vrp[node]
                for node in solution.trajectories.get(v, [])
                if node in gpdp_to_vrp
            ]
            for v in range(source_problem.vehicle_count)
        ]
        return VrpSolution(
            problem=source_problem,
            list_start_index=list(source_problem.start_indexes),
            list_end_index=list(source_problem.end_indexes),
            list_paths=list_paths,
        )

    def forward_transform_solution(
        self, solution: VrpSolution, target_problem: GpdpProblem
    ) -> Optional[GpdpSolution]:
        """Transform VRP solution to GPDP solution (for warmstart).

        Args:
            solution: VRP solution
            target_problem: Target GPDP problem

        Returns:
            Equivalent GPDP solution

        Raises:
            KeyError: If a path contains an index that is not a non-depot
                customer of ``solution.problem``.
        """
        vrp_problem = solution.problem
        # VRP customer index → GPDP node, inverse of the numbering used in
        # transform_problem.
        vrp_to_gpdp = {
            vrp_idx: node
            for node, vrp_idx in enumerate(self._customer_vrp_indices(vrp_problem))
        }

        # Each trajectory is: origin node, mapped customers, target node
        trajectories = {}
        for v in range(vrp_problem.vehicle_count):
            gpdp_path = [target_problem.origin_vehicle[v]]
            gpdp_path.extend(vrp_to_gpdp[vrp_idx] for vrp_idx in solution.list_paths[v])
            gpdp_path.append(target_problem.target_vehicle[v])
            trajectories[v] = gpdp_path

        # Compute times (simplified: cumulative distance along each trajectory)
        times = {}
        for traj in trajectories.values():
            current_time = 0.0
            times[traj[0]] = current_time
            for node, next_node in zip(traj, traj[1:]):
                current_time += target_problem.distance_delta[node][next_node]
                times[next_node] = current_time

        # Resource evolution (simplified: empty for now)
        resource_evolution = {}

        return GpdpSolution(
            problem=target_problem,
            trajectories=trajectories,
            times=times,
            resource_evolution=resource_evolution,
        )