# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from collections import Counter, defaultdict
from typing import Any, Callable, Dict
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.cp_tools import ParametersCp
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
FloatHyperparameter,
)
from discrete_optimization.generic_tools.lp_tools import (
GurobiMilpSolver,
MilpSolver,
OrtoolsMathOptMilpSolver,
ParametersMilp,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
from discrete_optimization.multibatching.problem import (
MultibatchingProblem,
MultibatchingSolution,
PackingTransport,
)
from discrete_optimization.multibatching.solvers import MultibatchingSolver
from discrete_optimization.multibatching.solvers.solver_utils import (
precompute_valid_links,
)
try:
import gurobipy
except ImportError:
gurobi_available = False
else:
gurobi_available = True
logger = logging.getLogger(__name__)
class _BaseLpMultibatchingSolver(MilpSolver, MultibatchingSolver):
"""Base class for Multibatching LP solvers.
This solver supports two main modeling variants:
1. **Standard Flow Model**: Allows multiple different products to be packed
into the same transport (multi-batching).
2. **Single Batching Flow Model**: A more constrained version where each transport
can only carry a single type of product.
Attributes:
problem (MultibatchingProblem): The multibatching problem instance to solve.
variables (Dict): A dictionary to store variables created for the model.
single_batching (bool): Flag indicating which model variant to use.
scaling_factor (int): A factor to scale floating-point costs into integers for the model.
"""
problem: MultibatchingProblem
hyperparameters = [
CategoricalHyperparameter(
name="add_lb_constraint_nb_trips", choices=[True, False], default=False
),
CategoricalHyperparameter(
name="restrict_to_shortest_paths", choices=[True, False], default=False
),
FloatHyperparameter(
name="shortest_path_tolerance",
low=0,
high=30,
default=0.2,
depends_on=[("restrict_to_shortest_paths", True)],
),
]
def __init__(self, problem: MultibatchingProblem, **kwargs: Any):
super().__init__(problem, **kwargs)
self.variables: Dict[str, Any] = {}
self.single_batching: bool = False
self.scaling_factor = kwargs.get("scaling_factor", 1)
def init_model(self, single_batching: bool = False, **kwargs: Any) -> None:
"""
Initializes the model by dispatching to the appropriate
model-building method based on the `single_batching` flag.
"""
kwargs = self.complete_with_default_hyperparameters(kwargs)
self.single_batching = single_batching
if self.single_batching:
self._init_model_flows_single_batching(**kwargs)
else:
self._init_model_flows_multi_batching(**kwargs)
def _init_model_flows_multi_batching(self, **kwargs: Any) -> None:
"""Builds the standard flow model where multiple products can share a transport."""
print(kwargs)
use_shortest_path = kwargs.get("restrict_to_shortest_paths", False)
sp_tolerance = kwargs.get(
"shortest_path_tolerance", 0.2
) # 0.0 = Strict, 0.2 = +20% length allowed
# Option B: No incoming flow at producers
no_incoming_at_source = kwargs.get("prevent_incoming_at_source", False)
valid_links_map = None
if use_shortest_path:
logger.info(
f"Computing valid links (Shortest Path Heuristic, tol={sp_tolerance})..."
)
valid_links_map = precompute_valid_links(
self.problem, tolerance=sp_tolerance
)
logger.info("Initializing model (Multi-Batching Flow)...")
self.model = self.create_empty_model("multibatching-flow")
# --- Pre-computation ---
supply_per_product = {
p: self.problem.get_total_supply(p) for p in self.problem.products
}
# --- Variables ---
nb_trips = {}
for i in range(self.problem.nb_transport_links):
nb_trips[i] = self.add_integer_variable(name=f"nb_trips_{i}")
flows = {}
for i, tl in enumerate(self.problem.transport_links):
flows[i] = {}
for p_idx, p in enumerate(self.problem.products):
if (
# 1. Transport Compatibility & Capacity
tl.transport_type in p.valid_transports
and tl.transport_type.capacity > p.size
# 2. Heuristic A: Restrict to Valid Paths (Shortest Path logic)
and (
valid_links_map is None or i in valid_links_map.get(p.id, set())
)
# 3. Heuristic B: Prevent Incoming Flow at Producers (No U-turns to factory)
and (
not no_incoming_at_source
or tl.location_l2.net_supply.get(p, 0) <= 1e-6
)
):
flows[i][p_idx] = self.add_integer_variable(
name=f"flow_{i}_{p_idx}", ub=supply_per_product[p]
)
# --- Constraints ---
# Capacity Constraint
for i, tl in enumerate(self.problem.transport_links):
total_volume = self.construct_linear_sum(
flows[i][p_idx] * p.size
for p_idx, p in enumerate(self.problem.products)
if p_idx in flows[i]
)
self.add_linear_constraint(
total_volume <= nb_trips[i] * tl.transport_type.capacity
)
# Flow Conservation
for loc in self.problem.locations:
for p_idx, p in enumerate(self.problem.products):
in_flow = self.construct_linear_sum(
flows[i][p_idx]
for i, tl in enumerate(self.problem.transport_links)
if tl.location_l2 == loc and p_idx in flows[i]
)
out_flow = self.construct_linear_sum(
flows[i][p_idx]
for i, tl in enumerate(self.problem.transport_links)
if tl.location_l1 == loc and p_idx in flows[i]
)
self.add_linear_constraint(
in_flow + loc.net_supply.get(p, 0) == out_flow
)
if kwargs["add_lb_constraint_nb_trips"]:
self.add_advanced_capacity_constraints(
flows_variables=flows,
nb_trips_per_link=nb_trips,
max_k=10,
)
# --- Objective ---
self._set_objective(nb_trips, flows)
self.variables = {"nb_trips": nb_trips, "flows": flows}
def _init_model_flows_single_batching(self, **kwargs: Any) -> None:
"""Builds the constrained flow model where only one product type is allowed per transport."""
logger.info("Initializing model (Single-Batching Flow)...")
self.model = self.create_empty_model("multibatching-single-flow")
# --- Pre-computation ---
supply_per_product = {
p: self.problem.get_total_supply(p) for p in self.problem.products
}
# --- Variables ---
nb_trips = {}
for i in range(self.problem.nb_transport_links):
nb_trips[i] = self.add_integer_variable(name=f"nb_trips_{i}")
flows = {}
for i, tl in enumerate(self.problem.transport_links):
flows[i] = {}
for p_idx, p in enumerate(self.problem.products):
if (
tl.transport_type in p.valid_transports
and tl.transport_type.capacity > p.size
):
flows[i][p_idx] = self.add_integer_variable(
name=f"flow_{i}_{p_idx}", ub=supply_per_product[p]
)
# Trips dedicated to a specific product on a specific link
nb_trips_per_product = {}
for i in range(self.problem.nb_transport_links):
for p_idx in flows[i]:
nb_trips_per_product[(i, p_idx)] = self.add_integer_variable(
name=f"nb_trips_{i}_{p_idx}"
)
# --- Constraints ---
for i, tl in enumerate(self.problem.transport_links):
# Total trips on a link is the sum of product-dedicated trips
self.add_linear_constraint(
self.construct_linear_sum(
nb_trips_per_product[(i, p_idx)] for p_idx in flows[i]
)
== nb_trips[i]
)
for p_idx, p in enumerate(self.problem.products):
if p_idx in flows[i]:
# Capacity constraint for single-product trips
products_per_trip = (
tl.transport_type.capacity // p.size if p.size > 0 else 0
)
if products_per_trip > 0:
self.add_linear_constraint(
flows[i][p_idx]
<= nb_trips_per_product[(i, p_idx)] * products_per_trip
)
else: # Cannot transport this product
self.add_linear_constraint(flows[i][p_idx] == 0)
# Flow conservation (same as multi-batching)
for loc in self.problem.locations:
for p_idx, p in enumerate(self.problem.products):
in_flow = self.construct_linear_sum(
flows[i][p_idx]
for i, tl in enumerate(self.problem.transport_links)
if tl.location_l2 == loc and p_idx in flows[i]
)
out_flow = self.construct_linear_sum(
flows[i][p_idx]
for i, tl in enumerate(self.problem.transport_links)
if tl.location_l1 == loc and p_idx in flows[i]
)
self.add_linear_constraint(
in_flow + loc.net_supply.get(p, 0) == out_flow
)
# --- Objective ---
self._set_objective(nb_trips, flows)
self.variables = {
"nb_trips": nb_trips,
"flows": flows,
"nb_trips_per_product": nb_trips_per_product,
}
def add_advanced_capacity_constraints(
self, flows_variables, nb_trips_per_link, max_k=30
):
"""
Adds generalized lower bound constraints on the number of trips.
For each k in [2..max_k], we consider items with size > Capacity/k.
Since at most k-1 such items fit in one vehicle, we add the cut:
(k-1) * NbTrips >= Sum(Flows of these items)
"""
for index_tl in range(self.problem.nb_transport_links):
if index_tl not in nb_trips_per_link:
continue
tl = self.problem.transport_links[index_tl]
capacity = tl.transport_type.capacity
if capacity <= 0:
continue
nb_trips_var = nb_trips_per_link[index_tl]
current_flow_vars = flows_variables[index_tl]
# Iterate k from 2 up to max_k (e.g., items > C/2, > C/3, ..., > C/10)
for k in range(2, max_k + 1):
relevant_flows = []
for index_product, flow_var in current_flow_vars.items():
product = self.problem.products[index_product]
# Check condition: size > Capacity / k <==> size * k > Capacity
if product.size * k > capacity:
relevant_flows.append(flow_var)
if relevant_flows:
# Constraint: (k-1) * NbTrips >= Sum(Relevant Flows)
lhs = (k - 1) * nb_trips_var
rhs = self.construct_linear_sum(relevant_flows)
self.add_linear_constraint(lhs >= rhs)
def _set_objective(self, nb_trips, flows):
"""Helper function to build and set the objective expression."""
transport_cost = self.construct_linear_sum(
nb_trips[i]
* int(tl.distance * tl.transport_type.cost * self.scaling_factor)
for i, tl in enumerate(self.problem.transport_links)
)
emission_cost = self.construct_linear_sum(
nb_trips[i]
* int(tl.distance * tl.transport_type.emissions * self.scaling_factor)
for i, tl in enumerate(self.problem.transport_links)
)
self.set_model_objective(transport_cost + emission_cost, minimize=True)
def retrieve_current_solution(
self,
get_var_value_for_current_solution: Callable[[Any], float],
get_obj_value_for_current_solution: Callable[[], float],
) -> Solution:
"""
Converts a solution into a `MultibatchingSolution`.
If `single_batching` is used, it reconstructs the exact packings. Otherwise, it creates
an "average" packing for multi-batch flows.
"""
list_flows = []
nb_trips_vals = {
i: get_var_value_for_current_solution(var)
for i, var in self.variables["nb_trips"].items()
}
for i, tl in enumerate(self.problem.transport_links):
if nb_trips_vals.get(i, 0) > 1e-6:
if self.single_batching:
# Reconstruct exact packings for single-batching
for p_idx, p in enumerate(self.problem.products):
if (i, p_idx) in self.variables["nb_trips_per_product"]:
flow_val = get_var_value_for_current_solution(
self.variables["flows"][i][p_idx]
)
if flow_val > 1e-6:
products_per_trip = tl.transport_type.capacity // p.size
num_full_trips = int(flow_val // products_per_trip)
remainder = flow_val % products_per_trip
if num_full_trips > 0:
list_flows.append(
PackingTransport(
transport_link=tl,
product_packing={p: products_per_trip},
nb_packing=num_full_trips,
)
)
if remainder > 1e-6:
list_flows.append(
PackingTransport(
transport_link=tl,
product_packing={p: remainder},
nb_packing=1,
)
)
else:
# Create average packing for multi-batching
product_packing = {}
for p_idx, p in enumerate(self.problem.products):
if p_idx in self.variables["flows"][i]:
flow_val = get_var_value_for_current_solution(
self.variables["flows"][i][p_idx]
)
if flow_val > 1e-6:
product_packing[p] = flow_val / nb_trips_vals[i]
if product_packing:
list_flows.append(
PackingTransport(
transport_link=tl,
product_packing=product_packing,
nb_packing=int(round(nb_trips_vals[i])),
)
)
return MultibatchingSolution(problem=self.problem, list_flows=list_flows)
def convert_to_variable_values(
self, solution: MultibatchingSolution
) -> Dict[Any, float]:
"""Converts a `MultibatchingSolution` to a dictionary of variable values for warm start."""
variable_values = {}
# Aggregate flows and trips from the solution
agg_flows = defaultdict(lambda: defaultdict(float))
agg_trips = defaultdict(float)
agg_trips_per_product = defaultdict(float)
for flow in solution.list_flows:
link_idx = self.problem.transport_links_to_index[flow.transport_link]
agg_trips[link_idx] += flow.nb_packing
for product, units in flow.product_packing.items():
prod_idx = self.problem.product_to_index[product]
agg_flows[link_idx][prod_idx] += units * flow.nb_packing
if self.single_batching:
agg_trips_per_product[(link_idx, prod_idx)] += flow.nb_packing
# Map aggregated values to variables
for i, var in self.variables["nb_trips"].items():
variable_values[var] = agg_trips.get(i, 0)
for i, flow_map in self.variables["flows"].items():
for p_idx, var in flow_map.items():
variable_values[var] = agg_flows.get(i, {}).get(p_idx, 0)
if self.single_batching:
for (i, p_idx), var in self.variables["nb_trips_per_product"].items():
variable_values[var] = agg_trips_per_product.get((i, p_idx), 0)
return variable_values
[docs]
class GurobiMultibatchingSolver(GurobiMilpSolver, _BaseLpMultibatchingSolver):
"""Gurobi solver for the Multibatching problem, based on a flow formulation."""
hyperparameters = _BaseLpMultibatchingSolver.hyperparameters
[docs]
def convert_to_variable_values(
self, solution: Solution
) -> dict["gurobipy.Var", float]:
return _BaseLpMultibatchingSolver.convert_to_variable_values(self, solution)
[docs]
def init_model(self, single_batching: bool = False, **kwargs: Any) -> None:
_BaseLpMultibatchingSolver.init_model(self, single_batching, **kwargs)
self.model.update()
[docs]
class MathOptMultibatchingSolver(OrtoolsMathOptMilpSolver, _BaseLpMultibatchingSolver):
"""MathOpt solver for the Multibatching problem, based on a flow formulation."""
hyperparameters = (
OrtoolsMathOptMilpSolver.hyperparameters
+ _BaseLpMultibatchingSolver.hyperparameters
)
[docs]
def convert_to_variable_values(
self, solution: Solution
) -> dict["gurobipy.Var", float]:
return _BaseLpMultibatchingSolver.convert_to_variable_values(self, solution)
# Unit Flow Solver
class _BaseLpMultibatchingSolverUnitFlow(MilpSolver, MultibatchingSolver):
"""Base class for Multibatching LP solvers using a detailed "unit flow" formulation.
This model creates variables for each potential trip on each transport link and
decides the exact content of each trip. It is more precise than an aggregated
flow model but can be more computationally expensive due to the large number
of binary and integer variables.
This formulation is analogous to the `UNIT_FLOW` model in the Cpsat solver.
"""
hyperparameters = [
CategoricalHyperparameter(
"restrict_to_shortest_paths", choices=[True, False], default=False
),
FloatHyperparameter(
"shortest_path_tolerance",
low=0,
high=10,
default=10,
depends_on=[("restrict_to_shortest_paths", True)],
),
]
problem: MultibatchingProblem
def __init__(self, problem: MultibatchingProblem, **kwargs: Any):
super().__init__(problem, **kwargs)
self.variables: Dict[str, Any] = {}
self.scaling_factor = kwargs.get("scaling_factor", 1)
def init_model(self, max_trips_per_link: int = 15, **kwargs: Any) -> None:
"""
Builds the Gurobi model based on a detailed, per-trip ("unit flow") formulation.
Args:
max_trips_per_link (int): A hard upper bound on the number of trips that can be
used on any single transport link. This is necessary
to define the variable space.
"""
logger.info("Initializing model (Unit Flow)...")
logger.info(f"Max trips : {max_trips_per_link}")
use_shortest_path = kwargs.get("restrict_to_shortest_paths", False)
sp_tolerance = kwargs.get(
"shortest_path_tolerance", 0.0
) # 0.0 = Strict, 0.2 = +20% length allowed
# Option B: No incoming flow at producers
no_incoming_at_source = kwargs.get("prevent_incoming_at_source", False)
valid_links_map = None
if use_shortest_path:
logger.info(
f"Computing valid links (Shortest Path Heuristic, tol={sp_tolerance})..."
)
valid_links_map = precompute_valid_links(
self.problem, tolerance=sp_tolerance
)
self.model = self.create_empty_model("multibatching-unit-flow")
model = self.model
sol: MultibatchingSolution = kwargs.get("solution", None)
delta: int = kwargs.get("delta_to_solution", 1)
max_nb_trip_per_transport_link = {
i: max_trips_per_link for i in range(self.problem.nb_transport_links)
}
if sol is not None:
max_nb_trip_per_transport_link = defaultdict(lambda: delta)
for pt in sol.list_flows:
id_tl = self.problem.transport_links_to_index[pt.transport_link]
max_nb_trip_per_transport_link[id_tl] += pt.nb_packing
# Create binary variables for whether a trip is used
used_trips = {}
for i in range(self.problem.nb_transport_links):
used_trips[i] = {}
for j in range(max_nb_trip_per_transport_link[i]):
used_trips[i][j] = self.add_binary_variable(name=f"used_{i}_{j}")
# `trip_contents[link_idx][trip_idx][prod_idx]` = quantity of a product on a specific trip
trip_contents = {}
for i, tl in enumerate(self.problem.transport_links):
trip_contents[i] = {}
for j in range(max_nb_trip_per_transport_link[i]):
trip_contents[i][j] = {}
for p_idx, p in enumerate(self.problem.products):
if (
# 1. Transport Compatibility & Capacity
tl.transport_type in p.valid_transports
and tl.transport_type.capacity > p.size
# 2. Heuristic A: Restrict to Valid Paths (Shortest Path logic)
and (
valid_links_map is None
or i in valid_links_map.get(p.id, set())
)
# 3. Heuristic B: Prevent Incoming Flow at Producers (No U-turns to factory)
and (
not no_incoming_at_source
or tl.location_l2.net_supply.get(p, 0) <= 1e-6
)
):
ub_val = (
int(tl.transport_type.capacity / p.size)
if p.size > 0
else 0
)
trip_contents[i][j][p_idx] = self.add_integer_variable(
name=f"cont_{i}_{j}_{p_idx}", ub=ub_val
)
# --- Constraints ---
for i, tl in enumerate(self.problem.transport_links):
for j in range(max_nb_trip_per_transport_link[i]):
# Capacity constraint for each individual trip
trip_volume = self.construct_linear_sum(
trip_contents[i][j][p_idx] * p.size
for p_idx, p in enumerate(self.problem.products)
if p_idx in trip_contents[i][j]
)
self.add_linear_constraint(trip_volume <= tl.transport_type.capacity)
# Link `used_trips` to trip contents using big-M formulation
# IF used_trips[i][j] is 1, THEN total_items >= 1
# IF used_trips[i][j] is 0, THEN total_items == 0
total_items_in_trip = self.construct_linear_sum(
trip_contents[i][j][p_idx] for p_idx in trip_contents[i][j]
)
# Big-M upper bound: compute maximum possible items in this trip
M = sum(
int(tl.transport_type.capacity / p.size) if p.size > 0 else 0
for p_idx, p in enumerate(self.problem.products)
if p_idx in trip_contents[i][j]
)
if M == 0:
M = 1 # Safety for empty trips
# Constraint 1: total_items >= used_trips (enforces items >= 1 when used=1)
self.add_linear_constraint(total_items_in_trip >= used_trips[i][j])
# Constraint 2: total_items <= M * used_trips (enforces items = 0 when used=0)
self.add_linear_constraint(total_items_in_trip <= M * used_trips[i][j])
# Aggregate expressions for total flow, linking trip contents to flow conservation
total_flows = {}
for i in range(self.problem.nb_transport_links):
total_flows[i] = {}
for p_idx, p in enumerate(self.problem.products):
# Check if this product can be on any trip on this link
if any(
p_idx in trip_contents[i][j]
for j in range(max_nb_trip_per_transport_link[i])
):
total_flows[i][p_idx] = self.construct_linear_sum(
trip_contents[i][j].get(p_idx, 0)
for j in range(max_nb_trip_per_transport_link[i])
if p_idx in trip_contents[i][j]
)
# Flow conservation constraints (using the aggregated total_flows)
for loc in self.problem.locations:
for p_idx, p in enumerate(self.problem.products):
in_flow = self.construct_linear_sum(
total_flows[i].get(p_idx, 0)
for i, tl in enumerate(self.problem.transport_links)
if tl.location_l2 == loc and p_idx in total_flows[i]
)
out_flow = self.construct_linear_sum(
total_flows[i].get(p_idx, 0)
for i, tl in enumerate(self.problem.transport_links)
if tl.location_l1 == loc and p_idx in total_flows[i]
)
self.add_linear_constraint(
in_flow + loc.net_supply.get(p, 0) == out_flow
)
# --- Objective Function ---
nb_trips_per_link = {}
for i in range(self.problem.nb_transport_links):
nb_trips_per_link[i] = self.construct_linear_sum(
used_trips[i][j] for j in range(max_nb_trip_per_transport_link[i])
)
transport_cost = self.construct_linear_sum(
nb_trips_per_link[i]
* int(tl.distance * tl.transport_type.cost * self.scaling_factor)
for i, tl in enumerate(self.problem.transport_links)
)
emission_cost = self.construct_linear_sum(
nb_trips_per_link[i]
* int(tl.distance * tl.transport_type.emissions * self.scaling_factor)
for i, tl in enumerate(self.problem.transport_links)
)
self.set_model_objective(transport_cost + emission_cost, minimize=True)
self.variables = {
"used_trips": used_trips,
"trip_contents": trip_contents,
}
def retrieve_current_solution(
self,
get_var_value_for_current_solution: Callable[[Any], float],
get_obj_value_for_current_solution: Callable[[], float],
) -> Solution:
"""
Retrieves the solution by reconstructing packings from the detailed trip variables.
It groups identical packings to create a concise solution.
"""
list_flows = []
for i, link in enumerate(self.problem.transport_links):
packings_on_link = []
for j in range(len(self.variables["used_trips"][i])):
if (
get_var_value_for_current_solution(
self.variables["used_trips"][i][j]
)
> 0.5
):
current_packing = {
self.problem.products[p_idx]: round(
get_var_value_for_current_solution(content_var)
)
for p_idx, content_var in self.variables["trip_contents"][i][
j
].items()
if get_var_value_for_current_solution(content_var) > 0.5
}
if current_packing:
# Use a frozenset of items for a hashable representation of the packing
packings_on_link.append(frozenset(current_packing.items()))
if packings_on_link:
# Count occurrences of each unique packing configuration
for unique_packing_items, count in Counter(packings_on_link).items():
list_flows.append(
PackingTransport(
transport_link=link,
product_packing=dict(unique_packing_items),
nb_packing=count,
)
)
return MultibatchingSolution(problem=self.problem, list_flows=list_flows)
def convert_to_variable_values(
self, solution: MultibatchingSolution
) -> Dict[Any, float]:
used_trips = self.variables["used_trips"]
trip_contents = self.variables["trip_contents"]
trips_per_link = defaultdict(list)
dict_vars = {}
for flow in solution.list_flows:
index_tl = self.problem.transport_links_to_index[flow.transport_link]
for _ in range(flow.nb_packing):
trips_per_link[index_tl].append(flow.product_packing)
for index_tl in trips_per_link:
nb_trips = len(trips_per_link[index_tl])
for i in range(nb_trips):
dict_vars[used_trips[index_tl][i]] = 1
for i in used_trips[index_tl]:
if i >= nb_trips:
dict_vars[used_trips[index_tl][i]] = 0
for i in range(nb_trips):
pack = trips_per_link[index_tl][i]
for p_index in trip_contents[index_tl][i]:
p = self.problem.products[p_index]
if p in pack:
dict_vars[trip_contents[index_tl][i][p_index]] = pack[p]
else:
dict_vars[trip_contents[index_tl][i][p_index]] = 0
for i in trip_contents[index_tl]:
if i >= nb_trips:
for p_index in trip_contents[index_tl][i]:
dict_vars[trip_contents[index_tl][i][p_index]] = 0
for index_tl in used_trips:
if index_tl not in trips_per_link:
for i in used_trips[index_tl]:
dict_vars[used_trips[index_tl][i]] = 0
for i in trip_contents[index_tl]:
for p_index in trip_contents[index_tl][i]:
dict_vars[trip_contents[index_tl][i][p_index]] = 0
return dict_vars
[docs]
class GurobiMultibatchingSolverUnitFlow(
GurobiMilpSolver, _BaseLpMultibatchingSolverUnitFlow
):
"""Gurobi solver for the Multibatching problem using unit flow formulation."""
[docs]
def convert_to_variable_values(self, solution: MultibatchingSolution):
return _BaseLpMultibatchingSolverUnitFlow.convert_to_variable_values(
self, solution
)
[docs]
def init_model(self, max_trips_per_link: int = 15, **kwargs: Any) -> None:
_BaseLpMultibatchingSolverUnitFlow.init_model(
self, max_trips_per_link, **kwargs
)
self.model.update()
[docs]
class MathOptMultibatchingSolverUnitFlow(
OrtoolsMathOptMilpSolver, _BaseLpMultibatchingSolverUnitFlow
):
"""MathOpt solver for the Multibatching problem using unit flow formulation."""
[docs]
def convert_to_variable_values(self, solution: MultibatchingSolution):
return _BaseLpMultibatchingSolverUnitFlow.convert_to_variable_values(
self, solution
)
# Lazy constraint variants (Gurobi-specific)
[docs]
class GurobiMultibatchingSolverWithLazyConstraint(GurobiMultibatchingSolver):
"""
A Gurobi solver for the multi-batching problem that uses lazy constraints
to iteratively refine the packing model.
"""
[docs]
def init_model(self, **kwargs: Any) -> None:
"""
Initializes the standard multi-batching flow model and enables lazy constraints.
"""
# We only apply this logic to the multi-batching case
super().init_model(single_batching=False, **kwargs)
# Tell Gurobi to enable lazy constraints
self.model.setParam("LazyConstraints", 1)
logger.info("Gurobi model with Lazy Constraints enabled.")
[docs]
def solve(
self,
callbacks: list[Callback] = None,
parameters_milp: ParametersMilp = None,
time_limit: float = 30,
**kwargs: Any,
) -> ResultStorage:
"""
Overrides the default solve method to use a custom callback for generating
lazy constraints.
"""
self.early_stopping_exception = None
self._current_internal_objective_best_value = None
self._current_internal_objective_best_bound = None
callbacks_list = CallbackList(callbacks=callbacks)
callbacks_list.on_solve_start(solver=self)
# Use our specialized callback for lazy constraints
gurobi_lazy_callback = GurobiLazyCallback(
do_solver=self, callback=callbacks_list
)
self.optimize_model(
parameters_milp=parameters_milp,
time_limit=time_limit,
gurobi_callback=gurobi_lazy_callback,
**kwargs,
)
if hasattr(self.model, "ObjVal"):
self._current_internal_objective_best_value = self.model.ObjVal
if hasattr(self.model, "ObjBound"):
self._current_internal_objective_best_bound = self.model.ObjBound
res = gurobi_lazy_callback.res
callbacks_list.on_solve_end(res=res, solver=self)
return res
# Import lazy callback helpers
from discrete_optimization.binpack.solvers.cpsat import (
CpSatBinPackSolver,
ModelingBinPack,
)
from discrete_optimization.multibatching.solvers.packing_subproblem import (
PackingViaBinPacking,
SubBrick,
)
[docs]
class GurobiLazyCallback:
"""
Custom Gurobi callback to generate valid, flow-dependent cuts for the multibatching problem.
"""
def __init__(self, do_solver: GurobiMultibatchingSolver, callback: Callback):
self.do_solver = do_solver
self.callback = callback
self.res = do_solver.create_result_storage()
self.nb_solutions = 0
self.nb_cuts_added = 0
self.packing_solver = PackingViaBinPacking(problem=self.do_solver.problem)
def __call__(self, model: "gurobipy.Model", where: int) -> None:
if where == gurobipy.GRB.Callback.MIPSOL:
try:
# 1. Retrieve the candidate solution from the MILP
candidate_solution = self.do_solver.retrieve_current_solution(
get_var_value_for_current_solution=model.cbGetSolution,
get_obj_value_for_current_solution=lambda: model.cbGet(
gurobipy.GRB.Callback.MIPSOL_OBJ
),
)
# 2. Run the greedy packing subproblem to get the "true" number of trips
self.packing_solver.init_from_solution(solution=candidate_solution)
p = ParametersCp.default_cpsat()
p.nb_process = 16
packing_result = self.packing_solver.solve(
time_limit_per_link=3,
bin_packing_solver=SubBrick(
cls=CpSatBinPackSolver,
kwargs=dict(
modeling=ModelingBinPack.SCHEDULING, parameters_cp=p
),
),
)
logger.debug(
f"fit packing {self.do_solver.aggreg_from_sol(packing_result[-1][0])}"
)
logger.debug(
f"sat packing {self.do_solver.problem.satisfy(packing_result[-1][0])}"
)
if len(packing_result) == 0:
return
packed_solution = packing_result.get_best_solution_fit()[0]
# 3. Check for violations and add valid, flow-dependent lazy constraints
milp_trips = defaultdict(int)
for flow in candidate_solution.list_flows:
milp_trips[flow.transport_link] += flow.nb_packing
greedy_trips = defaultdict(int)
for flow in packed_solution.list_flows:
greedy_trips[flow.transport_link] += flow.nb_packing
violation_found = False
for link, milp_trip_count_float in milp_trips.items():
milp_trip_count = round(milp_trip_count_float)
greedy_trip_count = greedy_trips.get(link, 0)
if greedy_trip_count > milp_trip_count:
flows_product_in_this_link = {}
for flow in candidate_solution.list_flows:
if flow.transport_link == link:
for p in flow.product_packing:
if p not in flows_product_in_this_link:
flows_product_in_this_link[p] = 0
flows_product_in_this_link[p] += round(
flow.product_packing[p] * flow.nb_packing
)
violation_found = True
self.nb_cuts_added += 1
link_idx = self.do_solver.problem.transport_links_to_index[link]
nb_trips_var = self.do_solver.variables["nb_trips"][link_idx]
# Calculate the total volume in the candidate solution for this link
candidate_volume = sum(
p.size
* round(flow.product_packing.get(p, 0) * flow.nb_packing)
for p in self.do_solver.problem.products
for flow in candidate_solution.list_flows
if flow.transport_link == link
)
if candidate_volume < 1e-6:
continue # Avoid division by zero
# Define the total_volume_var as a Gurobi linear expression
total_volume_var = gurobipy.quicksum(
p.size * self.do_solver.variables["flows"][link_idx][p_idx]
for p_idx, p in enumerate(self.do_solver.problem.products)
if p_idx in self.do_solver.variables["flows"][link_idx]
and p in flows_product_in_this_link
)
# Add the valid, flow-dependent cut
# nb_trips >= (N_greedy / V_candidate) * total_volume_var
cut_coefficient = greedy_trip_count / candidate_volume
model.cbLazy(nb_trips_var >= cut_coefficient * total_volume_var)
other_link = [
index_link
for index_link in range(
self.do_solver.problem.nb_transport_links
)
if self.do_solver.problem.transport_links[
index_link
].transport_type
== link.transport_type
]
for ot in other_link:
total_volume_var = gurobipy.quicksum(
p.size * self.do_solver.variables["flows"][ot][p_idx]
for p_idx, p in enumerate(
self.do_solver.problem.products
)
if p_idx in self.do_solver.variables["flows"][ot]
and p in flows_product_in_this_link
)
nb_trips_var = self.do_solver.variables["nb_trips"][ot]
model.cbLazy(
nb_trips_var >= cut_coefficient * total_volume_var
)
logger.info(
f"CUT #{self.nb_cuts_added} on link {link.id}: "
f"MILP trips={milp_trip_count}, Greedy trips={greedy_trip_count},"
f" Volume={candidate_volume:.2f}. "
f"Adding lazy cut."
)
# 4. If no violation was found, the solution is valid from a packing perspective.
if True:
fit = self.do_solver.aggreg_from_sol(packed_solution)
self.res.append((packed_solution, fit))
self.nb_solutions += 1
stopping = self.callback.on_step_end(
step=self.nb_solutions, res=self.res, solver=self.do_solver
)
if stopping:
model.terminate()
except Exception as e:
logger.error(f"Error in Gurobi lazy callback: {e}", exc_info=True)
self.do_solver.early_stopping_exception = e
model.terminate()