Source code for discrete_optimization.facility.solvers.toulbar

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import random
from enum import Enum
from typing import Any, Dict, Iterable, Optional, Tuple, Union

import tqdm

try:
    import pytoulbar2

    toulbar_available = True
except ImportError as e:
    toulbar_available = False
import logging

from discrete_optimization.facility.problem import FacilityProblem, FacilitySolution
from discrete_optimization.facility.solvers.facility_solver import FacilitySolver
from discrete_optimization.facility.solvers.lp import prune_search_space
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    EnumHyperparameter,
)
from discrete_optimization.generic_tools.lns_tools import ConstraintHandler
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.generic_tools.toulbar_tools import (
    ToulbarSolver,
    to_lns_toulbar,
)

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class ModelingToulbarFacility(Enum):
    """Encoding used to express the facility problem as a toulbar2 model.

    The value is exposed as the ``modeling`` hyperparameter of
    ``ToulbarFacilitySolver`` and decides which ``init_model_*`` method
    builds the model.
    """

    # One integer variable per customer, holding the index of its facility.
    INTEGER = 0
    # One 0/1 variable per (facility, customer) pair.
    BINARY = 1
class ToulbarFacilitySolver(ToulbarSolver, FacilitySolver, WarmstartMixin):
    """Facility solver that builds a ``pytoulbar2.CFN`` model of the problem.

    The ``modeling`` hyperparameter selects between two encodings:

    * ``ModelingToulbarFacility.INTEGER`` (default): one integer variable per
      customer whose value is the facility index.
    * ``ModelingToulbarFacility.BINARY``: one 0/1 variable per
      (facility, customer) pair.

    Both encodings add one 0/1 ``used_<i>`` variable per facility that carries
    the setup cost.
    """

    hyperparameters = ToulbarSolver.hyperparameters + [
        EnumHyperparameter(
            name="modeling",
            enum=ModelingToulbarFacility,
            default=ModelingToulbarFacility.INTEGER,
        )
    ]
    # Encoding chosen by the last successful `init_model` call.
    modeling: ModelingToulbarFacility
    # BINARY encoding only: (facility, customer) -> variable index in the model.
    key_to_index: Optional[dict]
    # facility -> index of its `used_<facility>` variable in the model.
    used_var_to_index: Optional[dict]
    # INTEGER encoding only: customer -> index of its `x_<customer>` variable.
    client_var_to_index: Optional[dict]

    def init_model(self, **kwargs):
        """Build the toulbar2 model for the requested encoding.

        Missing hyperparameters are filled in with their defaults, the
        matching ``init_model_*`` method is called with the completed kwargs,
        and the encoding is then recorded in ``self.modeling``.
        """
        kwargs = self.complete_with_default_hyperparameters(kwargs)
        modeling = kwargs["modeling"]
        if modeling == ModelingToulbarFacility.INTEGER:
            self.init_model_integer_variable(**kwargs)
        elif modeling == ModelingToulbarFacility.BINARY:
            self.init_model_boolean_variable(**kwargs)
        else:
            # Unknown encoding: nothing is built and `modeling` is left untouched.
            return
        self.modeling = modeling

    def init_model_boolean_variable(self, **kwargs):
        """Build the BINARY encoding.

        One 0/1 variable ``x_(f, c)`` per (facility, customer) pair indicates
        whether facility ``f`` serves customer ``c``.

        Recognised kwargs: ``upper_bound`` (default ``10e8``) and ``vns``, both
        forwarded to ``pytoulbar2.CFN``.

        Sets ``self.model``, ``self.key_to_index`` and
        ``self.used_var_to_index``.
        """
        nb_facilities = self.problem.facility_count
        nb_customers = self.problem.customer_count
        facility_ids = range(nb_facilities)
        customer_ids = range(nb_customers)

        # `vns` is only forwarded when the caller provided it.
        cfn_options = {"vns": kwargs["vns"]} if "vns" in kwargs else {}
        model = pytoulbar2.CFN(kwargs.get("upper_bound", 10e8), **cfn_options)

        # Only the second matrix is used; it supplies the cost of x_(f, c) == 1.
        _, matrix_length = prune_search_space(
            self.problem, n_cheapest=nb_facilities, n_shortest=nb_facilities
        )

        # Assignment variables, created facility-major so that a variable's
        # index is the number of variables created before it.
        key_to_index = {}
        for f in facility_ids:
            for c in customer_ids:
                var_name = f"x_{(f,c)}"
                model.AddVariable(name=var_name, values=[0, 1])
                model.AddFunction([var_name], [0, matrix_length[f, c]])
                key_to_index[(f, c)] = len(key_to_index)

        # Each customer is served by exactly one facility.
        for c in customer_ids:
            customer_scope = [key_to_index[(f, c)] for f in facility_ids]
            model.AddSumConstraint(customer_scope, operand="==", rightcoef=1)

        # `used_<i>` variables come right after all assignment variables.
        first_used_index = len(key_to_index)
        used_var_to_index = {}
        for i in facility_ids:
            used_name = f"used_{i}"
            model.AddVariable(used_name, [0, 1])
            used_var_to_index[i] = first_used_index + i
            model.AddFunction(
                [used_name], [0, self.problem.facilities[i].setup_cost]
            )
            for c in customer_ids:
                # used_i - x_(i, c) >= 0: serving a customer forces the
                # facility's `used` variable to 1.
                model.AddLinearConstraint(
                    [1, -1], [used_name, f"x_{i, c}"], operand=">=", rightcoef=0
                )

        # Capacity (knapsack) constraint on each facility. The per-customer
        # part of the parameter string is the same for every facility.
        # NOTE(review): presumably each " 1 1 -demand" triple reads
        # "<nb values> <value> <weight>" and the leading number is the bound,
        # i.e. sum(demand * x) <= capacity - confirm against the toulbar2 docs.
        demand_terms = "".join(
            f" 1 1 {-int(self.problem.customers[c].demand)}" for c in customer_ids
        )
        for i in facility_ids:
            model.CFN.wcsp.postKnapsackConstraint(
                [key_to_index[(i, c)] for c in customer_ids],
                str(-self.problem.facilities[i].capacity) + demand_terms,
                False,
                True,
            )

        self.model = model
        self.key_to_index = key_to_index
        self.used_var_to_index = used_var_to_index

    def init_model_integer_variable(self, **kwargs):
        """Build the INTEGER encoding.

        One variable ``x_<c>`` per customer takes its value in
        ``range(facility_count)`` and gives the facility serving customer
        ``c``.

        Recognised kwargs: ``upper_bound`` (default ``10e8``) and ``vns``, both
        forwarded to ``pytoulbar2.CFN``.

        Sets ``self.model``, ``self.used_var_to_index`` and
        ``self.client_var_to_index``.
        """
        nb_facilities = self.problem.facility_count
        nb_customers = self.problem.customer_count

        # `vns` is only forwarded when the caller provided it.
        cfn_options = {"vns": kwargs["vns"]} if "vns" in kwargs else {}
        model = pytoulbar2.CFN(kwargs.get("upper_bound", 10e8), **cfn_options)

        # Only the second matrix is used; column c gives the cost of each
        # facility choice for customer c.
        _, matrix_length = prune_search_space(
            self.problem, n_cheapest=nb_facilities, n_shortest=nb_facilities
        )

        # Customer variables are created first, so customer c gets index c.
        client_var_to_index = {}
        for c in tqdm.tqdm(range(nb_customers)):
            var_name = f"x_{c}"
            model.AddVariable(var_name, values=range(nb_facilities))
            assignment_costs = [
                int(matrix_length[f, c]) for f in range(nb_facilities)
            ]
            model.AddFunction([var_name], assignment_costs)
            client_var_to_index[c] = len(client_var_to_index)

        # `used_<i>` variables come right after the customer variables.
        used_var_to_index = {}
        for i in tqdm.tqdm(range(nb_facilities)):
            used_name = f"used_{i}"
            model.AddVariable(used_name, [0, 1])
            used_var_to_index[i] = nb_customers + i
            model.AddFunction(
                [used_name], [0, self.problem.facilities[i].setup_cost]
            )
            # Cost table over (used_i, x_c): the only penalised entry is
            # used_i == 0 together with x_c == i, which forces used_i = 1
            # whenever a customer picks facility i.
            closed_facility_costs = [
                10e8 if (b, f) == (0, i) else 0
                for b in (0, 1)
                for f in range(nb_facilities)
            ]
            for c in range(nb_customers):
                # A fresh list per call, as each call originally received.
                model.AddFunction(
                    [used_name, f"x_{c}"], list(closed_facility_costs)
                )

        # Capacity constraint on each facility: the demands of the customers
        # whose variable equals i must sum to at most the facility capacity.
        demands = [
            int(self.problem.customers[c].demand) for c in range(nb_customers)
        ]
        for i in tqdm.tqdm(range(nb_facilities)):
            model.AddGeneralizedLinearConstraint(
                [(f"x_{c}", i, demands[c]) for c in range(nb_customers)],
                operand="<=",
                rightcoef=int(self.problem.facilities[i].capacity),
            )

        self.model = model
        self.used_var_to_index = used_var_to_index
        self.client_var_to_index = client_var_to_index

    def retrieve_solution(self, solution) -> FacilitySolution:
        """Convert a raw solver solution using the decoder of the active encoding.

        Returns ``None`` when ``self.modeling`` matches neither encoding.
        """
        decoders = {
            ModelingToulbarFacility.BINARY: self.retrieve_solution_binary,
            ModelingToulbarFacility.INTEGER: self.retrieve_solution_integer,
        }
        decoder = decoders.get(self.modeling)
        if decoder is None:
            return None
        return decoder(solution)

    def retrieve_solution_binary(self, solution) -> FacilitySolution:
        """Decode a BINARY-encoding solution.

        ``solution[0]`` is read as the list of variable values; every
        ``x_(f, c)`` variable equal to 1 assigns facility ``f`` to customer
        ``c``. Customers with no such variable keep ``None``.
        """
        sol = FacilitySolution(
            problem=self.problem,
            facility_for_customers=[None] * self.problem.customer_count,
        )
        for (facility, customer), index in self.key_to_index.items():
            if solution[0][index] == 1:
                sol.facility_for_customers[customer] = facility
        return sol

    def retrieve_solution_integer(self, solution) -> FacilitySolution:
        """Decode an INTEGER-encoding solution.

        The first ``customer_count`` values of ``solution[0]`` are the
        customer variables, i.e. the facility index chosen for each customer.
        """
        nb_customers = self.problem.customer_count
        return FacilitySolution(
            problem=self.problem,
            facility_for_customers=solution[0][:nb_customers],
        )

    def set_warm_start(self, solution: FacilitySolution) -> None:
        """Give ``solution`` to the model as preferred values, per active encoding."""
        if self.modeling == ModelingToulbarFacility.INTEGER:
            self.set_warm_start_integer(solution)
        elif self.modeling == ModelingToulbarFacility.BINARY:
            self.set_warm_start_binary(solution)

    def _set_warm_start_used_facilities(self, solution: FacilitySolution) -> None:
        """Set the best value of each ``used_<i>`` variable from ``solution``."""
        wcsp = self.model.CFN.wcsp
        facility_used = set(solution.facility_for_customers)
        for facility, index in self.used_var_to_index.items():
            wcsp.setBestValue(index, 1 if facility in facility_used else 0)

    def set_warm_start_binary(self, solution: FacilitySolution) -> None:
        """Warm start for the BINARY encoding.

        ``x_(f, c)`` gets best value 1 when ``solution`` assigns customer
        ``c`` to facility ``f`` and 0 otherwise; the ``used_<i>`` variables
        follow.
        """
        wcsp = self.model.CFN.wcsp
        for (facility, customer), index in self.key_to_index.items():
            is_assigned = solution.facility_for_customers[customer] == facility
            wcsp.setBestValue(index, 1 if is_assigned else 0)
        self._set_warm_start_used_facilities(solution)

    def set_warm_start_integer(self, solution: FacilitySolution) -> None:
        """Warm start for the INTEGER encoding.

        Each customer variable gets the facility chosen in ``solution`` as
        best value; the ``used_<i>`` variables follow.
        """
        wcsp = self.model.CFN.wcsp
        for customer, index in self.client_var_to_index.items():
            wcsp.setBestValue(index, solution.facility_for_customers[customer])
        self._set_warm_start_used_facilities(solution)
