Source code for discrete_optimization.facility.problem

"""Implementation of the facility location problem with capacity constraint
(https://en.wikipedia.org/wiki/Facility_location_problem#Capacitated_facility_location)

The facility location problem consists in choosing where to locate facilities and which customers to allocate to each of
them. Each customer has a demand and each facility has a capacity. The sum of the demands of the customers allocated to a
given facility cannot exceed the capacity of that facility.
"""

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

import math
from abc import abstractmethod
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Optional

from discrete_optimization.generic_tasks_tools.allocation import (
    AllocationProblem,
    AllocationSolution,
)
from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Problem,
    Solution,
    TypeObjective,
)
from discrete_optimization.generic_tools.encoding_register import (
    EncodingRegister,
    ListInteger,
)


@dataclass(frozen=True)
class Point:
    """Immutable pair of planar coordinates.

    Attributes:
        x (float): first coordinate
        y (float): second coordinate
    """

    x: float
    y: float
@dataclass(frozen=True)
class Facility:
    """Immutable description of a facility.

    Attributes:
        index (int): index of the facility
        setup_cost (float): setup cost of the facility
        capacity (float): capacity of the facility
        location (Point): location of the facility
    """

    index: int
    setup_cost: float
    capacity: float
    location: Point
@dataclass(frozen=True)
class Customer:
    """Immutable description of a customer.

    Attributes:
        index (int): index of the customer
        demand (float): demand of the customer
        location (Point): location of the customer
    """

    index: int
    demand: float
    location: Point
class FacilitySolution(AllocationSolution[Customer, Facility]):
    """Solution of a facility location problem.

    Attributes:
        problem (FacilityProblem): facility problem instance
        facility_for_customers (list[int]): for each customer, the index of the
            facility it is allocated to
        dict_details (dict): optional evaluation results (capacities used, setup
            cost, ...). See FacilityProblem.evaluate() for its content.

    """

    problem: FacilityProblem

    def __init__(
        self,
        problem: FacilityProblem,
        facility_for_customers: list[int],
        dict_details: Optional[dict[str, Any]] = None,
    ):
        super().__init__(problem=problem)
        self.facility_for_customers = facility_for_customers
        self.dict_details = dict_details

    def copy(self) -> FacilitySolution:
        """Return an independent copy of this solution.

        The allocation list is duplicated and the details are deep-copied.
        """
        allocation = list(self.facility_for_customers)
        details = deepcopy(self.dict_details)
        return FacilitySolution(
            self.problem,
            facility_for_customers=allocation,
            dict_details=details,
        )

    def lazy_copy(self) -> FacilitySolution:
        """Return a new solution object sharing the allocation list and the details.

        Nothing is duplicated: both solutions reference the same list and dict.
        """
        return FacilitySolution(
            self.problem,
            facility_for_customers=self.facility_for_customers,
            dict_details=self.dict_details,
        )

    def change_problem(self, new_problem: Problem) -> None:
        """Attach the solution to another problem and drop stored evaluation results."""
        super().change_problem(new_problem=new_problem)
        # Details computed for the previous problem are no longer valid.
        self.dict_details = None

    def is_allocated(self, task: Customer, unary_resource: Facility) -> bool:
        """Tell whether the customer is allocated to the given facility.

        Args:
            task (Customer): customer to look up (its index is used as position
                in facility_for_customers)
            unary_resource (Facility): candidate facility

        Returns (bool): True if the facility index stored for the customer is the
            index of the given facility.

        """
        assigned_facility = self.facility_for_customers[task.index]
        return assigned_facility == unary_resource.index
