Source code for discrete_optimization.facility.solvers.toulbar

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import random
from collections.abc import Iterable
from enum import Enum
from typing import Any, Optional, Union

import tqdm

from discrete_optimization.facility.problem import FacilityProblem, FacilitySolution
from discrete_optimization.facility.solvers.facility_solver import FacilitySolver
from discrete_optimization.facility.solvers.lp import prune_search_space
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    EnumHyperparameter,
)
from discrete_optimization.generic_tools.lns_tools import ConstraintHandler
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.generic_tools.toulbar_tools import (
    ToulbarSolver,
    to_lns_toulbar,
)

try:
    import pytoulbar2

    toulbar_available = True
except ImportError as e:
    toulbar_available = False


logger = logging.getLogger(__name__)


[docs] class ModelingToulbarFacility(Enum): INTEGER = 0 BINARY = 1
[docs] class ToulbarFacilitySolver(ToulbarSolver, FacilitySolver, WarmstartMixin): hyperparameters = ToulbarSolver.hyperparameters + [ EnumHyperparameter( name="modeling", enum=ModelingToulbarFacility, default=ModelingToulbarFacility.INTEGER, ) ] modeling: ModelingToulbarFacility key_to_index: Optional[dict] used_var_to_index: Optional[dict] client_var_to_index: Optional[dict]
[docs] def init_model(self, **kwargs): kwargs = self.complete_with_default_hyperparameters(kwargs) if kwargs["modeling"] == ModelingToulbarFacility.INTEGER: self.init_model_integer_variable(**kwargs) self.modeling = kwargs["modeling"] if kwargs["modeling"] == ModelingToulbarFacility.BINARY: self.init_model_boolean_variable(**kwargs) self.modeling = kwargs["modeling"]
[docs] def init_model_boolean_variable(self, **kwargs): """ Init a model with boolean variable array, indicating if a factory f is assigned to customer c """ nb_facilities = self.problem.facility_count nb_customers = self.problem.customer_count if "vns" in kwargs: model = pytoulbar2.CFN(kwargs.get("upper_bound", 10e8), vns=kwargs["vns"]) else: model = pytoulbar2.CFN(kwargs.get("upper_bound", 10e8)) x: dict[tuple[int, int], Union[int, Any]] = {} key_to_index = {} index = 0 matrix_fc_indicator, matrix_length = prune_search_space( self.problem, n_cheapest=nb_facilities, n_shortest=nb_facilities ) for f in range(nb_facilities): for c in range(nb_customers): x[f, c] = model.AddVariable(name=f"x_{(f, c)}", values=[0, 1]) model.AddFunction([f"x_{(f, c)}"], [0, matrix_length[f, c]]) key_to_index[(f, c)] = index index += 1 for c in range(nb_customers): model.AddSumConstraint( [key_to_index[(f, c)] for f in range(nb_facilities)], operand="==", rightcoef=1, ) used_var = {} index_var = index used_var_to_index = {} for i in range(nb_facilities): used_var[i] = model.AddVariable(f"used_{i}", [0, 1]) used_var_to_index[i] = index_var index_var += 1 model.AddFunction([f"used_{i}"], [0, self.problem.facilities[i].setup_cost]) for c in range(nb_customers): # model.CFN.wcsp.postKnapsackConstraint( # [index_var - 1, key_to_index[(i, c)]], "0 1 1 1 1 1 -1", False, True # ) model.AddLinearConstraint( [1, -1], [f"used_{i}", f"x_{i, c}"], operand=">=", rightcoef=0 ) # constraint triggering the used facility binary variable # Capacity constraint. for i in range(nb_facilities): param_c = "" for c in range(nb_customers): param_c += ( " " + str(1) + " " + str(1) + " " + str(-int(self.problem.customers[c].demand)) ) model.CFN.wcsp.postKnapsackConstraint( [key_to_index[(i, c)] for c in range(nb_customers)], str(-self.problem.facilities[i].capacity) + param_c, False, True, ) # Capacity/Knapsack constraint on facility. self.model = model self.key_to_index = key_to_index self.used_var_to_index = used_var_to_index
[docs] def init_model_integer_variable(self, **kwargs): """ Integer encoding, where main decision variable is an array of integer, giving for each customer what is the facility index. """ nb_facilities = self.problem.facility_count nb_customers = self.problem.customer_count if "vns" in kwargs: model = pytoulbar2.CFN(kwargs.get("upper_bound", 10e8), vns=kwargs["vns"]) else: model = pytoulbar2.CFN(kwargs.get("upper_bound", 10e8)) index = 0 matrix_fc_indicator, matrix_length = prune_search_space( self.problem, n_cheapest=nb_facilities, n_shortest=nb_facilities ) client_var_to_index = {} for c in tqdm.tqdm(range(nb_customers)): model.AddVariable(f"x_{c}", values=range(nb_facilities)) model.AddFunction( [f"x_{c}"], [int(matrix_length[f, c]) for f in range(nb_facilities)] ) client_var_to_index[c] = index index += 1 index_var = index used_var_to_index = {} for i in tqdm.tqdm(range(nb_facilities)): model.AddVariable(f"used_{i}", [0, 1]) used_var_to_index[i] = index_var model.AddFunction([f"used_{i}"], [0, self.problem.facilities[i].setup_cost]) for c in range(nb_customers): model.AddFunction( [f"used_{i}", f"x_{c}"], [ 10e8 if b == 0 and f == i else 0 for b in [0, 1] for f in range(nb_facilities) ], ) index_var += 1 # Force that when x_{c} == i, used_i = 1 # capacity constraint on facility. for i in tqdm.tqdm(range(nb_facilities)): # More low level version of the capacity constraint. # params_constraints = "" # for c in range(nb_customers): # params_constraints += " "+str(1)+" "+str(i)+" "+str(-int(self.problem.customers[c].demand)) # Problem.CFN.wcsp.postKnapsackConstraint([client_var_to_index[c] for c in range(nb_customers)], # str(int(-self.problem.facilities[i].capacity))+params_constraints, # False, True) model.AddGeneralizedLinearConstraint( [ (f"x_{c}", i, int(self.problem.customers[c].demand)) for c in range(nb_customers) ], operand="<=", rightcoef=int(self.problem.facilities[i].capacity), ) self.model = model self.used_var_to_index = used_var_to_index self.client_var_to_index = client_var_to_index
[docs] def retrieve_solution(self, solution) -> FacilitySolution: if self.modeling == ModelingToulbarFacility.BINARY: return self.retrieve_solution_binary(solution) if self.modeling == ModelingToulbarFacility.INTEGER: return self.retrieve_solution_integer(solution)
[docs] def retrieve_solution_binary(self, solution) -> FacilitySolution: sol = FacilitySolution( problem=self.problem, facility_for_customers=[None] * self.problem.customer_count, ) for x in self.key_to_index: index = self.key_to_index[x] if solution[0][index] == 1: sol.facility_for_customers[x[1]] = x[0] return sol
[docs] def retrieve_solution_integer(self, solution) -> FacilitySolution: return FacilitySolution( problem=self.problem, facility_for_customers=solution[0][: self.problem.customer_count], )
[docs] def set_warm_start(self, solution: FacilitySolution) -> None: if self.modeling == ModelingToulbarFacility.INTEGER: self.set_warm_start_integer(solution) if self.modeling == ModelingToulbarFacility.BINARY: self.set_warm_start_binary(solution)
[docs] def set_warm_start_binary(self, solution: FacilitySolution) -> None: for f, c in self.key_to_index: if solution.facility_for_customers[c] == f: self.model.CFN.wcsp.setBestValue(self.key_to_index[f, c], 1) else: self.model.CFN.wcsp.setBestValue(self.key_to_index[f, c], 0) facility_used = set(solution.facility_for_customers) for i in self.used_var_to_index: if i in facility_used: self.model.CFN.wcsp.setBestValue(self.used_var_to_index[i], 1) else: self.model.CFN.wcsp.setBestValue(self.used_var_to_index[i], 0)
[docs] def set_warm_start_integer(self, solution: FacilitySolution) -> None: for c in self.client_var_to_index: self.model.CFN.wcsp.setBestValue( self.client_var_to_index[c], solution.facility_for_customers[c] ) facility_used = set(solution.facility_for_customers) for i in self.used_var_to_index: if i in facility_used: self.model.CFN.wcsp.setBestValue(self.used_var_to_index[i], 1) else: self.model.CFN.wcsp.setBestValue(self.used_var_to_index[i], 0)
ToulbarFacilitySolverForLns = to_lns_toulbar(ToulbarFacilitySolver)
[docs] class FacilityConstraintHandlerToulbar(ConstraintHandler): """ The allocated facility is frozen for a subpart of customers (fraction given in the constructor) """ def __init__( self, problem: FacilityProblem, fraction_of_customers: Optional[float] = 0.4 ): self.problem = problem self.fraction_of_customers = fraction_of_customers
[docs] def adding_constraint_from_results_store( self, solver: ToulbarFacilitySolverForLns, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Any]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ sol: FacilitySolution = self.extract_best_solution_from_last_iteration( result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, ) customers = random.sample( range(self.problem.customer_count), k=int(self.fraction_of_customers * self.problem.customer_count), ) solver.model.CFN.timer(100) if solver.modeling == ModelingToulbarFacility.INTEGER: text = ",".join( f"{solver.client_var_to_index[c]}={sol.facility_for_customers[c]}" for c in sorted(customers) ) text = "," + text solver.model.Parse(text) if solver.modeling == ModelingToulbarFacility.BINARY: text = ",".join( [ f"{solver.key_to_index[f, c]}={1 if sol.facility_for_customers[c] == f else 0}" for c in customers for f in range(self.problem.facility_count) ] ) text = "," + text solver.model.Parse(text) solver.set_warm_start(solution=sol) return []
[docs] class FacilityConstraintHandlerDestroyFacilityToulbar(ConstraintHandler): """ Select some factories, where all current allocated customers to those factories are totally free to be reallocated in the reduced problem. """ def __init__(self, problem: FacilityProblem): self.problem = problem
[docs] def adding_constraint_from_results_store( self, solver: ToulbarFacilitySolverForLns, result_storage: ResultStorage, result_storage_last_iteration: ResultStorage, **kwargs: Any, ) -> Iterable[Any]: """Add constraints to the internal model of a solver based on previous solutions Args: solver: solver whose internal model is updated result_storage: all results so far result_storage_last_iteration: results from last LNS iteration only **kwargs: Returns: list of added constraints """ sol: FacilitySolution = self.extract_best_solution_from_last_iteration( result_storage=result_storage, result_storage_last_iteration=result_storage_last_iteration, ) facilities_used = set(sol.facility_for_customers) nb_facilities = len(facilities_used) facilities_to_dst = random.sample( range(nb_facilities), k=max(1, int(0.2 * nb_facilities)) ) solver.model.CFN.timer(100) if solver.modeling == ModelingToulbarFacility.INTEGER: text = ",".join( f"{solver.client_var_to_index[c]}={sol.facility_for_customers[c]}" for c in range(self.problem.customer_count) if sol.facility_for_customers[c] not in facilities_to_dst ) text = "," + text try: solver.model.Parse(text) except: solver.model.ClearPropagationQueues() pass if solver.modeling == ModelingToulbarFacility.BINARY: text = ",".join( [ f"{solver.key_to_index[f, c]}={1 if sol.facility_for_customers[c] == f else 0}" for c in range(self.problem.customer_count) for f in range(self.problem.facility_count) if f not in facilities_to_dst ] ) text = "," + text solver.model.Parse(text) solver.set_warm_start(solution=sol) return []