# Class obtained by passing ToulbarFacilitySolver through `to_lns_toulbar`; the
# constraint handlers in this module annotate their `solver` argument with it.
# NOTE(review): presumably the wrapper adapts the solver for use inside a
# large-neighbourhood-search loop - confirm in generic_tools.toulbar_tools.
ToulbarFacilitySolverForLns = to_lns_toulbar(ToulbarFacilitySolver)
class FacilityConstraintHandlerToulbar(ConstraintHandler):
    """Freeze the facility of a random subset of customers.

    At each call, a fraction of the customers (given to the constructor) is
    drawn at random and their facility is fixed to the one they have in the
    best stored solution; the remaining customers stay free.
    """

    def __init__(
        self, problem: FacilityProblem, fraction_of_customers: Optional[float] = 0.4
    ):
        """Store the problem and the fraction of customers to freeze."""
        self.fraction_of_customers = fraction_of_customers
        self.problem = problem

    def adding_constraint_from_results_store(
        self,
        solver: ToulbarFacilitySolverForLns,
        result_storage: ResultStorage,
        **kwargs: Any,
    ) -> Iterable[Any]:
        """Fix the drawn customers to their current facility and warm start.

        The best solution of ``result_storage`` is the reference. The fixed
        values are sent to ``solver.model.Parse`` as a
        ``",<index>=<value>,..."`` string, then the whole reference solution
        is given to ``solver.set_warm_start``. Nothing is returned.
        """
        best_solution: FacilitySolution = result_storage.get_best_solution_fit()[0]
        nb_customers = self.problem.customer_count
        frozen_customers = random.sample(
            range(nb_customers),
            k=int(self.fraction_of_customers * nb_customers),
        )
        # NOTE(review): presumably a time limit in seconds for the next solve -
        # confirm against the pytoulbar2 documentation.
        solver.model.CFN.timer(100)
        current_facility = best_solution.facility_for_customers
        if solver.modeling == ModelingToulbarFacility.INTEGER:
            # One "<customer variable>=<facility>" item per frozen customer.
            assignments = [
                f"{solver.client_var_to_index[c]}={current_facility[c]}"
                for c in sorted(frozen_customers)
            ]
            solver.model.Parse("," + ",".join(assignments))
        elif solver.modeling == ModelingToulbarFacility.BINARY:
            # For every frozen customer, fix all of its x_(f, c) variables.
            assignments = [
                f"{solver.key_to_index[f,c]}={1 if current_facility[c]==f else 0}"
                for c in frozen_customers
                for f in range(self.problem.facility_count)
            ]
            solver.model.Parse("," + ",".join(assignments))
        solver.set_warm_start(solution=best_solution)

    def remove_constraints_from_previous_iteration(
        self,
        solver: ToulbarFacilitySolverForLns,
        previous_constraints: Iterable[Any],
        **kwargs: Any,
    ) -> None:
        """Do nothing: this handler does not undo anything between iterations."""
        return None
