Source code for discrete_optimization.generic_tools.hub_solver.optal.optalcp_tools

# Generic optal-cp python wrapper.
import asyncio
import threading
from typing import Awaitable, TypeVar

T = TypeVar("T")
import logging
from abc import abstractmethod
from typing import Any, Optional

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.cp_tools import (
    CpSolver,
    ParametersCp,
    SignEnum,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Problem,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import (
    BoundsProviderMixin,
    StatusSolver,
)
from discrete_optimization.generic_tools.exceptions import SolveEarlyStop
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)

try:
    import optalcp as cp
except ImportError:
    cp = None
    optalcp_available = False
else:
    optalcp_available = True

logger = logging.getLogger(__name__)


[docs] class OptalCpSolver(CpSolver, BoundsProviderMixin): hyperparameters = [ CategoricalHyperparameter( name="searchType", choices=["Auto", "LNS", "FDS", "FDSDual", "SetTimes", "FDSLB"], default="Auto", ) ] cp_model: Optional["cp.Model"] = None early_stopping_exception: Optional[Exception] = None current_bound: int | float | None = None current_obj: int | float | None = None status_solver: StatusSolver = None use_warm_start: bool = False warm_start_solution: Optional["cp.Solution"] = None def __init__( self, problem: Problem, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): if not optalcp_available: raise RuntimeError("You must install optalcp to use this solver.") super().__init__(problem, params_objective_function, **kwargs)
[docs] def get_current_best_internal_objective_bound(self) -> Optional[float]: return self.current_bound
[docs] def get_current_best_internal_objective_value(self) -> Optional[float]: return self.current_obj
def _solve_sync( self, callbacks: Optional[list[Callback]] = None, time_limit: Optional[int] = 60, parameters_cp: Optional[ParametersCp] = None, **args: Any, ) -> ResultStorage: args = self.complete_with_default_hyperparameters(args) kw = {"timeLimit": time_limit, "nbWorkers": parameters_cp.nb_process} kw.update(args) result = self.cp_model.solve(parameters=cp.Parameters(**kw)) sol = self.retrieve_solution(result) fit = self.aggreg_from_sol(sol) if result.solution is not None: if result.proof: self.status_solver = StatusSolver.OPTIMAL else: self.status_solver = StatusSolver.SATISFIED else: if result.proof: self.status_solver = StatusSolver.UNSATISFIABLE else: self.status_solver = StatusSolver.UNKNOWN return self.create_result_storage([(sol, fit)]) async def _solve_async( self, callbacks: Optional[list[Callback]] = None, time_limit: Optional[int] = 60, parameters_cp: Optional[ParametersCp] = None, do_not_retrieve_solutions: bool = False, **args: Any, ): if parameters_cp is None: parameters_cp = ParametersCp.default_cpsat() args = self.complete_with_default_hyperparameters(args) self.current_bound = None callback_do = CallbackList(callbacks) solver = cp.Solver() callback_optal = OptalSolutionCallback( do_solver=self, optal_solver=solver, callback=callback_do, do_not_retrieve_solutions=do_not_retrieve_solutions, ) callback_do.on_solve_start(self) solver.on_solution = callback_optal.on_solution solver.on_objective_bound = callback_optal.on_bound kw = {"timeLimit": time_limit, "nbWorkers": parameters_cp.nb_process} kw.update(args) kw_params = { key: kw[key] for key in kw if key not in {"lower_bound_method"} and key in cp.Parameters.__annotations__.keys() } logger.debug(kw_params) if self.use_warm_start and self.warm_start_solution is not None: result = await solver.solve( model=self.cp_model, params=cp.Parameters(**kw_params), warm_start=self.warm_start_solution, ) else: result = await solver.solve( model=self.cp_model, params=cp.Parameters(**kw_params) ) if result.solution is not None: if result.proof: self.status_solver = StatusSolver.OPTIMAL else: self.status_solver = StatusSolver.SATISFIED else: if result.proof: self.status_solver = StatusSolver.UNSATISFIABLE else: self.status_solver = StatusSolver.UNKNOWN callback_do.on_solve_end(callback_optal.res, self) return callback_optal.res
[docs] def solve( self, callbacks: Optional[list[Callback]] = None, time_limit: Optional[int] = 60, parameters_cp: Optional[ParametersCp] = None, do_not_retrieve_solutions: bool = False, **args: Any, ) -> ResultStorage: """Solve the cp model Args: callbacks: time_limit: parameters_cp: do_not_retrieve_solutions: if True, do no try to retrieve solutions. Useful when using preview version of optalcp. **args: Returns: """ if self.cp_model is None: self.init_model(**args) return run_async_synchronously( self._solve_async( callbacks=callbacks, time_limit=time_limit, parameters_cp=parameters_cp, do_not_retrieve_solutions=do_not_retrieve_solutions, **args, ) )
[docs] def minimize_variable(self, var: Any) -> None: self.cp_model.minimize(var)
[docs] def add_bound_constraint(self, var: Any, sign: SignEnum, value: int) -> list[Any]: if sign == SignEnum.LEQ: expr = var <= value if sign == SignEnum.UEQ: expr = var >= value if sign == SignEnum.UP: expr = var > value if sign == SignEnum.LESS: expr = var < value if sign == SignEnum.EQUAL: expr = var == value self.cp_model.enforce(expr) return [expr]
[docs] @abstractmethod def retrieve_solution(self, result: "cp.SolveResult") -> Solution: """Return a d-o solution from the variables computed by minizinc. Args: result: output of the cp.solve Returns: """ ...
[docs] class OptalSolutionCallback: def __init__( self, do_solver: OptalCpSolver, optal_solver: "cp.Solver", callback: Callback, do_not_retrieve_solutions: bool = False, ): super().__init__() self.do_solver = do_solver self.optal_solver = optal_solver self.callback = callback self.res = do_solver.create_result_storage([]) self.nb_solutions = 0 self.current_solve_result: Optional[cp.SolveResult] = None self.do_not_retrieve_solutions = do_not_retrieve_solutions
[docs] def on_solution(self, solution: "cp.SolutionEvent") -> None: self.do_solver.current_obj = solution.solution.get_objective() if not self.do_not_retrieve_solutions: sol = self.do_solver.retrieve_solution(solution) fit = self.do_solver.aggreg_from_sol(sol) self.res.append((sol, fit)) self.nb_solutions += 1 try: stopping = self.callback.on_step_end( step=self.nb_solutions, res=self.res, solver=self.do_solver ) except Exception as e: self.do_solver.early_stopping_exception = e stopping = True logger.debug("should stop") else: if stopping: self.do_solver.early_stopping_exception = SolveEarlyStop( f"{self.do_solver.__class__.__name__}.solve() stopped by user callback." ) if stopping: logger.debug("stopping") self.optal_solver.stop("stopped by usr callback")
[docs] def on_bound(self, event: "cp.ObjectiveBoundEntry") -> None: self.do_solver.current_bound = event.value stopping = self.callback.on_step_end( step=self.nb_solutions, res=self.res, solver=self.do_solver )
[docs] def run_async_synchronously(coroutine: Awaitable[T]) -> T: """ Runs a coroutine synchronously, blocking until it completes. This helper handles two cases: 1. No event loop is running (standard script): Uses asyncio.run() 2. An event loop is running (e.g., Jupyter): Runs the coroutine in a separate thread to avoid blocking the existing loop/raising runtime errors. """ try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): result = None exception = None def runner(): nonlocal result, exception new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) try: result = new_loop.run_until_complete(coroutine) except Exception as e: exception = e finally: new_loop.close() t = threading.Thread(target=runner) t.start() t.join() if exception: raise exception return result else: # Case 1: No loop running. Standard run. return asyncio.run(coroutine)