"""Linear programming models and solve functions for facility location problem."""
# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any, Optional, Union
import numpy as np
import numpy.typing as npt
from ortools.linear_solver import pywraplp
from ortools.math_opt.python import mathopt
from discrete_optimization.facility.problem import FacilityProblem, FacilitySolution
from discrete_optimization.facility.solvers import FacilitySolver
from discrete_optimization.generic_tools.do_problem import (
ParamsObjectiveFunction,
Solution,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
IntegerHyperparameter,
)
from discrete_optimization.generic_tools.lp_tools import (
ConstraintType,
GurobiMilpSolver,
MilpSolver,
OrtoolsMathOptMilpSolver,
ParametersMilp,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
logger = logging.getLogger(__name__)
try:
import gurobipy
except ImportError:
gurobi_available = False
else:
gurobi_available = True
from gurobipy import (
GRB,
Constr,
GenConstr,
LinExpr,
MConstr,
Model,
QConstr,
quicksum,
)
[docs]
def compute_length_matrix(
facility_problem: FacilityProblem,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.int_], npt.NDArray[np.float64]]:
"""Precompute all the cost of allocation in a matrix form.
A matrix "closest" is also computed, sorting for each customers the facility by distance.
Args:
facility_problem (FacilityProblem): facility problem instance to compute cost matrix
Returns: setup cost vector, sorted matrix distance, matrix distance
"""
facilities = facility_problem.facilities
customers = facility_problem.customers
nb_facilities = len(facilities)
nb_customers = len(customers)
matrix_distance = np.zeros((nb_facilities, nb_customers))
costs = np.array([facilities[f].setup_cost for f in range(nb_facilities)])
for f in range(nb_facilities):
for c in range(nb_customers):
matrix_distance[f, c] = facility_problem.evaluate_customer_facility(
facilities[f], customers[c]
)
closest = np.argsort(matrix_distance, axis=0)
return costs, closest, matrix_distance
[docs]
def prune_search_space(
facility_problem: FacilityProblem, n_cheapest: int = 10, n_shortest: int = 10
) -> tuple[np.ndarray, np.ndarray]:
"""Utility function that can prune the search space.
Output of this function will be used to :
- consider only the n_cheapest facility that has the cheapest setup_cost
- consider only the n_shortest (closest actually) facilities for each customers
Args:
facility_problem (FacilityProblem): facility problem instance
n_cheapest (int): select the cheapest setup cost facilities
n_shortest (int): for each customer, select the closest facilities
Returns: tuple of matrix, first element is a matrix (facility_count, customer_count) with 2 as value
when we should consider the allocation possible. Second element in the (facility,customer) matrix distance.
"""
costs, closest, matrix_distance = compute_length_matrix(facility_problem)
sorted_costs = np.argsort(costs)
facilities = facility_problem.facilities
customers = facility_problem.customers
nb_facilities = len(facilities)
nb_customers = len(customers)
matrix_fc_indicator = np.zeros((nb_facilities, nb_customers), dtype=np.int_)
matrix_fc_indicator[sorted_costs[:n_cheapest], :] = 2
for c in range(nb_customers):
matrix_fc_indicator[closest[:n_shortest, c], c] = 2
return matrix_fc_indicator, matrix_distance
class _BaseLpFacilitySolver(MilpSolver, FacilitySolver):
"""Base class for Facility LP solvers."""
hyperparameters = [
CategoricalHyperparameter(
name="use_matrix_indicator_heuristic", default=True, choices=[False, True]
),
IntegerHyperparameter(
name="n_shortest",
default=10,
low=0,
high=100,
depends_on=("use_matrix_indicator_heuristic", [True]),
),
IntegerHyperparameter(
name="n_cheapest",
default=10,
low=0,
high=100,
depends_on=("use_matrix_indicator_heuristic", [True]),
),
]
def __init__(
self,
problem: FacilityProblem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.model = None
self.variable_decision: dict[
str, dict[Union[int, tuple[int, int]], Union[int, Any]]
] = {}
self.constraints_dict: dict[str, dict[int, Any]] = {}
self.description_variable_description = {
"x": {
"shape": (0, 0),
"type": bool,
"descr": "for each facility/customer indicate"
" if the pair is active, meaning "
"that the customer c is dealt with facility f",
}
}
self.description_constraint: dict[str, dict[str, str]] = {}
def init_model(self, **kwargs: Any) -> None:
"""
Keyword Args:
use_matrix_indicator_heuristic (bool): use the prune search method to reduce number of variable.
n_shortest (int): parameter for the prune search method
n_cheapest (int): parameter for the prune search method
Returns: None
"""
nb_facilities = self.problem.facility_count
nb_customers = self.problem.customer_count
kwargs = self.complete_with_default_hyperparameters(kwargs)
use_matrix_indicator_heuristic = kwargs["use_matrix_indicator_heuristic"]
if use_matrix_indicator_heuristic:
n_shortest = kwargs["n_shortest"]
n_cheapest = kwargs["n_cheapest"]
matrix_fc_indicator, matrix_length = prune_search_space(
self.problem, n_cheapest=n_cheapest, n_shortest=n_shortest
)
else:
matrix_fc_indicator, matrix_length = prune_search_space(
self.problem,
n_cheapest=nb_facilities,
n_shortest=nb_facilities,
)
self.model = self.create_empty_model(name="facilities")
x: dict[tuple[int, int], Union[int, Any]] = {}
for f in range(nb_facilities):
for c in range(nb_customers):
if matrix_fc_indicator[f, c] == 0:
x[f, c] = 0
elif matrix_fc_indicator[f, c] == 1:
x[f, c] = 1
elif matrix_fc_indicator[f, c] == 2:
x[f, c] = self.add_binary_variable(name="x_" + str((f, c)))
facilities = self.problem.facilities
customers = self.problem.customers
used = [self.add_binary_variable(name=f"y_{i}") for i in range(nb_facilities)]
constraints_customer: dict[int, ConstraintType] = {}
for c in range(nb_customers):
constraints_customer[c] = self.add_linear_constraint(
self.construct_linear_sum(x[f, c] for f in range(nb_facilities)) == 1
)
# one facility
constraint_capacity: dict[int, ConstraintType] = {}
for f in range(nb_facilities):
for c in range(nb_customers):
self.add_linear_constraint(used[f] >= x[f, c])
constraint_capacity[f] = self.add_linear_constraint(
self.construct_linear_sum(
x[f, c] * customers[c].demand for c in range(nb_customers)
)
<= facilities[f].capacity
)
new_obj_f = self.construct_linear_sum(
facilities[f].setup_cost * used[f] for f in range(nb_facilities)
)
new_obj_f += self.construct_linear_sum(
matrix_length[f, c] * x[f, c]
for f in range(nb_facilities)
for c in range(nb_customers)
)
self.set_model_objective(new_obj_f, minimize=True)
self.variable_decision = {"x": x, "y": used}
self.constraints_dict = {
"constraint_customer": constraints_customer,
"constraint_capacity": constraint_capacity,
}
self.description_variable_description = {
"x": {
"shape": (nb_facilities, nb_customers),
"type": bool,
"descr": "for each facility/customer indicate"
" if the pair is active, meaning "
"that the customer c is dealt with facility f",
}
}
logger.info("Initialized")
def convert_to_variable_values(
self, solution: FacilitySolution
) -> dict[Any, float]:
"""Convert a solution to a mapping between model variables and their values.
Will be used by set_warm_start().
"""
# Init all variables to 0
hinted_variables = {var: 0 for var in self.variable_decision["x"].values()}
# Set var(facility, customer) to 1 according to the solution
for c, f in enumerate(solution.facility_for_customers):
variable_decision_key = (f, c)
hinted_variables[self.variable_decision["x"][variable_decision_key]] = 1
hinted_variables[self.variable_decision["y"][f]] = 1
return hinted_variables
def retrieve_current_solution(
self,
get_var_value_for_current_solution: Callable[[Any], float],
get_obj_value_for_current_solution: Callable[[], float],
) -> FacilitySolution:
facility_for_customer = [0] * self.problem.customer_count
for (
variable_decision_key,
variable_decision_value,
) in self.variable_decision["x"].items():
if not isinstance(variable_decision_value, int):
value = get_var_value_for_current_solution(variable_decision_value)
else:
value = variable_decision_value
if value >= 0.5:
f = variable_decision_key[0]
c = variable_decision_key[1]
facility_for_customer[c] = f
return FacilitySolution(self.problem, facility_for_customer)
[docs]
class GurobiFacilitySolver(GurobiMilpSolver, _BaseLpFacilitySolver):
"""Milp solver using gurobi library
Attributes:
coloring_problem (FacilityProblem): facility problem instance to solve
params_objective_function (ParamsObjectiveFunction): objective function parameters
(however this is just used for the ResultStorage creation, not in the optimisation)
"""
[docs]
def init_model(self, **kwargs: Any) -> None:
"""
Keyword Args:
use_matrix_indicator_heuristic (bool): use the prune search method to reduce number of variable.
n_shortest (int): parameter for the prune search method
n_cheapest (int): parameter for the prune search method
Returns: None
"""
_BaseLpFacilitySolver.init_model(self, **kwargs)
self.model.setParam(GRB.Param.Threads, 4)
self.model.setParam(GRB.Param.Method, 1)
self.model.update()
[docs]
def convert_to_variable_values(
self, solution: Solution
) -> dict[gurobipy.Var, float]:
"""Convert a solution to a mapping between model variables and their values.
Will be used by set_warm_start().
"""
return _BaseLpFacilitySolver.convert_to_variable_values(self, solution)
[docs]
class MathOptFacilitySolver(OrtoolsMathOptMilpSolver, _BaseLpFacilitySolver):
"""Milp solver using gurobi library
Attributes:
coloring_problem (FacilityProblem): facility problem instance to solve
params_objective_function (ParamsObjectiveFunction): objective function parameters
(however this is just used for the ResultStorage creation, not in the optimisation)
"""
hyperparameters = _BaseLpFacilitySolver.hyperparameters
[docs]
def convert_to_variable_values(
self, solution: FacilitySolution
) -> dict[mathopt.Variable, float]:
"""Convert a solution to a mapping between model variables and their values.
Will be used by set_warm_start() to provide a suitable SolutionHint.variable_values.
See https://or-tools.github.io/docs/pdoc/ortools/math_opt/python/model_parameters.html#SolutionHint
for more information.
"""
return _BaseLpFacilitySolver.convert_to_variable_values(self, solution)
[docs]
class CbcFacilitySolver(FacilitySolver):
"""Milp formulation using cbc solver."""
hyperparameters = [
CategoricalHyperparameter(
name="use_matrix_indicator_heuristic", default=True, choices=[False, True]
),
IntegerHyperparameter(name="n_shortest", default=10, low=0, high=100),
IntegerHyperparameter(name="n_cheapest", default=10, low=0, high=100),
]
model: Optional[pywraplp.Solver]
def __init__(
self,
problem: FacilityProblem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.model = None
self.variable_decision: dict[str, dict[tuple[int, int], Union[int, Any]]] = {}
self.constraints_dict: dict[str, dict[int, Any]] = {}
self.description_variable_description = {
"x": {
"shape": (0, 0),
"type": bool,
"descr": "for each facility/customer indicate"
" if the pair is active, meaning "
"that the customer c is dealt with facility f",
}
}
self.description_constraint: dict[str, dict[str, str]] = {}
[docs]
def init_model(self, **kwargs: Any) -> None:
nb_facilities = self.problem.facility_count
nb_customers = self.problem.customer_count
kwargs = self.complete_with_default_hyperparameters(kwargs)
use_matrix_indicator_heuristic = kwargs["use_matrix_indicator_heuristic"]
if use_matrix_indicator_heuristic:
n_shortest = kwargs["n_shortest"]
n_cheapest = kwargs["n_cheapest"]
matrix_fc_indicator, matrix_length = prune_search_space(
self.problem, n_cheapest=n_cheapest, n_shortest=n_shortest
)
else:
matrix_fc_indicator, matrix_length = prune_search_space(
self.problem,
n_cheapest=nb_facilities,
n_shortest=nb_facilities,
)
s = pywraplp.Solver("facility", pywraplp.Solver.CBC_MIXED_INTEGER_PROGRAMMING)
x: dict[tuple[int, int], Union[int, Any]] = {}
for f in range(nb_facilities):
for c in range(nb_customers):
if matrix_fc_indicator[f, c] == 0:
x[f, c] = 0
elif matrix_fc_indicator[f, c] == 1:
x[f, c] = 1
elif matrix_fc_indicator[f, c] == 2:
x[f, c] = s.BoolVar(name="x_" + str((f, c)))
facilities = self.problem.facilities
customers = self.problem.customers
used = [
s.BoolVar(name="y_" + str(j)) for j in range(self.problem.facility_count)
]
constraints_customer: dict[int, Any] = {}
for c in range(nb_customers):
constraints_customer[c] = s.Add(
s.Sum([x[f, c] for f in range(nb_facilities)]) == 1
)
# one facility
constraint_capacity: dict[int, Any] = {}
for f in range(nb_facilities):
for c in range(nb_customers):
s.Add(used[f] >= x[f, c])
constraint_capacity[f] = s.Add(
s.Sum([x[f, c] * customers[c].demand for c in range(nb_customers)])
<= facilities[f].capacity
)
obj = s.Sum(
[facilities[f].setup_cost * used[f] for f in range(nb_facilities)]
+ [
matrix_length[f, c] * x[f, c]
for f in range(nb_facilities)
for c in range(nb_customers)
]
)
s.Minimize(obj)
self.model = s
self.variable_decision = {"x": x}
self.constraints_dict = {
"constraint_customer": constraints_customer,
"constraint_capacity": constraint_capacity,
}
self.description_variable_description = {
"x": {
"shape": (nb_facilities, nb_customers),
"type": bool,
"descr": "for each facility/customer indicate"
" if the pair is active, meaning "
"that the customer c is dealt with facility f",
}
}
self.description_constraint = {}
logger.info("Initialized")
[docs]
def retrieve(self) -> ResultStorage:
solution = [0] * self.problem.customer_count
for key in self.variable_decision["x"]:
variable_decision_key = self.variable_decision["x"][key]
if not isinstance(variable_decision_key, int):
value = variable_decision_key.solution_value()
else:
value = variable_decision_key
if value >= 0.5:
f = key[0]
c = key[1]
solution[c] = f
facility_solution = FacilitySolution(self.problem, solution)
fit = self.aggreg_from_sol(facility_solution)
return self.create_result_storage(
[(facility_solution, fit)],
)
[docs]
def solve(
self,
parameters_milp: Optional[ParametersMilp] = None,
time_limit: Optional[int] = 30,
**kwargs: Any,
) -> ResultStorage:
if parameters_milp is None:
parameters_milp = ParametersMilp.default()
if self.model is None:
self.init_model(**kwargs)
if self.model is None: # for mypy
raise RuntimeError(
"self.model must be not None after calling self.init_model()."
)
if time_limit is not None:
self.model.SetTimeLimit(time_limit * 1000)
res = self.model.Solve()
resdict = {
0: "OPTIMAL",
1: "FEASIBLE",
2: "INFEASIBLE",
3: "UNBOUNDED",
4: "ABNORMAL",
5: "MODEL_INVALID",
6: "NOT_SOLVED",
}
logger.info(f"Result: {resdict[res]}")
objective = self.model.Objective().Value()
logger.info(f"Objective : {objective}")
return self.retrieve()