Source code for discrete_optimization.generic_tools.asp_tools

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import os
from abc import abstractmethod
from typing import Any, Optional

import clingo

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.exceptions import SolveEarlyStop
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)

logger = logging.getLogger(__name__)
cur_folder = os.path.abspath(os.path.dirname(__file__))


[docs] class AspClingoSolver(SolverDO): """Base class for solver based on Answer Set Programming formulation and clingo solver.""" ctl: Optional[clingo.Control] = None early_stopping_exception: Optional[Exception] = None
[docs] @abstractmethod def retrieve_solution(self, model: clingo.Model) -> Solution: """Construct a do solution from a clingo model. Args: model: the current constructed clingo model Returns: the intermediate solution, at do format. """ ...
[docs] def solve( self, callbacks: Optional[list[Callback]] = None, time_limit: Optional[float] = 100.0, **kwargs: Any, ) -> ResultStorage: """Solve the problem with a CpSat solver drom ortools library. Args: callbacks: list of callbacks used to hook into the various stage of the solve time_limit: the solve process stops after this time limit (in seconds). If None, no time limit is applied. **kwargs: keyword arguments passed to `self.init_model()` Returns: A dedicated clingo callback is used to: - update a resultstorage each time a new solution is found by the clingo solver. - call the user (do) callbacks at each new solution, with the possibility of early stopping if the callback return True. This clingo callback use the method `self.retrieve_solution()` to reconstruct a do Solution from the current clingo model. """ self.early_stopping_exception = None callbacks_list = CallbackList(callbacks=callbacks) callbacks_list.on_solve_start(solver=self) if self.ctl is None: self.init_model(**kwargs) self.ctl.ground([("base", [])]) asp_callback = AspCallback( do_solver=self, callback=callbacks_list, dump_model_in_folders=kwargs.get("dump_model_in_folders", False), ) with self.ctl.solve(on_model=asp_callback.on_model, async_=True) as handle: handle.wait(time_limit) handle.cancel() if self.early_stopping_exception: if isinstance(self.early_stopping_exception, SolveEarlyStop): logger.info(self.early_stopping_exception) else: raise self.early_stopping_exception res = asp_callback.res callbacks_list.on_solve_end(res=res, solver=self) return res
[docs] class AspCallback: def __init__( self, do_solver: AspClingoSolver, callback: Callback, dump_model_in_folders: bool = False, ): super().__init__() self.dump_model_in_folders = dump_model_in_folders self.do_solver = do_solver self.callback = callback self.res = do_solver.create_result_storage() self.nb_solutions = 0
[docs] def on_model(self, model: clingo.Model) -> bool: # debug: store model if self.dump_model_in_folders: folder = os.path.join( cur_folder, f"output-folder/model_{self.nb_solutions}" ) if os.path.exists(folder): os.removedirs(folder) os.makedirs(folder) with open(os.path.join(folder, "model.txt"), "w") as model_file: model_file.write(str(model)) # translate into do solution sol = self.do_solver.retrieve_solution(model=model) fit = self.do_solver.aggreg_from_sol(sol) self.res.append((sol, fit)) self.nb_solutions += 1 # end of step callback: stopping? try: stopping = self.callback.on_step_end( step=self.nb_solutions, res=self.res, solver=self.do_solver ) except Exception as e: self.do_solver.early_stopping_exception = e stopping = True else: if stopping: self.do_solver.early_stopping_exception = SolveEarlyStop( f"{self.do_solver.__class__.__name__}.solve() stopped by user callback." ) # optimality? stopping = stopping or model.optimality_proven # return go-on status (=not stopping) return not stopping