Source code for discrete_optimization.multibatching.solvers.cpsat

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import math
from collections import Counter, defaultdict
from enum import Enum
from typing import Any, Iterable

from ortools.sat.python import cp_model
from ortools.sat.python.cp_model import CpSolverSolutionCallback

from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import (
    WarmstartMixin,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
    EnumHyperparameter,
    FloatHyperparameter,
)
from discrete_optimization.generic_tools.ortools_cpsat_tools import (
    OrtoolsCpSatSolver,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.multibatching.problem import (
    MultibatchingProblem,
    MultibatchingSolution,
    PackingTransport,
    Product,
    TransportType,
)
from discrete_optimization.multibatching.solvers.solver_utils import (
    precompute_valid_links,
)

logger = logging.getLogger(__name__)


[docs] class ModelingMultiBatch(Enum): FLOW = 0 UNIT_FLOW = 1
[docs] class CpsatMultibatchingSolver(OrtoolsCpSatSolver, WarmstartMixin): hyperparameters = [ EnumHyperparameter( name="modeling", enum=ModelingMultiBatch, default=ModelingMultiBatch.FLOW ), CategoricalHyperparameter( name="detailed_capacity_constraint", choices=[True, False], default=False, depends_on=[("modeling", [ModelingMultiBatch.FLOW])], ), CategoricalHyperparameter( name="add_lb_constraint_nb_trips", choices=[True, False], default=False, depends_on=[("modeling", [ModelingMultiBatch.FLOW])], ), CategoricalHyperparameter( name="restrict_to_shortest_paths", choices=[True, False], default=False ), FloatHyperparameter( name="shortest_path_tolerance", low=0.0, high=30, default=1.0, depends_on=[("restrict_to_shortest_paths", True)], ), CategoricalHyperparameter( name="prevent_incoming_at_source", choices=[True, False] ), ] problem: MultibatchingProblem def __init__( self, problem: MultibatchingProblem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs: Any, ): super().__init__(problem, params_objective_function, **kwargs) self.variables = {} self.modeling: ModelingMultiBatch = None self.single_batching: bool = None self.verbose_logs = kwargs.get("verbose", 0) self.scaling_factor = kwargs.get("scaling_factor", 1)
[docs] def init_model(self, single_batching: bool = False, **args: Any) -> None: args = self.complete_with_default_hyperparameters(args) modeling = args["modeling"] if modeling == ModelingMultiBatch.FLOW: if single_batching: self.init_model_flows_single_batching(**args) else: self.init_model_flows(**args) if modeling == ModelingMultiBatch.UNIT_FLOW: self.init_model_detailed_trips(**args) self.modeling = modeling self.single_batching = single_batching
[docs] def init_model_flows(self, **args: Any) -> None: use_shortest_path = args["restrict_to_shortest_paths"] sp_tolerance = args["shortest_path_tolerance"] no_incoming_at_source = args["prevent_incoming_at_source"] supply_per_product = { p: self.problem.get_total_supply(p) for p in self.problem.products } demand_per_product = { p: self.problem.get_total_demand(p) for p in self.problem.products } total_size_per_product = { p: p.size * supply_per_product[p] for p in self.problem.products } total_size_in_problem = sum(total_size_per_product.values()) max_nb_trip = max( 1, int( math.ceil( total_size_in_problem / min( [ tt.capacity for tt in self.problem.transport_types if tt.capacity > 0 ] ) ) ), ) transport_type_to_set_products: dict[TransportType, set[Product]] = { tt: set() for tt in self.problem.transport_types } for p in self.problem.products: for tt in p.valid_transports: transport_type_to_set_products[tt].add(p) max_nb_trips_per_transport_type = { tt: sum( [total_size_per_product[p] for p in transport_type_to_set_products[tt]] ) / max(1, tt.capacity) for tt in transport_type_to_set_products } valid_links_map = None if use_shortest_path: logger.info( f"Computing valid links (Shortest Path Heuristic, tol={sp_tolerance})..." ) valid_links_map = precompute_valid_links( self.problem, tolerance=sp_tolerance ) model = cp_model.CpModel() transport_links = self.problem.transport_links flows_variables = {} nb_trips_per_link = {} keys_inflow_location_product = dict() keys_outflow_location_product = dict() for index_tl in range(self.problem.nb_transport_links): flows_variables[index_tl] = {} tl = transport_links[index_tl] loc1 = tl.location_l1 loc2 = tl.location_l2 if loc1.id not in keys_outflow_location_product: keys_outflow_location_product[loc1.id] = defaultdict(lambda: set()) if loc2.id not in keys_inflow_location_product: keys_inflow_location_product[loc2.id] = defaultdict(lambda: set()) tr: TransportType = tl.transport_type nb_trips_per_link[index_tl] = model.NewIntVar( lb=0, ub=max_nb_trip, name=f"num_trips_{index_tl}" ) list_var_weight = [] for index_product in range(self.problem.nb_products): product: Product = self.problem.products[index_product] # Heuristic A: Skip if link is not on a shortest path if use_shortest_path and valid_links_map: if index_tl not in valid_links_map.get(product.id, set()): continue # Heuristic B: Skip if destination is a Producer (NetSupply > 0) # "On node that produce product, you should not have incoming flows" if no_incoming_at_source: if loc2.net_supply.get(product, 0) > 0: continue max_flow_product = max( supply_per_product[product], demand_per_product[product] ) # Create flow of product when the current transport type is valid for this product. if tr in product.valid_transports: if tr.capacity > product.size: flows_variables[index_tl][index_product] = model.NewIntVar( lb=0, ub=max_flow_product, name=f"flow_{index_tl}_{index_product}", ) keys_outflow_location_product[loc1.id][product.id].add( (index_tl, index_product) ) keys_inflow_location_product[loc2.id][product.id].add( (index_tl, index_product) ) list_var_weight.append( ( flows_variables[index_tl][index_product], int(product.size), ) ) # Global capacity constraint model.Add( cp_model.LinearExpr.weighted_sum( [x[0] for x in list_var_weight], [x[1] for x in list_var_weight] ) <= nb_trips_per_link[index_tl] * tl.transport_type.capacity ) if args["detailed_capacity_constraint"]: nb_trips_min_for_product_flows = { index_product: self.compute_nb_trips_min( self.problem.products[index_product], demand_of_product=demand_per_product[ self.problem.products[index_product] ], ) for index_product in range(self.problem.nb_products) } for index_tl in flows_variables: transport_link = self.problem.transport_links[index_tl] transport_type = transport_link.transport_type for index_product in flows_variables[index_tl]: nb_trips_min = nb_trips_min_for_product_flows[index_product][ transport_type ] # binary_indicators = [model.NewBoolVar(f'flow_{index_tl}_{index_product}_equal_{k}') # for k in range(len(nb_trips_min))] # model.add_map_domain(flows_variables[index_tl][index_product], # binary_indicators) # for j in range(1, len(binary_indicators)): # (model.add(nb_trips_per_link[index_tl] >= nb_trips_min[j]) # .OnlyEnforceIf(binary_indicators[j])) # For a given number of product flow NB, we force nb_trips_per_link to be >= nb_trips_min[NB] model.AddForbiddenAssignments( [ flows_variables[index_tl][index_product], nb_trips_per_link[index_tl], ], [ (j, nb_min) for j in range(1, min(5, len(nb_trips_min))) for nb_min in range(nb_trips_min[j]) ], ) if args["add_lb_constraint_nb_trips"]: self.add_advanced_capacity_constraints( model=model, flows_variables=flows_variables, nb_trips_per_link=nb_trips_per_link, solver_type="cpsat", ) self.add_advanced_capacity_constraints2( model=model, flows_variables=flows_variables, nb_trips_per_link=nb_trips_per_link, max_k=5, solver_type="cpsat", ) # self.add_global_flow_limit_constraints(model, flows_variables, "cpsat", 3) # Flows constraint for location in self.problem.locations: loc_id = location.id index_products = range(len(self.problem.products)) for index_product in index_products: product = self.problem.products[index_product] net_supply = location.net_supply.get(product, 0) # Linear expressions (not constraint) flow_in = cp_model.LinearExpr.Sum( [ flows_variables[x[0]][x[1]] for x in keys_inflow_location_product.get(loc_id, {}).get( product.id, set() ) ] ) # flow_in = sum([flows_variables[x[0]][x[1]] # for x in keys_inflow_location_product.get(loc_id, {}).get(product.id, set())]) flow_out = cp_model.LinearExpr.Sum( [ flows_variables[x[0]][x[1]] for x in keys_outflow_location_product.get(loc_id, {}).get( product.id, set() ) ] ) model.Add(flow_in + net_supply - flow_out == 0) transport_cost = self.scaling_factor * sum( [ nb_trips_per_link[index_tl] * int( transport_links[index_tl].distance * transport_links[index_tl].transport_type.cost ) for index_tl in range(self.problem.nb_transport_links) ] ) emission_cost = self.scaling_factor * sum( [ nb_trips_per_link[index_tl] * int( transport_links[index_tl].distance * transport_links[index_tl].transport_type.emissions ) for index_tl in range(self.problem.nb_transport_links) ] ) self.variables["total_obj"] = transport_cost + emission_cost model.Minimize(self.variables["total_obj"]) self.variables["objs"] = { "transport": transport_cost, "emission": emission_cost, } self.cp_model = model self.variables["flows"] = flows_variables self.variables["nb_trips"] = nb_trips_per_link
[docs] def init_model_flows_single_batching(self, **args: Any) -> None: supply_per_product = { p: self.problem.get_total_supply(p) for p in self.problem.products } demand_per_product = { p: self.problem.get_total_demand(p) for p in self.problem.products } total_size_per_product = { p: p.size * supply_per_product[p] for p in self.problem.products } total_size_in_problem = sum(total_size_per_product.values()) max_nb_trip = max( 1, int( total_size_in_problem / min( [ tt.capacity for tt in self.problem.transport_types if tt.capacity > 0 ] ) ), ) model = cp_model.CpModel() self.cp_model = model transport_links = self.problem.transport_links flows_variables = {} # flows_nz_variables = {} nb_trips_per_link = {} nb_trips_per_link_per_product = {} keys_inflow_location_product = dict() keys_outflow_location_product = dict() for index_tl in range(self.problem.nb_transport_links): flows_variables[index_tl] = {} # flows_nz_variables[index_tl] = {} tl = transport_links[index_tl] loc1 = tl.location_l1 loc2 = tl.location_l2 if loc1.id not in keys_outflow_location_product: keys_outflow_location_product[loc1.id] = defaultdict(lambda: set()) if loc2.id not in keys_inflow_location_product: keys_inflow_location_product[loc2.id] = defaultdict(lambda: set()) tr: TransportType = tl.transport_type nb_trips_per_link[index_tl] = model.NewIntVar( lb=0, ub=max_nb_trip, name=f"num_trips_{index_tl}" ) list_var_weight = [] keys_nb_trips_per_link_per_product = [] # for index_product in range(self.problem.nb_products): for index_product in range(self.problem.nb_products): product = self.problem.products[index_product] max_flow_product = max( supply_per_product[product], demand_per_product[product] ) if tr in product.valid_transports: if tr.capacity > product.size: flows_variables[index_tl][index_product] = model.NewIntVar( lb=0, ub=max_flow_product, name=f"flow_{index_tl}_{index_product}", ) # flows_nz_variables[index_tl][index_product] = model.NewBoolVar(name=f"nz_flow_{index_tl}_" # f"{index_product}") nb_product_per_transport = int(tr.capacity // product.size) capa_used_per_transport = ( nb_product_per_transport * product.size ) nb_trips_max = int( math.ceil(max_flow_product / nb_product_per_transport) ) nb_trips_per_link_per_product[(index_tl, index_product)] = ( model.NewIntVar( lb=0, ub=nb_trips_max, name=f"trips_{index_tl}_{index_product}", ) ) keys_nb_trips_per_link_per_product.append( (index_tl, index_product) ) self.cp_model.add( nb_trips_per_link_per_product[(index_tl, index_product)] * nb_product_per_transport >= flows_variables[index_tl][index_product] ) keys_outflow_location_product[loc1.id][product.id].add( (index_tl, index_product) ) keys_inflow_location_product[loc2.id][product.id].add( (index_tl, index_product) ) list_var_weight.append( ( flows_variables[index_tl][index_product], int(product.size), ) ) model.Add( sum( [ nb_trips_per_link_per_product[x] for x in keys_nb_trips_per_link_per_product ] ) == nb_trips_per_link[index_tl] ) # Global capacity constraint model.Add( cp_model.LinearExpr.weighted_sum( [x[0] for x in list_var_weight], [x[1] for x in list_var_weight] ) <= nb_trips_per_link[index_tl] * tl.transport_type.capacity ) # Flows constraint for location in self.problem.locations: loc_id = location.id debug_mode = args.get("debug_mode", False) index_products = args.get("debug_settings", {}).get( "index_products", range(len(self.problem.products)) ) for index_product in index_products: product = self.problem.products[index_product] net_supply = location.net_supply.get(product, 0) flow_in = cp_model.LinearExpr.Sum( [ flows_variables[x[0]][x[1]] for x in keys_inflow_location_product.get(loc_id, {}).get( product.id, set() ) ] ) flow_out = cp_model.LinearExpr.Sum( [ flows_variables[x[0]][x[1]] for x in keys_outflow_location_product.get(loc_id, {}).get( product.id, set() ) ] ) model.Add(flow_in + net_supply - flow_out == 0) transport_cost = self.scaling_factor * sum( [ nb_trips_per_link[index_tl] * int( transport_links[index_tl].distance * transport_links[index_tl].transport_type.cost ) for index_tl in range(self.problem.nb_transport_links) ] ) emission_cost = self.scaling_factor * sum( [ nb_trips_per_link[index_tl] * int( transport_links[index_tl].distance * transport_links[index_tl].transport_type.emissions ) for index_tl in range(self.problem.nb_transport_links) ] ) estimated_compound = [] for index_tl in range(self.problem.nb_transport_links): tl = self.problem.transport_links[index_tl] time = tl.distance / tl.transport_type.speed value = sum( flows_variables[index_tl][index_product] * int(self.problem.products[index_product].value) for index_product in flows_variables[index_tl] ) estimated_compound.append( value * int( self.scaling_factor * ((1 + self.problem.capital_factor) ** time - 1) ) ) self.variables["total_obj"] = ( transport_cost + emission_cost + sum(estimated_compound) ) model.Minimize(self.variables["total_obj"]) self.variables["objs"] = { "transport": transport_cost, "emission": emission_cost, "capital": sum(estimated_compound), } self.variables["flows"] = flows_variables self.variables["nb_trips"] = nb_trips_per_link self.variables["nb_trips_per_link_per_product"] = nb_trips_per_link_per_product
[docs] def init_model_detailed_trips(self, **args: Any) -> None: sol: MultibatchingSolution = args.get("solution", None) delta: int = args.get("delta_to_solution", 1) max_nb_trips_per_link = args.get("max_trips", 50) logger.info(f"Max trips : {max_nb_trips_per_link}") max_nb_trip_per_transport_link = None if sol is not None: max_nb_trip_per_transport_link = defaultdict(lambda: delta) for pt in sol.list_flows: id_tl = self.problem.transport_links_to_index[pt.transport_link] max_nb_trip_per_transport_link[id_tl] += pt.nb_packing supply_per_product = { p: self.problem.get_total_supply(p) for p in self.problem.products } total_size_per_product = { p: p.size * supply_per_product[p] for p in self.problem.products } model = cp_model.CpModel() transport_links = self.problem.transport_links flows_variables = {} used_trips_variables = {} trips_variables = {} nb_trips_per_link = {} keys_inflow_location_product = dict() keys_outflow_location_product = dict() for index_tl in range(self.problem.nb_transport_links): flows_variables[index_tl] = {} trips_variables[index_tl] = {} used_trips_variables[index_tl] = {} tl = transport_links[index_tl] loc1 = tl.location_l1 loc2 = tl.location_l2 if loc1.id not in keys_outflow_location_product: keys_outflow_location_product[loc1.id] = defaultdict(lambda: set()) if loc2.id not in keys_inflow_location_product: keys_inflow_location_product[loc2.id] = defaultdict(lambda: set()) tr: TransportType = tl.transport_type max_trips_for_this_link = tl.max_trips if max_trips_for_this_link is None: max_trips_for_this_link = max_nb_trips_per_link if max_nb_trip_per_transport_link is not None: max_trips_for_this_link = max_nb_trip_per_transport_link[index_tl] nb_trips_per_link[index_tl] = model.NewIntVar( lb=0, ub=max_trips_for_this_link, name=f"num_trips_{index_tl}" ) list_var_weight = [] for nb_trip in range(max_trips_for_this_link): trips_variables[index_tl][nb_trip] = {} # means "nb_trip-th" vehicle is used. used_trips_variables[index_tl][nb_trip] = model.NewBoolVar( name=f"used_{index_tl}_{nb_trip}" ) # Symmetry breaking for i in range(1, max_trips_for_this_link): # ensure that if used_trip[i] == 0 then used_trip[i+1] == 0 model.Add( used_trips_variables[index_tl][i] <= used_trips_variables[index_tl][i - 1] ) for index_product in range(self.problem.nb_products): product = self.problem.products[index_product] if tr in product.valid_transports: flows_variables[index_tl][index_product] = model.NewIntVar( lb=0, ub=100, name=f"flow_{index_tl}_{index_product}" ) keys_outflow_location_product[loc1.id][product.id].add( (index_tl, index_product) ) keys_inflow_location_product[loc2.id][product.id].add( (index_tl, index_product) ) list_var_weight.append( (flows_variables[index_tl][index_product], int(product.size)) ) for nb_trip in trips_variables[index_tl]: trips_variables[index_tl][nb_trip][index_product] = ( model.NewIntVar( lb=0, ub=math.floor( tl.transport_type.capacity / product.size ), name=f"flow_{index_tl}_{nb_trip}_{index_product}", ) ) # Total flows of product in this edge == # the sum of transport product on individual trips on this edge. model.Add( sum( [ trips_variables[index_tl][nb_trip][index_product] for nb_trip in trips_variables[index_tl] ] ) == flows_variables[index_tl][index_product] ) for i in used_trips_variables[index_tl]: model.Add( sum( [ trips_variables[index_tl][i][p] for p in trips_variables[index_tl][i] ] ) > 0 ).OnlyEnforceIf(used_trips_variables[index_tl][i]) model.Add( sum( [ trips_variables[index_tl][i][p] for p in trips_variables[index_tl][i] ] ) == 0 ).OnlyEnforceIf(used_trips_variables[index_tl][i].Not()) ( model.Add( sum( [ trips_variables[index_tl][i][p] * int(self.problem.products[p].size) for p in trips_variables[index_tl][i] ] ) <= tl.transport_type.capacity ).OnlyEnforceIf(used_trips_variables[index_tl][i]) ) # Global capacity constraint model.Add( cp_model.LinearExpr.weighted_sum( [x[0] for x in list_var_weight], [x[1] for x in list_var_weight] ) <= nb_trips_per_link[index_tl] * tl.transport_type.capacity ) model.Add( nb_trips_per_link[index_tl] == sum( used_trips_variables[index_tl][i] for i in used_trips_variables[index_tl] ) ) # Flows constraint for location in self.problem.locations: loc_id = location.id for product in self.problem.products: net_supply = location.net_supply.get(product, 0) flow_in = cp_model.LinearExpr.Sum( [ flows_variables[x[0]][x[1]] for x in keys_inflow_location_product.get(loc_id, {}).get( product.id, set() ) ] ) flow_out = cp_model.LinearExpr.Sum( [ flows_variables[x[0]][x[1]] for x in keys_outflow_location_product.get(loc_id, {}).get( product.id, set() ) ] ) model.Add(flow_in + net_supply - flow_out == 0) transport_cost = self.scaling_factor * sum( [ nb_trips_per_link[index_tl] * transport_links[index_tl].distance * transport_links[index_tl].transport_type.cost for index_tl in range(self.problem.nb_transport_links) ] ) emission_cost = self.scaling_factor * sum( [ nb_trips_per_link[index_tl] * transport_links[index_tl].distance * transport_links[index_tl].transport_type.emissions for index_tl in range(self.problem.nb_transport_links) ] ) nb_trips_cost = sum( [ used_trips_variables[index_tl][nb_trip] for index_tl in used_trips_variables for nb_trip in used_trips_variables[index_tl] ] ) self.variables["total_obj"] = transport_cost + emission_cost model.Minimize(self.variables["total_obj"]) self.cp_model = model self.variables["flows"] = flows_variables self.variables["nb_trips"] = nb_trips_per_link self.variables["used_trips"] = used_trips_variables self.variables["trips_variables"] = trips_variables
[docs] def compute_nb_trips_min(self, product: Product, demand_of_product: int): nb_trips_min_for_product_flows = {} size_product = product.size for tl in range(self.problem.nb_transport_types): tt: TransportType = self.problem.transport_types[tl] capa_transport = tt.capacity nb_trips_min_for_product_flows[tt] = [0] current_greedy_bins = defaultdict(lambda: 0) current_bin = 0 current_greedy_bins[0] = 0 for i in range(demand_of_product): if current_greedy_bins[current_bin] + size_product <= capa_transport: current_greedy_bins[current_bin] += size_product else: current_bin += 1 current_greedy_bins[current_bin] = size_product nb_trips_min_for_product_flows[tt].append(current_bin + 1) return nb_trips_min_for_product_flows
[docs] def set_warm_start_from_prev_solve(self): if self.solver is not None: self.cp_model.ClearHints() response = self.solver.ResponseProto() # Get the raw response for i in range(len(response.solution)): var = self.cp_model.GetIntVarFromProtoIndex(i) # print(f"Variable {var} = {response.solution[i]}") self.cp_model.AddHint(var, response.solution[i])
[docs] def set_warm_start(self, solution: MultibatchingSolution) -> None: if self.modeling == ModelingMultiBatch.FLOW: self.set_warm_start_flow(solution=solution) if self.modeling == ModelingMultiBatch.UNIT_FLOW: self.set_warm_start_unit_flow(solution=solution)
[docs] def set_warm_start_unit_flow(self, solution: MultibatchingSolution) -> None: """ Provides a warm start for the UNIT_FLOW modeling by hinting individual trip contents. """ self.cp_model.ClearHints() # Group all trips by transport link trips_per_link = defaultdict(list) for flow in solution.list_flows: # "Unroll" the aggregated solution: if nb_packing is 5, add the packing 5 times for _ in range(flow.nb_packing): trips_per_link[flow.transport_link].append(flow.product_packing) # Now, assign the actual trips from the solution to the model variables done_link_idx = set() for link, trips in trips_per_link.items(): sum_per_product = defaultdict(lambda: 0) link_idx = self.problem.transport_links_to_index[link] max_trips_for_link = len(self.variables["used_trips"][link_idx]) done_link_idx.add(link_idx) # Iterate through the unrolled trips and assign them one by one for trip_idx, packing_content in enumerate(trips): if trip_idx >= max_trips_for_link: logger.warning( f"Solution has more trips ({len(trips)}) on link {link.id} " f"than available in the model ({max_trips_for_link}). " f"Hint will be truncated." ) break self.cp_model.AddHint( self.variables["used_trips"][link_idx][trip_idx], 1 ) for product, units in packing_content.items(): product_idx = self.problem.product_to_index[product] if ( product_idx in self.variables["trips_variables"][link_idx][trip_idx] ): content_var = self.variables["trips_variables"][link_idx][ trip_idx ][product_idx] self.cp_model.AddHint(content_var, int(units)) sum_per_product[product_idx] += int(units) for p in self.variables["trips_variables"][link_idx][trip_idx]: if self.problem.products[p] not in packing_content: self.cp_model.AddHint( self.variables["trips_variables"][link_idx][trip_idx][p], 0 ) for product in self.variables["flows"][link_idx]: self.cp_model.AddHint( self.variables["flows"][link_idx][product], sum_per_product[product] ) self.cp_model.AddHint(self.variables["nb_trips"][link_idx], len(trips)) for k in range(len(trips), max_trips_for_link): self.cp_model.AddHint(self.variables["used_trips"][link_idx][k], 0) for product_idx in self.variables["trips_variables"][link_idx][k]: self.cp_model.AddHint( self.variables["trips_variables"][link_idx][k][product_idx], 0 ) for index_link in self.variables["used_trips"]: if index_link not in done_link_idx: max_trips_for_link = len(self.variables["used_trips"][index_link]) self.cp_model.AddHint(self.variables["nb_trips"][index_link], 0) for prod in self.variables["flows"][index_link]: self.cp_model.AddHint(self.variables["flows"][index_link][prod], 0) for k in range(max_trips_for_link): self.cp_model.AddHint( self.variables["used_trips"][index_link][k], 0 ) for product_idx in self.variables["trips_variables"][index_link][k]: self.cp_model.AddHint( self.variables["trips_variables"][index_link][k][ product_idx ], 0, ) logger.info("Warm start hints added for UNIT_FLOW model.")
[docs] def set_warm_start_flow(self, solution: MultibatchingSolution) -> None: aggreg_flows_solution = {} nb_trips = {} self.cp_model.ClearHints() for flow in solution.list_flows: ind = self.problem.transport_links_to_index[flow.transport_link] if ind not in aggreg_flows_solution: aggreg_flows_solution[ind] = defaultdict(lambda: 0) nb_trips[ind] = 0 for p in flow.product_packing: aggreg_flows_solution[ind][self.problem.product_to_index[p]] += int( flow.product_packing[p] * flow.nb_packing ) nb_trips[ind] += flow.nb_packing for index_link in self.variables["flows"]: for index_product in self.variables["flows"][index_link]: self.cp_model.AddHint( self.variables["flows"][index_link][index_product], aggreg_flows_solution.get(index_link, {}).get(index_product, 0), ) self.cp_model.AddHint( self.variables["nb_trips"][index_link], nb_trips.get(index_link, 0) ) print("ws done")
[docs] def logs_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution: if "objs" in self.variables: for obj in self.variables["objs"]: logger.info(f"{obj}: {cpsolvercb.value(self.variables['objs'][obj])}") for index_tl in self.variables["nb_trips"]: tl = self.problem.transport_links[index_tl] nb_trips = cpsolvercb.value(self.variables["nb_trips"][index_tl]) if nb_trips > 0: logger.info( f"Nb trips on {tl.location_l1, tl.location_l2, tl.transport_type.id}, :" f"{cpsolvercb.value(self.variables['nb_trips'][index_tl])}" ) for index_product in self.variables["flows"][index_tl]: val = cpsolvercb.value( self.variables["flows"][index_tl][index_product] ) if val > 0: logger.info( f"Nb product {self.problem.products[index_product].id} " f"on {tl.location_l1, tl.location_l2, tl.transport_type.id}, :" f"{cpsolvercb.value(self.variables['flows'][index_tl][index_product])}" ) if self.modeling == ModelingMultiBatch.UNIT_FLOW: for i in self.variables["trips_variables"][index_tl]: used = self.variables["used_trips"][index_tl][i] if cpsolvercb.value(used) == 1: logger.info( f"{i}-th trip used on route {tl.location_l1, tl.location_l2, tl.transport_type.id}" ) for index_product in self.variables["trips_variables"][ index_tl ][i]: val = cpsolvercb.value( self.variables["trips_variables"][index_tl][i][ index_product ] ) if val > 0: logger.info( f"Nb product {self.problem.products[index_product].id} " f"on {tl.location_l1, tl.location_l2, tl.transport_type.id}, :" f"{val}" ) logger.info(f"------") if nb_trips > 0: logger.info(f"------End of transport link------")
[docs] def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution: if self.verbose_logs > 0: self.logs_solution(cpsolvercb) packing_solution = [] products = self.problem.products for index_tl in self.variables["flows"]: nb = cpsolvercb.value(self.variables["nb_trips"][index_tl]) if nb > 0: packings = [] if self.modeling == ModelingMultiBatch.UNIT_FLOW: for i in sorted(self.variables["trips_variables"][index_tl]): used = self.variables["used_trips"][index_tl][i] if cpsolvercb.value(used) > 0: current_packing = set() for index_product in self.variables["trips_variables"][ index_tl ][i]: val = cpsolvercb.value( self.variables["trips_variables"][index_tl][i][ index_product ] ) if val > 0: current_packing.add((index_product, int(val))) cpack = frozenset(current_packing) packings.append(cpack) if len(packings) > 0: c = Counter(packings) for key, val in c.items(): packing_transport = PackingTransport( transport_link=self.problem.transport_links[index_tl], product_packing={products[x[0]]: x[1] for x in key}, nb_packing=val, ) packing_solution.append(packing_transport) else: cur_packing = {} if not self.single_batching: for index_product in self.variables["flows"][index_tl]: val = cpsolvercb.value( self.variables["flows"][index_tl][index_product] ) if val > 0: cur_packing[products[index_product]] = val / nb if len(cur_packing) > 0: packing_solution.append( PackingTransport( transport_link=self.problem.transport_links[ index_tl ], product_packing=cur_packing, nb_packing=nb, ) ) else: for index_product in self.variables["flows"][index_tl]: val = cpsolvercb.value( self.variables["flows"][index_tl][index_product] ) if val > 0: p = self.problem.products[index_product] tl = self.problem.transport_links[index_tl] tr = tl.transport_type nb_trips_per_link_per_product = cpsolvercb.value( self.variables["nb_trips_per_link_per_product"][ (index_tl, index_product) ] ) nb_product_per_transport = ( tr.capacity // self.problem.products[index_product].size ) nb_trips_per_link_per_product = math.ceil( val / nb_product_per_transport ) if nb_trips_per_link_per_product > 1: if val % nb_product_per_transport == 0: packings = [ PackingTransport( transport_link=tl, product_packing={ p: nb_product_per_transport }, nb_packing=nb_trips_per_link_per_product, ) ] else: packings = [ PackingTransport( transport_link=tl, product_packing={ p: nb_product_per_transport }, nb_packing=nb_trips_per_link_per_product - 1, ) ] + [ PackingTransport( transport_link=tl, product_packing={ p: val % nb_product_per_transport }, nb_packing=1, ) ] else: packings = [ PackingTransport( tl, product_packing={p: val}, nb_packing=1 ) ] if ( sum( [ pack.product_packing[p] * pack.nb_packing for pack in packings ] ) != val ): print( val, sum( [ pack.product_packing[p] * pack.nb_packing for pack in packings ] ), ) print( "Nb product per transport", nb_product_per_transport, val, nb_trips_per_link_per_product, ) print(packings) print("Problem") packing_solution.extend(packings) sol = MultibatchingSolution(problem=self.problem, list_flows=packing_solution) return sol
[docs] def add_advanced_capacity_constraints( self, model, flows_variables, nb_trips_per_link, solver_type="cpsat" ): """ Adds improved lower bounds on the number of trips based on bin packing dual feasible functions. Adapted from the DP bounds for SALBP/BinPacking. """ # Import relevant library for summation based on solver type if solver_type == "gurobi": import gurobipy sum_func = gurobipy.quicksum else: # For CP-SAT, sum() works with LinearExpr sum_func = sum for index_tl in range(self.problem.nb_transport_links): # Skip if variables not defined for this link if index_tl not in nb_trips_per_link: continue tl = self.problem.transport_links[index_tl] capacity = tl.transport_type.capacity if capacity <= 0: continue nb_trips_var = nb_trips_per_link[index_tl] # --- Bound 2 (L2): Items related to C/2 --- # Logic: Items > C/2 take a full bin. Items = C/2 take half a bin. # Formula: 2 * NbTrips >= 2 * sum(Flow(>C/2)) + 1 * sum(Flow(=C/2)) expr_2 = [] # --- Bound 3 (L3): Items related to C/3 and 2C/3 --- # Logic: # > 2C/3 : weight 1 (takes full bin usually) # = 2C/3 : weight 2/3 # > C/3 : weight 1/2 # = C/3 : weight 1/3 # Scaled by 6 to stay integer: # 6 * NbTrips >= 6*Flow(>2C/3) + 4*Flow(=2C/3) + 3*Flow(>C/3) + 2*Flow(=C/3) expr_3 = [] # Iterate over products existing on this link current_flow_vars = flows_variables[index_tl] for index_product in current_flow_vars: product = self.problem.products[index_product] size = product.size flow_var = current_flow_vars[index_product] # Comparisons using multiplication to avoid float issues # --- For Bound 2 --- if 2 * size > capacity: # size > C/2 expr_2.append(2 * flow_var) elif 2 * size == capacity: # size == C/2 expr_2.append(flow_var) # --- For Bound 3 --- if 3 * size > 2 * capacity: # size > 2C/3 expr_3.append(6 * flow_var) elif 3 * size == 2 * capacity: # size == 2C/3 expr_3.append(4 * flow_var) elif 3 * size > capacity: # size > C/3 expr_3.append(3 * flow_var) elif 3 * size == capacity: # size == C/3 expr_3.append(2 * flow_var) # Add constraints if applicable if len(expr_2) > 0: if solver_type == "gurobi": model.addLConstr(2 * nb_trips_var >= sum_func(expr_2)) else: model.Add(2 * nb_trips_var >= sum_func(expr_2)) if len(expr_3) > 0: if solver_type == "gurobi": model.addLConstr(6 * nb_trips_var >= sum_func(expr_3)) else: model.Add(6 * nb_trips_var >= sum_func(expr_3))
[docs] def add_advanced_capacity_constraints2( self, model, flows_variables, nb_trips_per_link, solver_type="cpsat", max_k=30 ): """ Adds generalized lower bound constraints on the number of trips. For each k in [2..max_k], we consider items with size > Capacity/k. Since at most k-1 such items fit in one vehicle, we add the cut: (k-1) * NbTrips >= Sum(Flows of these items) """ # Select summation function based on solver if solver_type == "gurobi": import gurobipy sum_func = gurobipy.quicksum else: sum_func = sum for index_tl in range(self.problem.nb_transport_links): if index_tl not in nb_trips_per_link: continue tl = self.problem.transport_links[index_tl] capacity = tl.transport_type.capacity if capacity <= 0: continue nb_trips_var = nb_trips_per_link[index_tl] current_flow_vars = flows_variables[index_tl] # Iterate k from 2 up to max_k (e.g., items > C/2, > C/3, ..., > C/10) for k in range(2, max_k + 1): relevant_flows = [] for index_product, flow_var in current_flow_vars.items(): product = self.problem.products[index_product] # Check condition: size > Capacity / k <==> size * k > Capacity if product.size * k > capacity: relevant_flows.append(flow_var) if relevant_flows: # Constraint: (k-1) * NbTrips >= Sum(Relevant Flows) lhs = (k - 1) * nb_trips_var rhs = sum_func(relevant_flows) if solver_type == "gurobi": model.addLConstr(lhs >= rhs) else: # CP-SAT model.Add(lhs >= rhs)
[docs] def add_global_flow_limit_constraints( self, model, flows_variables, solver_type="cpsat", factor=2 ): """ Limits the total volume of flow for each product across the entire network. Constraint: Sum(Flows_of_Product_P) <= factor * TotalDemand_of_Product_P - A factor of 1.0 implies direct shipments only (shortest path in terms of hops). - A factor of 2.0 implies that, on average, each unit can traverse 2 links (1 transshipment). - This prevents excessive detours without using binary variables. """ if factor is None: return # Select summation function based on solver if solver_type == "gurobi": import gurobipy sum_func = gurobipy.quicksum else: sum_func = sum for index_product in range(self.problem.nb_products): product = self.problem.products[index_product] # 1. Compute Total Demand (sum of positive requirements) for this product total_demand = self.problem.get_total_demand(product) # If there is no demand, no flow should happen (handled by conservation, but safe to skip) if total_demand <= 0: continue # 2. Collect all flow variables for this product across all links product_flows = [] for index_tl in flows_variables: if index_product in flows_variables[index_tl]: product_flows.append(flows_variables[index_tl][index_product]) if not product_flows: continue # 3. Add Constraint # We scale the demand by the factor (e.g., 1.5 * 100 = 150 max flow volume) limit = factor * total_demand if solver_type == "gurobi": model.addLConstr( sum_func(product_flows) <= limit, name=f"global_flow_limit_{index_product}", ) else: # CP-SAT requires integer bounds model.Add(sum_func(product_flows) <= int(limit))
[docs] def implements_lexico_api(self) -> bool: return True
[docs] def get_lexico_objectives_available(self) -> list[str]: return list(self.variables["objs"].keys())
[docs] def set_lexico_objective(self, obj: str) -> None: self.cp_model.clear_objective() self.cp_model.minimize(self.variables["objs"][obj])
[docs] def add_lexico_constraint(self, obj: str, value: float) -> Iterable[Any]: return [self.cp_model.add(self.variables["objs"][obj] <= value)]
[docs] def get_lexico_objective_value(self, obj: str, res: ResultStorage) -> float: if len(res) > 0: sol: MultibatchingSolution = res[-1][0] if "_intern_obj" in sol.__dict__.keys(): return sol._intern_obj[obj] else: kpis = self.problem.evaluate(sol) return int(self.scaling_factor * kpis[obj]) else: return None