class FacilityConstraintHandlerDestroyFacilityToulbar(ConstraintHandler):
    """Free all the customers of a few randomly chosen facilities.

    Some facilities used by the best stored solution are selected; every
    customer currently allocated to one of them is left free to be
    reallocated in the reduced problem, while the other customers keep their
    facility.
    """

    def __init__(self, problem: FacilityProblem):
        """Store the problem whose customers and facilities are handled."""
        self.problem = problem

    def adding_constraint_from_results_store(
        self,
        solver: ToulbarFacilitySolverForLns,
        result_storage: ResultStorage,
        **kwargs: Any,
    ) -> Iterable[Any]:
        """Fix the customers of the non-selected facilities and warm start.

        20% of the facilities used by the best solution of ``result_storage``
        (at least one) are selected for destruction. The fixed values are sent
        to ``solver.model.Parse`` as a ``",<index>=<value>,..."`` string, then
        the whole reference solution is given to ``solver.set_warm_start``.
        Nothing is returned.

        Raises:
            ValueError: from ``random.sample`` when the reference solution
                uses no facility at all.
        """
        sol: FacilitySolution = result_storage.get_best_solution_fit()[0]
        current_facility = sol.facility_for_customers
        # Sample among the ids of the facilities that are actually in use.
        # Drawing from range(len(used)) would pick arbitrary low indices that
        # may serve no customer, leaving nothing free to move. Sorting gives
        # `random.sample` a sequence and keeps the draw reproducible for a
        # given seed.
        facilities_used = sorted({f for f in current_facility if f is not None})
        nb_facilities = len(facilities_used)
        # Kept as a set for constant-time membership tests below.
        facilities_to_dst = set(
            random.sample(facilities_used, k=max(1, int(0.2 * nb_facilities)))
        )
        # NOTE(review): presumably a time limit in seconds for the next solve -
        # confirm against the pytoulbar2 documentation.
        solver.model.CFN.timer(100)
        if solver.modeling == ModelingToulbarFacility.INTEGER:
            # Freeze every customer whose facility is not being destroyed.
            text = ",".join(
                f"{solver.client_var_to_index[c]}={current_facility[c]}"
                for c in range(self.problem.customer_count)
                if current_facility[c] not in facilities_to_dst
            )
            text = "," + text
            try:
                solver.model.Parse(text)
            except Exception:
                # Best effort, as before: a failed Parse must not stop the
                # search. Unlike a bare `except`, KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                logging.getLogger(__name__).debug(
                    "Parse of the frozen assignment failed; "
                    "clearing propagation queues.",
                    exc_info=True,
                )
                solver.model.ClearPropagationQueues()
        if solver.modeling == ModelingToulbarFacility.BINARY:
            # Fix every x_(f, c) variable of the facilities that are kept.
            text = ",".join(
                [
                    f"{solver.key_to_index[f,c]}={1 if current_facility[c]==f else 0}"
                    for c in range(self.problem.customer_count)
                    for f in range(self.problem.facility_count)
                    if f not in facilities_to_dst
                ]
            )
            text = "," + text
            solver.model.Parse(text)
        solver.set_warm_start(solution=sol)

    def remove_constraints_from_previous_iteration(
        self,
        solver: ToulbarFacilitySolverForLns,
        previous_constraints: Iterable[Any],
        **kwargs: Any,
    ) -> None:
        """Do nothing: this handler does not undo anything between iterations."""
        pass