Source code for discrete_optimization.generic_tools.pareto_tools

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import time
from typing import Any, Callable, List

from ortools.sat.python import cp_model

from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.cp_tools import SignEnum
from discrete_optimization.generic_tools.do_problem import Problem, Solution
from discrete_optimization.generic_tools.do_solver import SolverDO, StatusSolver
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver

logger = logging.getLogger(__name__)


[docs] class CpsatParetoSolver(SolverDO): """ Finds the Pareto front for N objectives using Iterative Blocking with Lexicographic Tightening. Algorithm: 1. Search for a feasible solution (optimizing primary objective). 2. 'Tighten' the solution to ensure Pareto optimality (Lexicographic minimization chain). 3. Record the solution. 4. Add a 'Blocking Clause' to the model: Next solution must be strictly better in at least one objective. 5. Repeat until Infeasible. """ def __init__( self, problem: Problem, solver: OrtoolsCpSatSolver, objective_names: List[str], dict_function: dict[str, Callable[[Solution], float]] = None, delta_abs_improvement: list[int] = None, delta_ref_improvement: list[float] = None, ): """ Args: solver: The base CP-SAT solver instance. objective_names: list of objectives as string, to consider in the pareto dict_function: a map of function to compute objective kpi on a solution delta_abs_improvement: list of absolute improvement for each objective to add in the blocking clauses delta_ref_improvement: list of relative improvement for each objective to add in the blocking clauses """ super().__init__(self, problem) self.problem = problem self.solver = solver self.objective_names = objective_names if dict_function is None: dict_function = { key: lambda sol: self.problem.evaluate(sol)[key] for key in self.objective_names } self.dict_function = dict_function if delta_abs_improvement is None: delta_abs_improvement = [1 for _ in range(len(objective_names))] if delta_ref_improvement is None: delta_ref_improvement = [0.01 for _ in range(len(objective_names))] self.delta_abs_improvement = delta_abs_improvement self.delta_ref_improvement = delta_ref_improvement
[docs] def solve( self, obj_vars: list[cp_model.LinearExpr], callbacks: list[Callback] = None, time_limit: int = None, subsolver_kwargs: dict = None, **kwargs, ): if subsolver_kwargs is None: subsolver_kwargs = {} # 1. Initialize Model # self.solver.init_model() pareto_front = [] objectives = obj_vars t = time.perf_counter() while True and time.perf_counter() - t < time_limit: logger.info(f"--- Iteration: Searching for non-dominated solutions ---") # --- Step 1: Find a point on the pareto frontier with first objective --- # We minimize the first objective to find *some* point on the frontier. # (Constraints from previous steps ensure we don't find known points). self.solver.set_lexico_objective(self.objective_names[0]) try: self.solver.set_warm_start_from_previous_run() except RuntimeError: pass res = self.solver.solve(**subsolver_kwargs) status = self.solver.status_solver if status in {StatusSolver.UNSATISFIABLE, StatusSolver.UNKNOWN}: logger.info(" -> Infeasible. Pareto Front Search Complete.") break # Get candidate solution sol_candidate = res.get_best_solution() print(status, sol_candidate) vals_candidate = [ self.dict_function[obj](sol_candidate) for obj in self.objective_names ] logger.info(f" -> Candidate found (pre-tightening): {vals_candidate}") # --- Step 2: Lexicographic Tightening --- # Ensure the point is strictly Pareto optimal. # Chain: Fix f1, Min f2 -> Fix f1,f2, Min f3 -> ... tightening_constraints = [] current_vals = list(vals_candidate) # We update these as we tighten sol_lex = None fit_lex = None for i in range(len(objectives)): # For the objective we are about to optimize (obj[i]), we don't fix it yet. # For all PREVIOUS objectives (0 to i-1), we fix them to their best found values. if i > 0: prev_var = objectives[i - 1] prev_val = int(current_vals[i - 1]) # Add constraint: obj[i-1] <= val c = self.solver.add_bound_constraint( prev_var, SignEnum.LEQ, prev_val ) tightening_constraints.extend(c) # Now minimize current objective i self.solver.minimize_variable(objectives[i]) # Solve self.solver.set_warm_start_from_previous_run() res_lex = self.solver.solve(**subsolver_kwargs) status = self.solver.status_solver if status in {StatusSolver.UNSATISFIABLE, StatusSolver.UNKNOWN}: logger.warning( " -> Tightening failed (Infeasible). Keeping previous candidate." ) break # Update current best values based on this tightening step # Again, re-evaluate via problem sol_lex, fit_lex = res_lex.get_best_solution_fit() current_vals = [ self.dict_function[obj](sol_lex) for obj in self.objective_names ] # Capture Final Tightened Solution # Usually the last solution in the chain is the Tightened one logger.info(f" -> Tightened Result: {current_vals}") pareto_front.append((sol_lex, fit_lex)) if tightening_constraints: self.solver.remove_constraints(tightening_constraints) # Block regions dominated by or equal to the found values self._add_blocking_constraint(objectives, current_vals) return pareto_front
def _add_blocking_constraint(self, objectives: List[Any], values: List[float]): """ Adds a permanent constraint: OR(obj[i] <= val[i] - 1) for all i. This blocks any solution that is 'worse or equal' in all dimensions. """ literals = [] for i, obj_var in enumerate(objectives): val = int(values[i]) # New boolean for this implication lit = self.solver.cp_model.NewBoolVar(f"block_{len(literals)}_{val}") # Enforce: lit => obj_var <= val - 1 # (Strict improvement in this objective) self.solver.cp_model.Add( obj_var <= val - max( self.delta_abs_improvement[i], int(self.delta_ref_improvement[i] * val), ) ).OnlyEnforceIf(lit) literals.append(lit) # Enforce: At least one of these strict improvements must happen self.solver.cp_model.AddBoolOr(literals)