class FacilityProblem(AllocationProblem[Customer, Facility]):
    """Base class for the facility problem.

    Attributes:
        facility_count (int): number of facilities
        customer_count (int): number of customers
        facilities (list[Facility]): facility objects; each has a setup cost,
            a capacity and a location
        customers (list[Customer]): customer objects; each has a demand and a
            location

    """

    def __init__(
        self,
        facility_count: int,
        customer_count: int,
        facilities: list[Facility],
        customers: list[Customer],
    ):
        self.facility_count = facility_count
        self.customer_count = customer_count
        self.facilities = facilities
        self.customers = customers

    @property
    def unary_resources_list(self) -> list[Facility]:
        """Facilities of the problem."""
        return self.facilities

    @property
    def tasks_list(self) -> list[Customer]:
        """Customers of the problem."""
        return self.customers

    @abstractmethod
    def evaluate_customer_facility(
        self, facility: Facility, customer: Customer
    ) -> float:
        """Compute the cost of allocating a customer to a facility.

        Abstract: subclasses must provide the implementation.

        Args:
            facility (Facility): facility
            customer (Customer): customer

        Returns (float): a cost as a float

        """
        ...

    def evaluate(self, variable: FacilitySolution) -> dict[str, float]:  # type: ignore # avoid isinstance checks for efficiency
        """Compute the KPIs of a FacilitySolution.

        If the solution already carries `dict_details`, that dict is returned
        as is. Otherwise the KPIs are computed; the result is not stored on the
        solution.

        Args:
            variable (FacilitySolution): solution to evaluate

        Returns: dictionary of KPIs: the output of evaluate_cost() plus
            "capacity_constraint_violation". See get_objective_register().

        """
        stored = variable.dict_details
        if stored is not None:
            return stored
        kpis = self.evaluate_cost(variable)
        overload = 0
        for facility_index, info in kpis["details"].items():
            # Only the demand exceeding the capacity counts as violation.
            excess = info["capacity_used"] - self.facilities[facility_index].capacity
            overload += max(excess, 0)
        kpis["capacity_constraint_violation"] = overload
        return kpis

    def evaluate_cost(self, variable: FacilitySolution) -> dict[str, Any]:
        """Compute the allocation cost and the setup cost of a solution.

        Args:
            variable (FacilitySolution): facility solution to evaluate.

        Returns: a dictionary containing the cost of allocation ("cost"), the
            setup cost of the facilities in use ("setup_cost"), and, for each
            facility in use, its capacity used, customers, allocation cost and
            setup cost ("details").

        """
        per_facility: dict[int, dict[str, Any]] = {}
        allocation_total = 0.0
        setup_total = 0.0
        for customer_index in range(self.customer_count):
            facility_index = variable.facility_for_customers[customer_index]
            facility = self.facilities[facility_index]
            customer = self.customers[customer_index]
            entry = per_facility.get(facility_index)
            if entry is None:
                # First customer on this facility: open it and pay its setup cost once.
                entry = {
                    "capacity_used": 0.0,
                    "customers": set(),
                    "cost": 0.0,
                    "setup_cost": facility.setup_cost,
                }
                per_facility[facility_index] = entry
                setup_total += facility.setup_cost
            entry["capacity_used"] += customer.demand
            entry["customers"].add(customer_index)
            allocation_cost = self.evaluate_customer_facility(
                facility=facility, customer=customer
            )
            entry["cost"] += allocation_cost
            allocation_total += allocation_cost
        return {
            "cost": allocation_total,
            "setup_cost": setup_total,
            "details": per_facility,
        }

    def satisfy(self, variable: FacilitySolution) -> bool:  # type: ignore # avoid isinstance checks for efficiency
        """Check that a facility solution fulfills the capacity constraint.

        Only the capacity constraint is checked; the values of the allocation
        vector are assumed to be in the allowed range.

        Args:
            variable (FacilitySolution): facility solution to check

        Returns (bool): True if the capacity violation is zero.

        """
        return self.evaluate(variable)["capacity_constraint_violation"] == 0.0

    def get_attribute_register(self) -> EncodingRegister:
        """Describe the encoding: one integer per customer, with `facility_count` as arity."""
        allocation_encoding = ListInteger(
            length=self.customer_count,
            arities=self.facility_count,
        )
        return EncodingRegister({"facility_for_customers": allocation_encoding})

    def get_dummy_solution(self) -> FacilitySolution:
        """Return a dummy solution (not necessarily fulfilling the constraints).

        All customers are allocated to the first facility, which will usually
        break the capacity constraint.

        Returns (FacilitySolution): a dummy solution

        """
        everyone_on_first = [0 for _ in range(self.customer_count)]
        return FacilitySolution(self, everyone_on_first)

    def get_solution_type(self) -> type[Solution]:
        """Return the solution class used by this problem."""
        return FacilitySolution

    def get_objective_register(self) -> ObjectiveRegister:
        """Declare the KPIs aggregated into the objective and their default weights.

        The aggregated objective is maximized with negative weights on the
        allocation cost, the setup cost and the capacity violation.
        """
        cost_doc = ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=-1.0)
        setup_cost_doc = ObjectiveDoc(
            type=TypeObjective.OBJECTIVE, default_weight=-1.0
        )
        violation_doc = ObjectiveDoc(
            type=TypeObjective.OBJECTIVE, default_weight=-10000.0
        )
        return ObjectiveRegister(
            objective_sense=ModeOptim.MAXIMIZATION,
            objective_handling=ObjectiveHandling.AGGREGATE,
            dict_objective_to_doc={
                "cost": cost_doc,
                "setup_cost": setup_cost_doc,
                "capacity_constraint_violation": violation_doc,
            },
        )
def length(point1: Point, point2: Point) -> float:
    """Classic euclidean distance between two points.

    Computed with math.hypot rather than squaring the coordinate differences,
    so that very large differences do not raise OverflowError and very small
    ones do not vanish to zero.

    Args:
        point1 (Point): origin point
        point2 (Point): target point

    Returns (float): euclidean distance between the 2 points

    """
    return math.hypot(point1.x - point2.x, point1.y - point2.y)
class Facility2DProblem(FacilityProblem):
    """Facility problem whose allocation cost is the distance between locations."""

    def evaluate_customer_facility(
        self, facility: Facility, customer: Customer
    ) -> float:
        """Distance-based cost of allocating a customer to a facility.

        The cost is the value returned by length() for the two locations.

        Args:
            facility (Facility): facility
            customer (Customer): customer

        Returns: a float cost

        """
        facility_location = facility.location
        customer_location = customer.location
        return length(facility_location, customer_location)