# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import contextlib
import logging
import random
from abc import abstractmethod
from collections.abc import Iterable
from typing import Any, Optional, TypedDict
import numpy as np
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_problem import (
ModeOptim,
ParamsObjectiveFunction,
Problem,
Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
SubBrick,
SubBrickClsHyperparameter,
SubBrickHyperparameter,
SubBrickKwargsHyperparameter,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparametrizable import (
Hyperparametrizable,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
fitness_class,
from_solutions_to_result_storage,
)
logger = logging.getLogger(__name__)
[docs]
class ConstraintHandler(Hyperparametrizable):
[docs]
@abstractmethod
def adding_constraint_from_results_store(
self, solver: SolverDO, result_storage: ResultStorage, **kwargs: Any
) -> Iterable[Any]:
...
[docs]
@abstractmethod
def remove_constraints_from_previous_iteration(
self,
solver: SolverDO,
previous_constraints: Iterable[Any],
**kwargs: Any,
) -> None:
...
[docs]
class InitialSolution(Hyperparametrizable):
[docs]
@abstractmethod
def get_starting_solution(self) -> ResultStorage:
...
[docs]
class InitialSolutionFromSolver(InitialSolution):
def __init__(self, solver: SolverDO, **kwargs: Any):
self.solver = solver
self.dict = kwargs
[docs]
@abstractmethod
def get_starting_solution(self) -> ResultStorage:
return self.solver.solve(**self.dict)
[docs]
class TrivialInitialSolution(InitialSolution):
def __init__(self, solution: ResultStorage, **kwargs: Any):
self.solution = solution
self.dict = kwargs
[docs]
@abstractmethod
def get_starting_solution(self) -> ResultStorage:
return self.solution
[docs]
class PostProcessSolution(Hyperparametrizable):
# From solution from MIP or CP you can build other solution.
# Here you can have many different approaches:
# if solution from mip/cp are not feasible you can code a repair function
# you can also do mall changes (filling gap in a schedule) to try to improve the solution
# you can also run algorithms from the new found solution.
[docs]
@abstractmethod
def build_other_solution(self, result_storage: ResultStorage) -> ResultStorage:
...
[docs]
class TrivialPostProcessSolution(PostProcessSolution):
def __init__(self, **kwargs):
...
[docs]
def build_other_solution(self, result_storage: ResultStorage) -> ResultStorage:
return result_storage
[docs]
class BaseLns(SolverDO, WarmstartMixin):
"""Base class for Large Neighborhood Search solvers."""
subsolver: SolverDO
"""Sub-solver used by this lns solver at each iteration."""
constraint_handler: ConstraintHandler
initial_solution_provider: Optional[InitialSolution]
post_process_solution: Optional[PostProcessSolution]
hyperparameters = [
SubBrickHyperparameter(
name="subsolver", name_in_kwargs="subsolver_subbrick", choices=[]
),
SubBrickHyperparameter(
name="initial_solution_provider",
name_in_kwargs="initial_solution_provider_subbrick",
choices=[],
depends_on=("skip_initial_solution_provider", [False]),
),
SubBrickHyperparameter(
name="constraint_handler",
name_in_kwargs="constraint_handler_subbrick",
choices=[],
),
SubBrickHyperparameter(
name="post_process_solution",
name_in_kwargs="post_process_solution_subbrick",
choices=[],
),
CategoricalHyperparameter(
name="skip_initial_solution_provider", choices=[True, False], default=False
),
]
def __init__(
self,
problem: Problem,
subsolver: Optional[SolverDO] = None,
initial_solution_provider: Optional[InitialSolution] = None,
constraint_handler: Optional[ConstraintHandler] = None,
post_process_solution: Optional[PostProcessSolution] = None,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
kwargs = self.complete_with_default_hyperparameters(kwargs)
if subsolver is None:
if kwargs["subsolver_subbrick"] is None:
if "build_default_subsolver" in kwargs:
subsolver = kwargs["build_default_subsolver"](
self.problem, **kwargs
)
else:
raise ValueError(
"`subsolver_subbrick` cannot be None if "
"neither `subsolver` nor `build_default_subsolver` are specified."
)
else:
subsolver_subbrick: SubBrick = kwargs["subsolver_subbrick"]
subsolver_cls = subsolver_subbrick.cls
subsolver_kwargs = subsolver_subbrick.kwargs
if not issubclass(subsolver_cls, SolverDO):
raise ValueError(
"subsolver_subbrick.cls must a subclass of SolverDO"
)
subsolver = subsolver_cls(problem=self.problem, **subsolver_kwargs)
subsolver.init_model(**subsolver_kwargs)
self.subsolver = subsolver
if constraint_handler is None:
if kwargs["constraint_handler_subbrick"] is None:
if "build_default_contraint_handler" in kwargs:
constraint_handler = kwargs["build_default_contraint_handler"](
self.problem, **kwargs
)
else:
raise ValueError(
"`constraint_handler_cls` cannot be None if "
"neither `constraint_handler` nor `build_default_contraint_handler` are specified."
)
else:
constraint_handler_subbrick: SubBrick = kwargs[
"constraint_handler_subbrick"
]
constraint_handler_cls = constraint_handler_subbrick.cls
constraint_handler_kwargs = constraint_handler_subbrick.kwargs
if not issubclass(constraint_handler_cls, ConstraintHandler):
raise ValueError(
"constraint_handler_subbrick.cls must a subclass of ConstraintHandler"
)
constraint_handler = constraint_handler_cls(
problem=self.problem, **constraint_handler_kwargs
)
self.constraint_handler = constraint_handler
if post_process_solution is None:
if kwargs["post_process_solution_subbrick"] is None:
if "build_default_post_process_solution" in kwargs:
post_process_solution = kwargs[
"build_default_post_process_solution"
](
self.problem,
self.params_objective_function,
**kwargs,
)
else:
post_process_solution = None # will be interpreted as a TrivialPostProcessSolution in solve()
else:
post_process_solution_subbrick: SubBrick = kwargs[
"post_process_solution_subbrick"
]
post_process_solution_cls = post_process_solution_subbrick.cls
post_process_solution_kwargs = post_process_solution_subbrick.kwargs
if not issubclass(post_process_solution_cls, PostProcessSolution):
raise ValueError(
"post_process_solution_subbrick.cls must a subclass of PostProcessSolution"
)
post_process_solution = post_process_solution_cls(
problem=self.problem,
params_objective_function=self.params_objective_function,
**post_process_solution_kwargs,
)
self.post_process_solution = post_process_solution
if initial_solution_provider is None:
# initial_solution_provider_subbrick: Optional[SubBrick] = kwargs.get("initial_solution_provider_subbrick", None)
if kwargs["initial_solution_provider_subbrick"] is None:
if "build_default_initial_solution_provider" in kwargs:
initial_solution_provider = kwargs[
"build_default_initial_solution_provider"
](
self.problem,
self.params_objective_function,
**kwargs,
)
else:
initial_solution_provider = (
None # ok if solve_lns with skip_first_iteration
)
else:
initial_solution_provider_subbrick: SubBrick = kwargs[
"initial_solution_provider_subbrick"
]
initial_solution_provider_cls = initial_solution_provider_subbrick.cls
initial_solution_provider_kwargs = (
initial_solution_provider_subbrick.kwargs
)
initial_solution_provider = initial_solution_provider_cls(
problem=self.problem,
params_objective_function=self.params_objective_function,
**initial_solution_provider_kwargs,
)
self.initial_solution_provider = initial_solution_provider
[docs]
def set_warm_start(self, solution: Solution) -> None:
"""Make the solver warm start from the given solution.
Be careful, if you set in `skip_initial_solution_provider=True` in `self.solve()`,
the initial solution will be ignored.
"""
result_storage = from_solutions_to_result_storage(
list_solution=[solution],
problem=self.problem,
params_objective_function=self.params_objective_function,
)
self.initial_solution_provider = TrivialInitialSolution(solution=result_storage)
[docs]
def create_submodel(self) -> contextlib.AbstractContextManager:
return _dummy_contextmanager()
[docs]
def solve(
self,
nb_iteration_lns: int,
time_limit_subsolver: Optional[float] = 100.0,
time_limit_subsolver_iter0: Optional[float] = None,
nb_iteration_no_improvement: Optional[int] = None,
skip_initial_solution_provider: bool = False,
stop_first_iteration_if_optimal: bool = True,
callbacks: Optional[list[Callback]] = None,
**kwargs: Any,
) -> ResultStorage:
"""Solve the problem with an LNS loop
Args:
nb_iteration_lns: number of lns iteration
time_limit_subsolver: time limit (in seconds) for a subsolver `solve()` call
If None, no time limit is applied.
time_limit_subsolver_iter0: time limit (in seconds) for the first subsolver `solve()` call,
in the case we are skipping the initial solution provider (`skip_initial_solution_provider is True`)
If None, we use the regular `time_limit` parameter even for this first solve.
nb_iteration_no_improvement: maximal number of consecutive iteration without improvement allowed
before stopping the solve process.
skip_initial_solution_provider: if True, we do not use `self.initial_solution_provider`
but instead launch a first `self.subsolver.solve()`
stop_first_iteration_if_optimal: if True, if `skip_initial_solution_provider, and if subsolver tells
its result is optimal after the first `self.subsolver.solve()` (so before any constraint tempering),
we stop the solve process.
callbacks: list of callbacks used to hook into the various stage of the solve
**kwargs: passed to the subsolver
Returns:
"""
# wrap all callbacks in a single one
callbacks_list = CallbackList(callbacks=callbacks)
# start of solve callback
callbacks_list.on_solve_start(solver=self)
# manage None post_process_solution (can happen in subclasses __init__)
if self.post_process_solution is None:
self.post_process_solution = TrivialPostProcessSolution()
sense = self.params_objective_function.sense_function
if nb_iteration_no_improvement is None:
nb_iteration_no_improvement = 2 * nb_iteration_lns
current_nb_iteration_no_improvement = 0
self.init_model(**kwargs)
if skip_initial_solution_provider:
best_objective = (
float("inf") if sense == ModeOptim.MINIMIZATION else -float("inf")
)
store_lns = None
stopping = False
else:
if self.initial_solution_provider is None:
raise ValueError(
"`initial_solution_provider` cannot be None "
"if `skip_initial_solution_provider` is False."
)
store_lns = self.initial_solution_provider.get_starting_solution()
store_lns = self.post_process_solution.build_other_solution(store_lns)
init_solution, objective = store_lns.get_best_solution_fit()
if init_solution is None:
raise RuntimeError(
"`initial_solution_provider` + `post_process_solution` gave no solution."
)
else:
satisfy = self.problem.satisfy(init_solution)
logger.debug(f"Satisfy Initial solution {satisfy}")
try:
logger.debug(
f"Nb task preempted = {init_solution.get_nb_task_preemption()}" # type: ignore
)
logger.debug(f"Nb max preemption = {init_solution.get_max_preempted()}") # type: ignore
except:
pass
best_objective = objective
# end of step callback: stopping?
stopping = callbacks_list.on_step_end(step=0, res=store_lns, solver=self)
result_store: ResultStorage
lsn_contraints: Optional[Iterable[Any]] = None
if not stopping:
# time_limit subsolver warning (only once)
if "time_limit" in kwargs:
logger.warning(
"`time_limit` arg will be overriden by "
"`time_limit_subsolver` and `time_limit_subsolver_iter0`."
)
kwargs_subsolver = dict(kwargs)
for iteration in range(nb_iteration_lns):
logger.info(
f"Starting iteration n° {iteration} current objective {best_objective}"
)
with self.create_submodel() as child:
if (
iteration == 0
and not skip_initial_solution_provider
or iteration >= 1
):
lsn_contraints = self.constraint_handler.adding_constraint_from_results_store(
solver=self.subsolver,
child_instance=child,
result_storage=store_lns,
last_result_store=store_lns
if iteration == 0
else result_store,
)
try:
if (
skip_initial_solution_provider
and iteration == 0
and time_limit_subsolver_iter0 is not None
):
kwargs_subsolver["time_limit"] = time_limit_subsolver_iter0
else:
kwargs_subsolver["time_limit"] = time_limit_subsolver
result_store = self.subsolver.solve(
instance=child, **kwargs_subsolver
)
logger.info(f"iteration n° {iteration} Solved !!!")
if hasattr(self.subsolver, "status_solver"):
logger.info(self.subsolver.status_solver)
if len(result_store) > 0:
logger.debug("Solved !!!")
bsol, fit = result_store.get_best_solution_fit()
logger.debug(f"Fitness Before = {fit}")
if bsol is not None:
logger.debug(
f"Satisfaction Before = {self.problem.satisfy(bsol)}"
)
else:
logger.debug(f"Satisfaction Before = {False}")
logger.debug("Post Process..")
result_store = (
self.post_process_solution.build_other_solution(
result_store
)
)
bsol, fit = result_store.get_best_solution_fit()
if bsol is not None:
logger.debug(
f"Satisfaction After = {self.problem.satisfy(bsol)}"
)
else:
logger.debug(f"Satisfaction After = {False}")
if (
sense == ModeOptim.MAXIMIZATION
and fit >= best_objective
):
if fit > best_objective:
current_nb_iteration_no_improvement = 0
else:
current_nb_iteration_no_improvement += 1
best_objective = fit
elif sense == ModeOptim.MAXIMIZATION:
current_nb_iteration_no_improvement += 1
elif (
sense == ModeOptim.MINIMIZATION
and fit <= best_objective
):
if fit < best_objective:
current_nb_iteration_no_improvement = 0
else:
current_nb_iteration_no_improvement += 1
best_objective = fit
elif sense == ModeOptim.MINIMIZATION:
current_nb_iteration_no_improvement += 1
if skip_initial_solution_provider and iteration == 0:
store_lns = result_store
else:
for s, f in list(result_store):
store_lns.append((s, f))
else:
current_nb_iteration_no_improvement += 1
if (
skip_initial_solution_provider
and self.subsolver.is_optimal()
and iteration == 0
and self.problem.satisfy(bsol)
and stop_first_iteration_if_optimal
):
logger.info("Finish LNS because found optimal solution")
break
except Exception as e:
current_nb_iteration_no_improvement += 1
logger.warning(f"Failed ! reason : {e}")
logger.debug(
f"{current_nb_iteration_no_improvement} / {nb_iteration_no_improvement}"
)
if (
current_nb_iteration_no_improvement
> nb_iteration_no_improvement
):
logger.info("Finish LNS with maximum no improvement iteration ")
break
if lsn_contraints is not None:
self.constraint_handler.remove_constraints_from_previous_iteration(
solver=self.subsolver, previous_constraints=lsn_contraints
)
# end of step callback: stopping?
if skip_initial_solution_provider:
step = iteration
else:
step = iteration + 1
stopping = callbacks_list.on_step_end(
step=step, res=store_lns, solver=self
)
if stopping:
break
# end of solve callback
callbacks_list.on_solve_end(res=store_lns, solver=self)
return store_lns
@contextlib.contextmanager
def _dummy_contextmanager():
yield None
[docs]
class ConstraintStatus(TypedDict):
nb_usage: int
nb_improvement: int
name: str
[docs]
class ConstraintHandlerMix(ConstraintHandler):
def __init__(
self,
problem: Problem,
list_constraints_handler: list[ConstraintHandler],
list_proba: list[float],
update_proba: bool = True,
tag_constraint_handler: Optional[list[str]] = None,
sequential: bool = False,
):
self.problem = problem
self.list_constraints_handler = list_constraints_handler
self.sequential = sequential
if tag_constraint_handler is None:
self.tag_constraint_handler = [
str(i) for i in range(len(self.list_constraints_handler))
]
else:
self.tag_constraint_handler = tag_constraint_handler
self.list_proba = np.array(list_proba)
self.list_proba = self.list_proba / np.sum(self.list_proba)
self.index_np = np.array(range(len(self.list_proba)), dtype=np.int_)
self.current_iteration = 0
self.status: dict[int, ConstraintStatus] = {
i: {
"nb_usage": 0,
"nb_improvement": 0,
"name": self.tag_constraint_handler[i],
}
for i in range(len(self.list_constraints_handler))
}
self.last_index_param: Optional[int] = None
self.last_fitness: Optional[fitness_class] = None
self.update_proba = update_proba
[docs]
def adding_constraint_from_results_store(
self,
solver: SolverDO,
result_storage: ResultStorage,
**kwargs: Any,
) -> Iterable[Any]:
new_fitness = result_storage.get_best_solution_fit()[1]
if self.last_index_param is not None:
if new_fitness != self.last_fitness:
self.status[self.last_index_param]["nb_improvement"] += 1
self.last_fitness = new_fitness
if self.update_proba:
self.list_proba[self.last_index_param] *= 1.05
self.list_proba = self.list_proba / np.sum(self.list_proba)
else:
if self.update_proba:
self.list_proba[self.last_index_param] *= 0.95
self.list_proba = self.list_proba / np.sum(self.list_proba)
else:
self.last_fitness = new_fitness
if self.sequential:
if self.last_index_param is not None:
choice = (self.last_index_param + 1) % len(
self.list_constraints_handler
)
else:
choice = 0
else:
if random.random() <= 0.95:
choice = np.random.choice(self.index_np, size=1, p=self.list_proba)[0]
else:
max_improvement = max(
[
self.status[x]["nb_improvement"]
/ max(self.status[x]["nb_usage"], 1)
for x in self.status
]
)
choice = random.choice(
[
x
for x in self.status
if self.status[x]["nb_improvement"]
/ max(self.status[x]["nb_usage"], 1)
== max_improvement
]
)
ch = self.list_constraints_handler[int(choice)]
self.current_iteration += 1
self.last_index_param = choice
self.status[self.last_index_param]["nb_usage"] += 1
logger.debug(f"Status {self.status}")
constraints = ch.adding_constraint_from_results_store(
solver=solver, result_storage=result_storage, **kwargs
)
return constraints
[docs]
def remove_constraints_from_previous_iteration(
self, solver: SolverDO, previous_constraints: Iterable[Any], **kwargs: Any
) -> None:
ch = self.list_constraints_handler[int(self.last_index_param)]
ch.remove_constraints_from_previous_iteration(
solver=solver, previous_constraints=previous_constraints, **kwargs
)