Source code for discrete_optimization.vrp.solvers.lp_iterative

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

import logging
import random
from collections.abc import Callable, Iterable, Sequence
from copy import deepcopy
from typing import Any, Optional, Union

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np

from discrete_optimization.generic_tools.do_solver import ResultStorage
from discrete_optimization.vrp.problem import (
    Customer2D,
    Customer2DVrpProblem,
    VrpProblem,
    VrpSolution,
    compute_length,
    length,
)
from discrete_optimization.vrp.solvers import VrpSolver

try:
    import gurobipy
except ImportError:
    gurobi_available = False
else:
    gurobi_available = True
    from gurobipy import GRB, Model, Var, quicksum


logger = logging.getLogger(__name__)

Node = tuple[int, int]
Edge = tuple[Node, Node]


[docs] def build_matrice_distance_np( customer_count: int, customers: Sequence[Customer2D] ) -> tuple[np.ndarray, np.ndarray]: matrix_x = np.ones((customer_count, customer_count), dtype=np.int_) matrix_y = np.ones((customer_count, customer_count), dtype=np.int_) for i in range(customer_count): matrix_x[i, :] *= int(customers[i].x) matrix_y[i, :] *= int(customers[i].y) matrix_x = matrix_x - np.transpose(matrix_x) matrix_y = matrix_y - np.transpose(matrix_y) distances = np.abs(matrix_x) + np.abs(matrix_y) sorted_distance = np.argsort(distances, axis=1) return sorted_distance, distances
[docs] def build_graph_pruned_vrp( vrp_problem: Customer2DVrpProblem, ) -> tuple[ nx.DiGraph, nx.DiGraph, dict[int, set[Edge]], dict[int, set[Edge]], dict[Node, set[Edge]], dict[Node, set[Edge]], ]: customer_count = vrp_problem.customer_count customers = vrp_problem.customers vehicle_count = vrp_problem.vehicle_count sd, d = build_matrice_distance_np(customer_count, customers) g = nx.DiGraph() g.add_nodes_from( [(v, i) for i in range(customer_count) for v in range(vehicle_count)] ) shape = sd.shape[0] edges_in_customers: dict[int, set[Edge]] = {i: set() for i in range(customer_count)} edges_out_customers: dict[int, set[Edge]] = { i: set() for i in range(customer_count) } edges_in_merged_graph: dict[Node, set[Edge]] = {node: set() for node in g.nodes()} edges_out_merged_graph: dict[Node, set[Edge]] = {node: set() for node in g.nodes()} for i in range(shape): nodes_to_add: Iterable[int] = sd[i, 1:10] for n in nodes_to_add: for v in range(vehicle_count): if n == i: continue node_1 = (v, i) node_2 = (v, n) g.add_edge( node_1, node_2, weight=length(customers[i], customers[n]), demand=customers[n].demand, ) g.add_edge( node_2, node_1, weight=length(customers[i], customers[n]), demand=customers[i].demand, ) edges_in_merged_graph[node_2].add((node_1, node_2)) edges_out_merged_graph[node_1].add((node_1, node_2)) edges_in_merged_graph[node_1].add((node_2, node_1)) edges_out_merged_graph[node_2].add((node_2, node_1)) edges_in_customers[n].add((node_1, node_2)) edges_out_customers[i].add((node_1, node_2)) edges_in_customers[i].add((node_2, node_1)) edges_out_customers[n].add((node_2, node_1)) nodes_to_add = range(i, min(i + 5, customer_count)) for n in nodes_to_add: for v in range(vehicle_count): if n == i: continue node_1 = (v, i) node_2 = (v, n) g.add_edge( node_1, node_2, weight=length(customers[i], customers[n]), demand=customers[n].demand, ) g.add_edge( node_2, node_1, weight=length(customers[i], customers[n]), demand=customers[i].demand, ) edges_in_merged_graph[node_2].add((node_1, node_2)) edges_out_merged_graph[node_1].add((node_1, node_2)) edges_in_merged_graph[node_1].add((node_2, node_1)) edges_out_merged_graph[node_2].add((node_2, node_1)) edges_in_customers[n].add((node_1, node_2)) edges_out_customers[i].add((node_1, node_2)) edges_in_customers[i].add((node_2, node_1)) edges_out_customers[n].add((node_2, node_1)) for v in range(vehicle_count): nodes_to_add = [vrp_problem.start_indexes[v], vrp_problem.end_indexes[v]] for n in nodes_to_add: if n == i: continue node_1 = (v, i) node_2 = (v, n) g.add_edge( node_1, node_2, weight=length(customers[i], customers[n]), demand=customers[n].demand, ) g.add_edge( node_2, node_1, weight=length(customers[i], customers[n]), demand=customers[i].demand, ) edges_in_merged_graph[node_2].add((node_1, node_2)) edges_out_merged_graph[node_1].add((node_1, node_2)) edges_in_merged_graph[node_1].add((node_2, node_1)) edges_out_merged_graph[node_2].add((node_2, node_1)) edges_in_customers[n].add((node_1, node_2)) edges_out_customers[i].add((node_1, node_2)) edges_in_customers[i].add((node_2, node_1)) edges_out_customers[n].add((node_2, node_1)) g_empty = nx.DiGraph() g_empty.add_nodes_from( [(v, i) for i in range(customer_count) for v in range(vehicle_count)] ) return ( g, g_empty, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, )
[docs] def compute_start_end_flows_info( start_indexes: list[int], end_indexes: list[int] ) -> tuple[dict[int, dict[str, Any]], dict[int, dict[str, Any]]]: start_to_i: dict[int, dict[str, Any]] = {} end_to_i: dict[int, dict[str, Any]] = {} for i in range(len(start_indexes)): s = start_indexes[i] e = end_indexes[i] if s not in start_to_i: start_to_i[s] = {"nb": 0, "vehicle": set()} if e not in end_to_i: end_to_i[e] = {"nb": 0, "vehicle": set()} start_to_i[s]["nb"] += 1 start_to_i[s]["vehicle"].add(i) end_to_i[e]["nb"] += 1 end_to_i[e]["vehicle"].add(i) return start_to_i, end_to_i
[docs] def init_model_lp( g: nx.DiGraph, edges: set[Edge], edges_in_customers: dict[int, set[Edge]], edges_out_customers: dict[int, set[Edge]], edges_in_merged_graph: dict[Node, set[Edge]], edges_out_merged_graph: dict[Node, set[Edge]], edges_warm_set: set[Edge], fraction: float, start_indexes: list[int], end_indexes: list[int], vehicle_count: int, vehicle_capacity: list[float], do_lns: bool = False, include_backward: bool = True, include_triangle: bool = False, ) -> tuple[ "Model", dict[Edge, "Var"], dict[Union[str, int], Any], dict[str, Any], dict[int, Any], ]: tsp_model = Model("VRP-master") x_var: dict[Edge, Var] = {} # decision variables on edges constraint_on_edge: dict[int, Any] = {} edges_to_constraint: set[Edge] = set() if do_lns: edges_to_constraint = set( random.sample(list(edges), int(fraction * len(edges))) ) for iedge in constraint_on_edge: tsp_model.remove(constraint_on_edge[iedge]) iedge = 0 for e in edges: x_var[e] = tsp_model.addVar( vtype=GRB.BINARY, obj=g[e[0]][e[1]]["weight"], name="x_" + str(e) ) val = 0 if e in edges_warm_set: x_var[e].start = 1 x_var[e].varhintval = 1 val = 1 else: x_var[e].start = 0 x_var[e].varhintval = 0 if e in edges_to_constraint: constraint_on_edge[iedge] = tsp_model.addLConstr( x_var[e] == val, name=str((e, iedge)) ) iedge += 1 constraint_tour_2length: dict[int, Any] = {} cnt_tour = 0 if include_backward: for edge in edges: if (edge[1], edge[0]) in edges: if (edge[1], edge[0]) == edge: continue if ( edge[0][1] == start_indexes[edge[0][0]] or edge[1][1] == start_indexes[edge[0][0]] ): continue if ( edge[0][1] == end_indexes[edge[0][0]] or edge[1][1] == end_indexes[edge[0][0]] ): continue constraint_tour_2length[cnt_tour] = tsp_model.addLConstr( x_var[edge] + x_var[(edge[1], edge[0])] <= 1, name="tour_" + str(edge), ) cnt_tour += 1 if include_triangle: constraint_triangle: dict[int, Any] = {} for node in g.nodes(): neigh = set([n for n in nx.neighbors(g, node)]) neigh_2 = { nn: neigh.intersection([n for n in nx.neighbors(g, nn)]) for nn in neigh } for node_neigh in neigh_2: if len(neigh_2[node_neigh]) >= 1: for node_neigh_neigh in neigh_2[node_neigh]: constraint_triangle[cnt_tour] = tsp_model.addLConstr( x_var[(node, node_neigh)] + x_var[(node_neigh, node_neigh_neigh)] + x_var[(node_neigh_neigh, node)] <= 2 ) tsp_model.update() constraint_flow_in: dict[Union[str, int], Any] = {} constraint_flow_out: dict[str, Any] = {} start_to_i, end_to_i = compute_start_end_flows_info(start_indexes, end_indexes) for s in start_to_i: for vehicle in start_to_i[s]["vehicle"]: constraint_flow_out["start_v_" + str(vehicle)] = tsp_model.addLConstr( quicksum( [x_var[e] for e in edges_out_customers[s] if e[0][0] == vehicle] ) == 1, name="start_v_" + str(vehicle), ) for s in end_to_i: for vehicle in end_to_i[s]["vehicle"]: constraint_flow_in["end_v_" + str(vehicle)] = tsp_model.addLConstr( quicksum( [x_var[e] for e in edges_in_customers[s] if e[0][0] == vehicle] ) == 1, name="end_v_" + str(vehicle), ) for customer in edges_in_customers: if customer in end_to_i or customer in start_to_i: # Already dealt by previous constraints continue else: constraint_flow_in[customer] = tsp_model.addLConstr( quicksum([x_var[e] for e in edges_in_customers[customer]]) == 1, name="in_" + str(customer), ) c_flow = {} for n in edges_in_merged_graph: if start_indexes[n[0]] == end_indexes[n[0]] or n[1] not in [ start_indexes[n[0]], end_indexes[n[0]], ]: c_flow[n] = tsp_model.addLConstr( quicksum( [x_var[e] for e in edges_in_merged_graph[n]] + [-x_var[e] for e in edges_out_merged_graph[n]] ) == 0, name="flow_" + str(n), ) for v in range(vehicle_count): tsp_model.addLConstr( quicksum( [g[e[0]][e[1]]["demand"] * x_var[e] for e in edges if e[0][0] == v] ) <= vehicle_capacity[v], name="capa_" + str(v), ) tsp_model.setParam("TimeLimit", 800) tsp_model.modelSense = GRB.MINIMIZE tsp_model.setParam(GRB.Param.Threads, 8) tsp_model.setParam(GRB.Param.PoolSolutions, 10000) tsp_model.setParam(GRB.Param.Method, -1) tsp_model.setParam("MIPGapAbs", 0.01) tsp_model.setParam("MIPGap", 0.003) tsp_model.setParam("Heuristics", 0.1) return tsp_model, x_var, constraint_flow_in, constraint_flow_out, constraint_on_edge
[docs] def update_graph( g: nx.DiGraph, edges: set[Edge], edges_in_customers: dict[int, set[Edge]], edges_out_customers: dict[int, set[Edge]], edges_in_merged_graph: dict[Node, set[Edge]], edges_out_merged_graph: dict[Node, set[Edge]], edges_missing: set[Edge], customers: Sequence[Customer2D], ) -> tuple[ nx.DiGraph, set[Edge], dict[int, set[Edge]], dict[int, set[Edge]], dict[Node, set[Edge]], dict[Node, set[Edge]], ]: for edge in edges_missing: g.add_edge( edge[0], edge[1], weight=length(customers[edge[0][1]], customers[edge[1][1]]), demand=customers[edge[1][1]].demand, ) g.add_edge( edge[1], edge[0], weight=length(customers[edge[0][1]], customers[edge[1][1]]), demand=customers[edge[0][1]].demand, ) edges_in_merged_graph[edge[1]].add((edge[0], edge[1])) edges_out_merged_graph[edge[0]].add((edge[0], edge[1])) edges_in_customers[edge[1][1]].add((edge[0], edge[1])) edges_out_customers[edge[0][1]].add((edge[0], edge[1])) edges_in_merged_graph[edge[0]].add((edge[1], edge[0])) edges_out_merged_graph[edge[1]].add((edge[1], edge[0])) edges_in_customers[edge[0][1]].add((edge[1], edge[0])) edges_out_customers[edge[1][1]].add((edge[1], edge[0])) edges.add((edge[0], edge[1])) edges.add((edge[1], edge[0])) return ( g, edges, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, )
[docs] def build_warm_edges_and_update_graph( vrp_problem: VrpProblem, vrp_solution: VrpSolution, graph: nx.DiGraph, edges: set[Edge], edges_in_merged_graph: dict[Node, set[Edge]], edges_out_merged_graph: dict[Node, set[Edge]], edges_in_customers: dict[int, set[Edge]], edges_out_customers: dict[int, set[Edge]], ) -> tuple[list[list[Edge]], set[Edge]]: vehicle_paths = deepcopy(vrp_solution.list_paths) edges_warm: list[list[Edge]] = [] edges_warm_set: set[Edge] = set() for i in range(len(vehicle_paths)): vehicle_paths[i] = ( [vrp_problem.start_indexes[i]] + vehicle_paths[i] + [vrp_problem.end_indexes[i]] ) edges_warm_sublist = [ ((i, v1), (i, v2)) for v1, v2 in zip(vehicle_paths[i][:-1], vehicle_paths[i][1:]) ] edges_warm_subset = set(edges_warm_sublist) edges_warm.append(edges_warm_sublist) edges_warm_set.update(edges_warm_subset) missing_edge = [e for e in edges_warm_subset if e not in edges] for edge in missing_edge: graph.add_edge( edge[0], edge[1], weight=vrp_problem.evaluate_function_indexes(edge[0][1], edge[1][1]), demand=vrp_problem.customers[edge[1][1]].demand, ) graph.add_edge( edge[1], edge[0], weight=vrp_problem.evaluate_function_indexes(edge[1][1], edge[0][1]), demand=vrp_problem.customers[edge[0][1]].demand, ) edges_in_merged_graph[edge[1]].add((edge[0], edge[1])) edges_out_merged_graph[edge[0]].add((edge[0], edge[1])) edges_in_customers[edge[1][1]].add((edge[0], edge[1])) edges_out_customers[edge[0][1]].add((edge[0], edge[1])) edges_in_merged_graph[edge[0]].add((edge[1], edge[0])) edges_out_merged_graph[edge[1]].add((edge[1], edge[0])) edges_in_customers[edge[0][1]].add((edge[1], edge[0])) edges_out_customers[edge[1][1]].add((edge[1], edge[0])) edges.add((edge[0], edge[1])) edges.add((edge[1], edge[0])) return edges_warm, edges_warm_set
[docs] class LPIterativeVrpSolver(VrpSolver): edges: set[Edge] edges_in_customers: dict[int, set[Edge]] edges_out_customers: dict[int, set[Edge]] edges_in_merged_graph: dict[Node, set[Edge]] edges_out_merged_graph: dict[Node, set[Edge]] edges_warm_set: set[Edge] problem: Customer2DVrpProblem model: Optional[Model] = None x_var: Optional[dict[Edge, Var]] = None constraint_on_edge: Optional[dict[int, Any]] = None
[docs] def init_model(self, **kwargs: Any) -> None: ( g, g_empty, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, ) = build_graph_pruned_vrp(self.problem) initial_solution = kwargs.get("initial_solution", None) if initial_solution is None: solution = self.problem.get_dummy_solution() else: vehicle_tours_b = initial_solution solution = VrpSolution( problem=self.problem, list_start_index=self.problem.start_indexes, list_end_index=self.problem.end_indexes, list_paths=vehicle_tours_b, length=None, lengths=None, capacities=None, ) edges = set(g.edges()) edges_warm, edges_warm_set = build_warm_edges_and_update_graph( vrp_problem=self.problem, vrp_solution=solution, graph=g, edges=edges, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, ) do_lns = kwargs.get("do_lns", False) fraction = kwargs.get("fraction_lns", 0.9) ( tsp_model, x_var, constraint_flow_in, constraint_flow_out, constraint_on_edge, ) = init_model_lp( g=g, edges=edges, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_warm_set=edges_warm_set, start_indexes=self.problem.start_indexes, end_indexes=self.problem.end_indexes, do_lns=do_lns, fraction=fraction, vehicle_count=self.problem.vehicle_count, vehicle_capacity=self.problem.vehicle_capacities, ) self.model = tsp_model self.x_var = x_var self.constraint_on_edge = constraint_on_edge self.graph = g self.edges = edges self.edges_in_customers = edges_in_customers self.edges_out_customers = edges_out_customers self.edges_in_merged_graph = edges_in_merged_graph self.edges_out_merged_graph = edges_out_merged_graph self.edges_warm_set = edges_warm_set
[docs] def solve(self, **kwargs: Any) -> ResultStorage: do_lns = kwargs.get("do_lns", False) plot = kwargs.get("plot", True) fraction = kwargs.get("fraction_lns", 0.9) nb_iteration_max = kwargs.get("nb_iteration_max", 20) if self.model is None or self.x_var is None or self.constraint_on_edge is None: self.init_model(**kwargs) if ( self.model is None or self.x_var is None or self.constraint_on_edge is None ): raise RuntimeError( "model, x_var and constraint_on_edge attributes " "should not be None after init_model()" ) limit_time_s = kwargs.get("time_limit", 10) self.model.setParam("TimeLimit", limit_time_s) self.model.optimize() objective = self.model.getObjective().getValue() # Query number of multiple objectives, and number of solutions finished = False solutions: list[dict[int, set[Edge]]] = [] cost: list[float] = [] nb_components: list[int] = [] iteration = 0 rebuilt_solution: list[dict[int, list[Node]]] = [] rebuilt_obj: list[float] = [] best_solution_rebuilt_index = 0 best_solution_objective_rebuilt = float("inf") vehicle_count = self.problem.vehicle_count customers = self.problem.customers edges_in_customers = self.edges_in_customers edges_out_customers = self.edges_out_customers edges_in_merged_graph = self.edges_in_merged_graph edges_out_merged_graph = self.edges_out_merged_graph edges = self.edges edges_warm_set = self.edges_warm_set g = self.graph while not finished: solutions_ll = retrieve_solutions(self.model, self.x_var, vehicle_count, g) solutions += [solutions_ll[0][-1]] cost += [objective] ( x_solution, rebuilt_dict, obj, components, components_global, component_all, component_global_all, ) = reevaluate_solutions( solutions=solutions_ll, vehicle_count=vehicle_count, g=g, vrp_problem=self.problem, ) for comp in component_all: update_model_2( model=self.model, x_var=self.x_var, components_global=comp, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, ) nb_components += [len(components_global)] rebuilt_solution += [rebuilt_dict] rebuilt_obj += [obj] logger.debug(f"Objective rebuilt : {rebuilt_obj[-1]}") if obj < best_solution_objective_rebuilt: best_solution_objective_rebuilt = obj best_solution_rebuilt_index = iteration iteration += 1 edges_to_add: set[Edge] = set() for v in rebuilt_dict: edges_to_add.update( { (e0, e1) for e0, e1 in zip(rebuilt_dict[v][:-1], rebuilt_dict[v][1:]) } ) edges_missing = {e for e in edges_to_add if e not in edges} if len(edges_missing) > 0 or not finished: ( g, edges, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, ) = update_graph( g=g, edges=edges, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_missing=edges_missing, customers=customers, ) self.model.reset() self.model = None ( tsp_model, x_var, constraint_flow_in, constraint_flow_out, constraint_on_edge, ) = init_model_lp( g=g, edges=edges, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_warm_set=edges_warm_set, start_indexes=self.problem.start_indexes, end_indexes=self.problem.end_indexes, do_lns=do_lns, fraction=fraction, vehicle_count=self.problem.vehicle_count, vehicle_capacity=self.problem.vehicle_capacities, ) self.model = tsp_model self.model.setParam("TimeLimit", limit_time_s) self.x_var = x_var self.constraint_on_edge = constraint_on_edge self.graph = g self.edges = edges self.edges_in_customers = edges_in_customers self.edges_out_customers = edges_out_customers self.edges_in_merged_graph = edges_in_merged_graph self.edges_out_merged_graph = edges_out_merged_graph self.edges_warm_set = edges_warm_set for iedge in self.constraint_on_edge: self.model.remove(self.constraint_on_edge[iedge]) edges_to_constraint = set(self.x_var.keys()) if do_lns: for iedge in self.constraint_on_edge: self.model.remove(self.constraint_on_edge[iedge]) self.model.update() self.constraint_on_edge = {} edges_to_constraint = set() vehicle = set(random.sample(range(vehicle_count), 3)) edges_to_constraint.update( set([e for e in edges if e[0][0] not in vehicle]) ) logger.debug( ( len(edges_to_constraint), " edges constraint over ", len(edges), ) ) iedge = 0 x_var = self.x_var if all((e in edges) for e in edges_to_add): for e in x_var: val = 0 if e in edges_to_add: x_var[e].start = 1 x_var[e].varhintval = 1 val = 1 else: x_var[e].start = 0 x_var[e].varhintval = 0 if e in edges_to_constraint: if do_lns: self.constraint_on_edge[iedge] = self.model.addLConstr( x_var[e] == val, name=str((e, iedge)) ) iedge += 1 else: pass self.model.update() self.model.optimize() objective = self.model.getObjective().getValue() finished = finished or iteration >= nb_iteration_max if plot: plot_solve( solutions=solutions, customers=customers, rebuilt_solution=rebuilt_solution, cost=cost, rebuilt_obj=rebuilt_obj, ) logger.debug(f"Best obj : {best_solution_objective_rebuilt}") solution = VrpSolution( problem=self.problem, list_start_index=self.problem.start_indexes, list_end_index=self.problem.end_indexes, list_paths=[ [x[1] for x in rebuilt_solution[best_solution_rebuilt_index][l][1:-1]] for l in sorted(rebuilt_solution[best_solution_rebuilt_index]) ], length=None, lengths=None, capacities=None, ) _ = self.problem.evaluate(solution) fit = self.aggreg_from_sol(solution) return self.create_result_storage( [(solution, fit)], )
[docs] def plot_solve( solutions: list[dict[int, set[Edge]]], customers: Sequence[Customer2D], rebuilt_solution: list[dict[int, list[Node]]], cost: list[float], rebuilt_obj: list[float], ) -> None: fig, ax = plt.subplots(2) for i in range(len(solutions)): ll = [] for v in solutions[i]: for e in solutions[i][v]: ll.append( ax[0].plot( [customers[e[0][1]].x, customers[e[1][1]].x], [customers[e[0][1]].y, customers[e[1][1]].y], color="b", ) ) ax[1].plot( [customers[n[1]].x for n in rebuilt_solution[i][v]], [customers[n[1]].y for n in rebuilt_solution[i][v]], color="orange", ) ax[0].set_title("iter " + str(i) + " obj=" + str(int(cost[i]))) ax[1].set_title("iter " + str(i) + " obj=" + str(int(rebuilt_obj[i]))) plt.draw() plt.pause(0.01) if len(solutions) > 1 and i < len(solutions) - 1: ax[0].cla() ax[1].cla() # ax[0].lines = [] # ax[1].lines = [] plt.show()
[docs] def build_the_cycles( x_solution: set[Edge], component: set[Node], start_index: Node, end_index: Node ) -> tuple[list[Node], dict[Node, int]]: edge_of_interest = { e for e in x_solution if e[1] in component and e[0] in component } innn = {e[1]: e for e in edge_of_interest} outt = {e[0]: e for e in edge_of_interest} if start_index in outt: some_node = start_index else: some_node = next(e[0] for e in edge_of_interest) end_node = some_node if end_index not in innn else end_index path = [some_node] cur_edge = outt[some_node] indexes = {some_node: 0} cur_index = 1 while cur_edge[1] != end_node: path += [cur_edge[1]] indexes[cur_edge[1]] = cur_index cur_index += 1 cur_edge = outt[cur_edge[1]] if end_index in innn: path += [end_node] indexes[end_node] = cur_index return path, indexes
[docs] def rebuild_tsp_routine( sorted_connected_component: list[tuple[set[Node], int]], paths_component: dict[int, list[Node]], node_to_component: dict[Node, int], indexes: dict[int, dict[Node, int]], graph: nx.DiGraph, edges: set[Edge], evaluate_function_indexes: Callable[[int, int], float], vrp_problem: VrpProblem, start_index: Node, end_index: Node, ) -> tuple[list[Node], float]: rebuilded_path: list[Node] = list(paths_component[node_to_component[start_index]]) component_end: int = node_to_component[end_index] component_reconnected: set[int] = {node_to_component[start_index]} path_set: set[Node] = set(rebuilded_path) total_length_path: int = len(rebuilded_path) while len(component_reconnected) < len(sorted_connected_component): if ( len(component_reconnected) == len(sorted_connected_component) - 1 and end_index != start_index and node_to_component[end_index] != node_to_component[start_index] ): rebuilded_path = rebuilded_path + paths_component[component_end] component_reconnected.add(component_end) else: index_path: dict[Node, list[int]] = {} for i in range(len(rebuilded_path)): if rebuilded_path[i] not in index_path: index_path[rebuilded_path[i]] = [] index_path[rebuilded_path[i]] += [i] edge_out_of_interest: set[Edge] = { e for e in edges if e[0] in path_set and e[1] not in path_set } edge_in_of_interest: set[Edge] = { e for e in edges if e[0] not in path_set and e[1] in path_set } min_out_edge = None min_index_in_path = None min_component = None min_dist = float("inf") backup_min_out_edge = None backup_min_in_edge = None backup_min_index_in_path = None backup_min_component = None backup_min_dist = float("inf") for e in edge_out_of_interest: index_in = index_path[e[0]][0] index_in_1 = min(index_path[e[0]][0] + 1, total_length_path - 1) next_node_1 = rebuilded_path[index_in_1] component_e1 = node_to_component[e[1]] if ( component_e1 == component_end and len(component_reconnected) < len(sorted_connected_component) - 1 ): continue index_component_e1 = indexes[component_e1][e[1]] index_component_e1_plus1 = index_component_e1 + 1 if index_component_e1_plus1 >= len(paths_component[component_e1]): index_component_e1_plus1 = 0 next_node_component_e1 = paths_component[component_e1][ index_component_e1_plus1 ] if (next_node_component_e1, next_node_1) in edge_in_of_interest: cost = ( graph[e[0]][e[1]]["weight"] + graph[next_node_component_e1][next_node_1]["weight"] - graph[e[0]][next_node_1]["weight"] ) if cost < min_dist: min_component = node_to_component[e[1]] min_out_edge = e min_index_in_path = index_in min_dist = cost else: cost = graph[e[0]][e[1]]["weight"] if cost < backup_min_dist: backup_min_component = node_to_component[e[1]] backup_min_out_edge = e backup_min_in_edge = (next_node_component_e1, next_node_1) backup_min_index_in_path = index_in backup_min_dist = cost if ( min_out_edge is None or min_component is None or min_index_in_path is None ): if ( backup_min_component is None or backup_min_out_edge is None or backup_min_in_edge is None or backup_min_index_in_path is None ): # for mypy to realize that we must have define backup values at this point raise RuntimeError("backup values cannot be None now.") logger.debug("Backup") e = backup_min_in_edge graph.add_edge( e[0], e[1], weight=evaluate_function_indexes(e[0][0], e[1][0]) ) graph.add_edge( e[1], e[0], weight=evaluate_function_indexes(e[1][0], e[0][0]) ) min_out_edge = backup_min_out_edge min_index_in_path = backup_min_index_in_path min_component = backup_min_component len_this_component = len(paths_component[min_component]) logger.debug(f"len this component : {len_this_component}") index_of_in_component = indexes[min_component][min_out_edge[1]] new_component: list[Node] = [ paths_component[min_component][ (index_of_in_component + i) % len_this_component ] for i in range(0, -len_this_component, -1) ] logger.debug(f"path component {paths_component[min_component]}") logger.debug(f"New component : {new_component}") rebuilded_path = ( rebuilded_path[: (min_index_in_path + 1)] + new_component + rebuilded_path[(min_index_in_path + 1) :] ) for node1, node2 in zip(new_component[:-1], new_component[1:]): if (node1, node2) not in graph.edges(): graph.add_edge( node1, node2, weight=evaluate_function_indexes(node1[0], node2[0]), ) path_set = set(rebuilded_path) total_length_path = len(rebuilded_path) component_reconnected.add(min_component) lengths, obj, capacities = compute_length( start_index=start_index[1], end_index=end_index[1], solution=[x[1] for x in rebuilded_path[1:-1]], list_customers=vrp_problem.customers, method=vrp_problem.evaluate_function_indexes, ) return rebuilded_path, obj
[docs] def retrieve_solutions( model: "Model", x_var: dict[Edge, "Var"], vehicle_count: int, g: nx.DiGraph ) -> list[tuple[nx.DiGraph, dict[int, nx.DiGraph], dict[int, set[Edge]]]]: nSolutions = model.SolCount solutions: list[tuple[nx.Digraph, dict[int, nx.DiGraph], dict[int, set[Edge]]]] = [] for s in range(nSolutions): # Set which solution we will query from now on g_empty = {v: nx.DiGraph() for v in range(vehicle_count)} g_merge = nx.DiGraph() x_solution: dict[int, set[Edge]] = {v: set() for v in range(vehicle_count)} model.params.SolutionNumber = s for e in x_var: value = x_var[e].getAttr("Xn") if value >= 0.5: vehicle = e[0][0] x_solution[vehicle].add(e) clients = e[0], e[1] if clients[0] not in g_empty[vehicle]: g_empty[vehicle].add_node(clients[0]) if clients[1] not in g_empty[vehicle]: g_empty[vehicle].add_node(clients[1]) if clients[0][1] not in g_merge: g_merge.add_node(clients[0][1]) if clients[1][1] not in g_merge: g_merge.add_node(clients[1][1]) g_empty[vehicle].add_edge( clients[0], clients[1], weight=g[e[0]][e[1]]["weight"] ) g_merge.add_edge( clients[0][1], clients[1][1], weight=g[e[0]][e[1]]["weight"] ) solutions += [ ( g_merge.copy(), g_empty, x_solution.copy(), ) ] print(f"quality of {s}-th solution", model.getAttr("PoolObjVal")) return solutions
[docs] def reevaluate_solutions( solutions: list[tuple[nx.DiGraph, dict[int, nx.DiGraph], dict[int, set[Edge]]]], vehicle_count: int, g: nx.DiGraph, vrp_problem: VrpProblem, ) -> tuple[ dict[int, set[Edge]], dict[int, list[Node]], float, dict[int, list[tuple[set[Node], int]]], list[tuple[set[Node], int]], list[dict[int, list[tuple[set[Node], int]]]], list[list[tuple[set[Node], int]]], ]: rebuilt_solution: list[dict[int, list[Node]]] = [] rebuilt_obj: list[float] = [] nb_components: list[list[int]] = [] components: list[dict[int, list[tuple[set[Node], int]]]] = [] components_global: list[list[tuple[set[Node], int]]] = [] solutions_list: list[dict[int, set[Edge]]] = [] logger.debug(f"Vehicle count : {vehicle_count}") for g_merge, g_empty, x_solution in solutions: connected_components: dict[int, list[tuple[set[Node], int]]] = { v: [(set(e), len(e)) for e in nx.weakly_connected_components(g_empty[v])] for v in g_empty } logger.debug( ( "Connected component : ", [len(connected_components[v]) for v in connected_components], ) ) sorted_connected_component: dict[int, list[tuple[set[Node], int]]] = { v: sorted(connected_components[v], key=lambda x: x[1], reverse=True) for v in connected_components } components += [sorted_connected_component] nb_components += [ [len(sorted_connected_component[v]) for v in sorted_connected_component] ] solutions_list += [x_solution.copy()] paths_component: dict[int, dict[int, list[Node]]] = { v: {} for v in range(vehicle_count) } indexes_component: dict[int, dict[int, dict[Node, int]]] = { v: {} for v in range(vehicle_count) } node_to_component: dict[int, dict[Node, int]] = { v: {} for v in range(vehicle_count) } rebuilt_dict: dict[int, list[Node]] = {} objective_dict: dict[int, float] = {} component_global: list[tuple[set[Node], int]] = [ (set(e), len(e)) for e in nx.weakly_connected_components(g_merge) ] components_global += [component_global] nb_component_global = len(component_global) logger.debug(f"Global : {nb_component_global}") for v in range(vehicle_count): graph_of_interest = nx.subgraph( g, [e[0] for e in x_solution[v]] + [e[1] for e in x_solution[v]] ).copy() nb = len(sorted_connected_component[v]) for i in range(nb): s = sorted_connected_component[v][i] paths_component[v][i], indexes_component[v][i] = build_the_cycles( x_solution=x_solution[v], component=s[0], start_index=(v, vrp_problem.start_indexes[v]), end_index=(v, vrp_problem.end_indexes[v]), ) node_to_component[v].update({p: i for p in paths_component[v][i]}) rebuilt_dict[v], objective_dict[v] = rebuild_tsp_routine( sorted_connected_component=sorted_connected_component[v], paths_component=paths_component[v], node_to_component=node_to_component[v], start_index=(v, vrp_problem.start_indexes[v]), end_index=(v, vrp_problem.end_indexes[v]), indexes=indexes_component[v], graph=graph_of_interest, edges=set(graph_of_interest.edges()), evaluate_function_indexes=vrp_problem.evaluate_function_indexes, vrp_problem=vrp_problem, ) rebuilt_solution += [rebuilt_dict] rebuilt_obj += [sum(list(objective_dict.values()))] logger.debug(("Rebuilt : ", rebuilt_solution, rebuilt_obj)) index_best = min(range(len(rebuilt_obj)), key=lambda x: rebuilt_obj[x]) logger.debug(f"{index_best} / {len(rebuilt_obj)}") logger.debug(f"best : {rebuilt_obj[index_best]}") return ( solutions_list[index_best], rebuilt_solution[index_best], rebuilt_obj[index_best], components[index_best], components_global[index_best], components, components_global, )
[docs] def update_model_2( model: "Model", x_var: dict[Edge, "Var"], components_global: dict[int, list[tuple[set[Node], int]]], edges_in_customers: dict[int, set[Edge]], edges_out_customers: dict[int, set[Edge]], ) -> None: for vehicle in components_global: if len(components_global[vehicle]) > 1: logger.debug( f"Nb component for vehicle {vehicle}: {len(components_global[vehicle])}" ) for s in components_global[vehicle]: customers_component: set[int] = {customer for vehicle, customer in s[0]} edge_in_of_interest = [ e for customer in customers_component for e in edges_in_customers[customer] if e[0][1] not in s[0] and e[1][1] in customers_component ] edge_out_of_interest = [ e for customer in customers_component for e in edges_out_customers[customer] if e[1][1] not in customers_component and e[0][1] in customers_component ] model.addLConstr(quicksum([x_var[e] for e in edge_in_of_interest]) >= 1) model.addLConstr( quicksum([x_var[e] for e in edge_out_of_interest]) >= 1 ) model.update()