# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import re
from enum import Enum
from typing import Any
import didppy as dp
from discrete_optimization.facility.problem import FacilityProblem, FacilitySolution
from discrete_optimization.facility.solvers import FacilitySolver
from discrete_optimization.facility.utils import (
compute_matrix_distance_facility_problem,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
EnumHyperparameter,
)
logger = logging.getLogger(__name__)
[docs]
class DpFacilityModeling(Enum):
CUSTOMER = 0
FACILITY = 1
[docs]
class DpFacilitySolver(DpSolver, FacilitySolver, WarmstartMixin):
hyperparameters = DpSolver.hyperparameters + [
EnumHyperparameter(
"modeling", enum=DpFacilityModeling, default=DpFacilityModeling.CUSTOMER
)
]
def __init__(self, problem: FacilityProblem, **kwargs: Any):
super().__init__(problem, **kwargs)
self.modeling: DpFacilityModeling = None
[docs]
def init_model(
self,
modeling: DpFacilityModeling = DpFacilityModeling.CUSTOMER,
**kwargs: Any,
) -> None:
if modeling == DpFacilityModeling.FACILITY:
self.init_model_factory_view(**kwargs)
self.modeling = modeling
if modeling == DpFacilityModeling.CUSTOMER:
self.init_model_customer_view(**kwargs)
self.modeling = modeling
[docs]
def init_model_customer_view(self, **kwargs: Any) -> None:
model = dp.Model()
nb_facilities = self.problem.facility_count
nb_customers = self.problem.customer_count
capacities_facilities = [f.capacity for f in self.problem.facilities]
setup_cost = [10 * int(f.setup_cost) for f in self.problem.facilities]
matrix = compute_matrix_distance_facility_problem(self.problem)
matrix = [
[int(10 * matrix[i, j]) for j in range(matrix.shape[1])]
for i in range(matrix.shape[0])
]
distance_matrix = model.add_int_table(matrix)
capacities = model.add_int_table(capacities_facilities)
setup_cost = model.add_int_table(setup_cost)
customer = model.add_object_type(number=nb_customers)
factories = model.add_object_type(number=nb_facilities)
unallocated = model.add_set_var(
object_type=customer, target=range(nb_customers)
)
used_facilities = model.add_set_var(object_type=factories, target=[])
current_facility = model.add_element_var(object_type=factories, target=0)
# nb_facility_used = model.add_int_resource_var(target=0, less_is_better=True)
current_capacity = model.add_int_resource_var(target=0)
model.add_base_case([unallocated.is_empty()])
self.transitions = {}
for i in range(nb_customers):
transition = dp.Transition(
name=f"alloc_{i}",
cost=distance_matrix[i, current_facility] + dp.IntExpr.state_cost(),
effects=[
(unallocated, unallocated.remove(i)),
(
current_capacity,
current_capacity + self.problem.customers[i].demand,
),
],
preconditions=[
unallocated.contains(i),
~used_facilities.is_empty(),
current_capacity + self.problem.customers[i].demand
<= capacities[current_facility],
],
)
model.add_transition(transition)
self.transitions[("current", "customer", i)] = transition
for f in range(nb_facilities):
transition = dp.Transition(
name=f"alloc_via_{f}_{i}",
cost=setup_cost[f]
+ distance_matrix[i, f]
+ dp.IntExpr.state_cost(),
effects=[
(unallocated, unallocated.remove(i)),
(used_facilities, used_facilities.add(f)),
(current_facility, f),
(current_capacity, self.problem.customers[i].demand),
],
preconditions=[
unallocated.contains(i),
~(used_facilities.contains(f)),
self.problem.customers[i].demand <= capacities[f],
],
)
model.add_transition(transition)
self.transitions[(f, "customer", i)] = transition
self.model = model
[docs]
def init_model_factory_view(self, **kwargs: Any) -> None:
model = dp.Model()
nb_facilities = self.problem.facility_count
nb_customers = self.problem.customer_count
capacities_facilities = [f.capacity for f in self.problem.facilities]
setup_cost = [10 * int(f.setup_cost) for f in self.problem.facilities]
matrix = compute_matrix_distance_facility_problem(self.problem)
matrix = [
[int(10 * matrix[i, j]) for j in range(matrix.shape[1])]
for i in range(matrix.shape[0])
]
distance_matrix = model.add_int_table(matrix)
capacities = model.add_int_table(capacities_facilities)
setup_cost = model.add_int_table(setup_cost)
customer = model.add_object_type(number=nb_customers)
factories = model.add_object_type(number=nb_facilities)
used_facilities = model.add_set_var(object_type=factories, target=[])
demands = model.add_int_table([f.demand for f in self.problem.customers])
used_capacities = [
model.add_int_resource_var(target=0, less_is_better=False)
for f in range(nb_facilities)
]
current_customer = model.add_element_var(object_type=customer, target=0)
self.transitions = {}
for f in range(nb_facilities):
new_cost = (used_facilities.contains(f)).if_then_else(0, setup_cost[f])
alloc = dp.Transition(
name=f"alloc_to_{f}",
cost=new_cost
+ distance_matrix[current_customer, f]
+ dp.IntExpr.state_cost(),
effects=[
(used_facilities, used_facilities.add(f)),
(
used_capacities[f],
used_capacities[f] + demands[current_customer],
),
(current_customer, current_customer + 1),
],
preconditions=[
used_capacities[f] + demands[current_customer] <= capacities[f],
current_customer < self.problem.customer_count,
],
)
model.add_transition(alloc)
self.transitions[f] = alloc
model.add_base_case([current_customer == self.problem.customer_count])
# min_distance_to = model.add_int_table(
# [
# min(matrix[k][j] for j in range(self.problem.facility_count))
# for k in range(self.problem.customer_count)
# ]
# )
# model.add_dual_bound(sum(min_distance_to))
self.model = model
[docs]
def retrieve_solution(self, sol: dp.Solution) -> Solution:
if self.modeling == DpFacilityModeling.FACILITY:
return self.retrieve_solution_factory_view(sol)
if self.modeling == DpFacilityModeling.CUSTOMER:
return self.retrieve_solution_customer_view(sol)
[docs]
def retrieve_solution_customer_view(self, sol: dp.Solution) -> Solution:
def extract_ints(word):
return tuple(int(num) for num in re.findall(r"\d+", word))
solution = FacilitySolution(
problem=self.problem,
facility_for_customers=[None for i in range(self.problem.customer_count)],
)
current_factory = 0
for t in sol.transitions:
try:
if "alloc_via" in t.name:
factory, customer = extract_ints(t.name)
current_factory = factory
else:
customer = extract_ints(t.name)[0]
solution.facility_for_customers[customer] = current_factory
except:
pass
return solution
[docs]
def retrieve_solution_factory_view(self, sol: dp.Solution) -> Solution:
def extract_ints(word):
return tuple(int(num) for num in re.findall(r"\d+", word))
solution = FacilitySolution(
problem=self.problem,
facility_for_customers=[None for i in range(self.problem.customer_count)],
)
current_factory = 0
customer = 0
for t in sol.transitions:
current_factory = extract_ints(t.name)[0]
solution.facility_for_customers[customer] = current_factory
customer += 1
return solution
[docs]
def set_warm_start(self, solution: FacilitySolution) -> None:
if self.modeling == DpFacilityModeling.FACILITY:
self.initial_solution = [
self.transitions[f] for f in solution.facility_for_customers
]
if self.modeling == DpFacilityModeling.CUSTOMER:
initial_solution = []
customer_by_facilities = {}
for i in range(len(solution.facility_for_customers)):
f = solution.facility_for_customers[i]
if f not in customer_by_facilities:
customer_by_facilities[f] = []
customer_by_facilities[f].append(i)
for f in sorted(customer_by_facilities):
list_customer = customer_by_facilities[f]
initial_solution.append(
self.transitions[(f, "customer", list_customer[0])]
)
for customer in list_customer[1:]:
initial_solution.append(
self.transitions[("current", "customer", customer)]
)
self.initial_solution = initial_solution