Source code for discrete_optimization.facility.solvers.optal

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from __future__ import annotations

from enum import Enum
from typing import Any

from discrete_optimization.facility.problem import (
    Customer,
    Facility,
    FacilityProblem,
    FacilitySolution,
)
from discrete_optimization.facility.utils import (
    compute_matrix_distance_facility_problem,
)
from discrete_optimization.generic_tasks_tools.allocation import UnaryResource
from discrete_optimization.generic_tasks_tools.base import Task
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.solvers.optalcp_tasks_solver import (
    AllocationOptalSolver,
    SchedulingOptalSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    EnumHyperparameter,
)

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True


class ModelingOptal(Enum):
    """Available OptalCP formulations of the facility location problem."""

    # One boolean decision variable per (customer, facility) pair.
    ALLOCATION = "Allocation"
    # One unit-length interval per customer; its start is the facility index.
    SCHEDULING = "Scheduling"
class OptalFacilitySolver(
    AllocationOptalSolver[Facility, Customer],
    SchedulingOptalSolver[Customer],
):
    """OptalCP solver for the facility location problem.

    Two formulations are available through the ``modeling`` hyperparameter:

    * ``ModelingOptal.ALLOCATION`` (default): one boolean variable per
      (customer, facility) pair, with exactly one facility per customer and a
      demand-sum constraint per facility.
    * ``ModelingOptal.SCHEDULING``: one unit-length interval per customer whose
      start value is the index of the chosen facility; capacities are enforced
      by a single cumulative constraint.

    In both formulations the objective is the sum of the setup costs of the
    used facilities and of the customer/facility distances, each cast to
    ``int`` before being handed to the CP model.
    """

    hyperparameters = [
        EnumHyperparameter(
            name="modeling", enum=ModelingOptal, default=ModelingOptal.ALLOCATION
        )
    ]
    problem: FacilityProblem

    def __init__(
        self,
        problem: FacilityProblem,
        params_objective_function: ParamsObjectiveFunction | None = None,
        **kwargs,
    ) -> None:
        """Store the problem and prepare empty model state.

        Args:
            problem: facility location instance to solve.
            params_objective_function: objective settings forwarded to the
                parent solvers.
            **kwargs: extra arguments forwarded to the parent solvers.
        """
        super().__init__(problem, params_objective_function, **kwargs)
        # Filled by init_allocation ("allocation") or init_scheduling ("intervals").
        self.variables: dict[str, Any] = {}
        # Formulation currently built; None until init_model has been called.
        self.modeling: ModelingOptal | None = None

    def init_model(self, **kwargs: Any) -> None:
        """Build the CP model selected by the ``modeling`` hyperparameter.

        Args:
            **kwargs: hyperparameters; missing ones are completed with their
                defaults.

        Raises:
            ValueError: if ``modeling`` is not a known ``ModelingOptal`` member.
        """
        kwargs = self.complete_with_default_hyperparameters(kwargs)
        modeling = kwargs["modeling"]
        if modeling == ModelingOptal.SCHEDULING:
            self.init_scheduling()
            self.modeling = ModelingOptal.SCHEDULING
        elif modeling == ModelingOptal.ALLOCATION:
            self.init_allocation()
            self.modeling = ModelingOptal.ALLOCATION
        else:
            # Previously this case silently left the solver without any model.
            raise ValueError(f"Unknown modeling option: {modeling!r}")

    def init_allocation(self) -> None:
        """Build the boolean allocation formulation.

        Stores the ``(customer, facility) -> bool variable`` mapping under
        ``self.variables["allocation"]``.
        """
        self.cp_model = cp.Model()
        tasks = self.problem.tasks_list
        resources = self.problem.unary_resources_list
        nb_facilities = self.problem.facility_count
        matrix = compute_matrix_distance_facility_problem(self.problem)
        allocation = {}
        cost = {}
        for t in tasks:
            for c in resources:
                allocation[(t, c)] = self.cp_model.bool_var(
                    name=f"allocation_customer_{t}_{c}"
                )
            # Each customer is served by exactly one facility.
            self.cp_model.enforce(
                self.cp_model.sum([allocation[(t, c)] for c in resources]) == 1
            )
            # Row of this customer in the distance matrix; looked up once per
            # customer instead of once per (customer, facility) pair.
            row = self.problem.customers.index(t)
            cost[t] = self.cp_model.sum(
                [
                    int(matrix[row, i]) * allocation[t, resources[i]]
                    for i in range(nb_facilities)
                ]
            )
        for c in resources:
            # Total demand assigned to a facility must fit in its capacity.
            self.cp_model.enforce(
                self.cp_model.sum([allocation[(t, c)] * t.demand for t in tasks])
                <= c.capacity
            )
        # A facility is used as soon as one customer is allocated to it.
        used_facility = {
            i: self.cp_model.max([allocation[t, resources[i]] for t in tasks])
            for i in range(nb_facilities)
        }
        cost_setup = self.cp_model.sum(
            [
                used_facility[i] * int(self.problem.facilities[i].setup_cost)
                for i in range(nb_facilities)
            ]
        )
        cost_alloc = self.cp_model.sum([cost[t] for t in cost])
        self.variables["allocation"] = allocation
        self.cp_model.minimize(cost_setup + cost_alloc)

    def init_scheduling(self):
        """Build the interval-based formulation.

        Every customer is a unit-length interval starting in
        ``[0, facility_count - 1]``; the start value is the index of the
        facility serving the customer. Stores the ``customer -> interval``
        mapping under ``self.variables["intervals"]``.
        """
        self.cp_model = cp.Model()
        nb_facilities = self.problem.facility_count
        matrix = compute_matrix_distance_facility_problem(self.problem)
        intervals = {}
        cost_function = {}
        cost = {}
        for t in self.problem.tasks_list:
            intervals[t] = self.cp_model.interval_var(
                start=(0, nb_facilities - 1),
                end=(1, nb_facilities),
                length=1,
                name=f"interval_customer_{t}",
            )
            # Row of this customer in the distance matrix; looked up once per
            # customer instead of once per step of the cost function.
            row = self.problem.customers.index(t)
            # Step function giving, at x = i, the distance to facility i.
            cost_function[t] = self.cp_model.step_function(
                [(i, int(matrix[row, i])) for i in range(matrix.shape[1])]
            )
            cost[t] = self.cp_model.eval(
                cost_function[t], self.cp_model.start(intervals[t])
            )
        max_capa_facilities = max(f.capacity for f in self.problem.facilities)
        # Single cumulative constraint bounded by the largest capacity: a fixed
        # filler interval on slot [i, i + 1) consumes the difference between
        # that bound and the capacity of facility i.
        self.cp_model.enforce(
            self.cp_model.sum(
                [self.cp_model.pulse(intervals[t], t.demand) for t in intervals]
                + [
                    self.cp_model.pulse(
                        self.cp_model.interval_var(start=i, end=i + 1),
                        max_capa_facilities - self.problem.facilities[i].capacity,
                    )
                    for i in range(nb_facilities)
                ]
            )
            <= max_capa_facilities
        )
        self.variables["intervals"] = intervals
        # A facility is used as soon as one customer interval starts at its index.
        used_facility = {
            i: self.cp_model.max(
                [self.cp_model.start(intervals[t]) == i for t in intervals]
            )
            for i in range(nb_facilities)
        }
        cost_setup = self.cp_model.sum(
            [
                used_facility[i] * int(self.problem.facilities[i].setup_cost)
                for i in range(nb_facilities)
            ]
        )
        cost_alloc = self.cp_model.sum([cost[t] for t in cost])
        self.cp_model.minimize(cost_setup + cost_alloc)

    def get_task_unary_resource_is_present_variable(
        self, task: Task, unary_resource: UnaryResource
    ) -> cp.BoolExpr:
        """Return the expression that is true when ``task`` uses ``unary_resource``.

        Args:
            task: customer to query.
            unary_resource: facility to query.

        Returns:
            With the allocation formulation, the boolean variable of the pair.
            Otherwise the comparison between the start of the customer
            interval and the index of the facility.
        """
        if self.modeling == ModelingOptal.ALLOCATION:
            # No interval exists in this formulation: the decision variable
            # itself is the presence literal.
            return self.variables["allocation"][(task, unary_resource)]
        index = self.problem.get_index_from_unary_resource(unary_resource)
        return (
            self.get_task_start_or_end_variable(task, start_or_end=StartOrEnd.START)
            == index
        )

    def get_task_interval_variable(self, task: Task) -> cp.IntervalVar:
        """Return the interval variable of ``task``.

        Only the scheduling formulation fills ``self.variables["intervals"]``.
        """
        return self.variables["intervals"][task]

    def retrieve_solution(self, result: cp.SolveResult) -> Solution:
        """Convert an OptalCP result into a ``FacilitySolution``.

        Args:
            result: solve result whose ``solution`` holds the variable values.

        Returns:
            The solution listing one facility index per customer, in the order
            of ``self.problem.tasks_list``.

        Raises:
            RuntimeError: if no model has been built with ``init_model``.
        """
        if self.modeling == ModelingOptal.SCHEDULING:
            # The start of each customer interval is the chosen facility index.
            allocation = [
                int(result.solution.get_start(self.get_task_interval_variable(t)))
                for t in self.problem.tasks_list
            ]
        elif self.modeling == ModelingOptal.ALLOCATION:
            variables = self.variables["allocation"]
            allocation = [
                self.problem.get_index_from_unary_resource(c)
                for t in self.problem.tasks_list
                for c in self.problem.unary_resources_list
                if result.solution.get_value(variables[(t, c)])
            ]
        else:
            # Previously this fell through and returned None.
            raise RuntimeError(
                "No model has been initialized: call init_model() before "
                "retrieving a solution."
            )
        return FacilitySolution(
            problem=self.problem, facility_for_customers=allocation
        )