# Copyright (c) 2023 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import os
import random
from typing import Any, Optional
import numpy as np
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_solver import (
TrivialSolverFromSolution,
WarmstartMixin,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
FloatHyperparameter,
IntegerHyperparameter,
SubBrick,
SubBrickHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
from discrete_optimization.maximum_independent_set.problem import (
MisProblem,
MisSolution,
)
from discrete_optimization.maximum_independent_set.solvers.mis_solver import MisSolver
from discrete_optimization.maximum_independent_set.solvers.networkx import (
NetworkxMisSolver,
)
from discrete_optimization.maximum_independent_set.solvers_map import solve, solvers_map
cur_folder = os.path.abspath(os.path.dirname(__file__))
logger = logging.getLogger(__name__)
subsolvers = [s for s in solvers_map]
[docs]
class DecomposedMisSolver(MisSolver, WarmstartMixin):
"""
This solver is based on the current observation. From a given mis model and one current solution,
if we decide to freeze the decision variable for a subset of items, the remaining problem to solve is also a
mis problem, with fewer nodes.
DecomposedMisSolver is a basic iterative solver that starts from a given solution, then freeze random items,
solve subproblem with a custom root solver, rebuild original solution and repeat the process.
"""
hyperparameters = [
FloatHyperparameter(
name="proportion_to_remove", low=0.0, high=1.0, default=0.7
),
IntegerHyperparameter(name="nb_iteration", low=0, high=int(10e6), default=100),
SubBrickHyperparameter(
name="initial_solver",
choices=subsolvers,
default=SubBrick(cls=NetworkxMisSolver, kwargs={}),
),
SubBrickHyperparameter(
name="root_solver",
choices=subsolvers,
default=SubBrick(cls=NetworkxMisSolver, kwargs={}),
),
]
initial_solution: Optional[MisSolution] = None
"""Initial solution used for warm start."""
[docs]
def rebuild_sol(
self,
sol: MisSolution,
original_mis_model: MisProblem,
original_solution: MisSolution,
indexes_to_remove: set[int] = None,
):
"""
Rebuild a full Mus solution object from a partial solution.
:param sol: solution to a sub-mis problem
:param original_knapsack_problem: original knapsack model to solve
:param original_solution: original base solution
:param indexes_to_remove: indexes of item removed when building the sub-knapsack problem.
:return: A new solution object for the original problem.
"""
list_taken = [
original_solution.chosen[i] for i in range(original_mis_model.number_nodes)
]
for j in range(len(sol.chosen)):
original_index = original_mis_model.nodes_to_index[
sol.problem.index_to_nodes[j]
]
list_taken[original_index] = sol.chosen[j]
solution = MisSolution(problem=original_mis_model, chosen=list_taken)
return solution
[docs]
def set_warm_start(self, solution: MisSolution) -> None:
"""Make the solver warm start from the given solution.
Will be ignored if arg `initial_solver` is set and not None in call to `solve()`.
"""
self.initial_solution = solution
[docs]
def solve(
self, callbacks: Optional[list[Callback]] = None, **kwargs: Any
) -> ResultStorage:
# wrap all callbacks in a single one
callbacks_list = CallbackList(callbacks=callbacks)
# start of solve callback
callbacks_list.on_solve_start(solver=self)
# manage warm start if self.initial_solution is set
if self.initial_solution is not None:
kwargs["initial_solver"] = SubBrick(
cls=TrivialSolverFromSolution,
kwargs=dict(solution=self.initial_solution),
)
kwargs = self.complete_with_default_hyperparameters(kwargs)
initial_solver: SubBrick = kwargs["initial_solver"]
initial_solver_cls: type[MisSolver] = initial_solver.cls
initial_solver_kwargs = initial_solver.kwargs
root_solver: SubBrick = kwargs["root_solver"]
root_solver_cls: type[MisSolver] = root_solver.cls
root_solver_kwargs = root_solver.kwargs
nb_iteration = kwargs["nb_iteration"]
proportion_to_remove = kwargs["proportion_to_remove"]
initial_results = solve(
method_solver=initial_solver_cls,
problem=self.problem,
**initial_solver_kwargs,
)
results_storage = self.create_result_storage(
initial_results.list_solution_fits,
)
logger.info(
f"Initial solution fitness : {results_storage.get_best_solution_fit()[1]}"
)
all_nodes = set(self.problem.nodes_to_index.keys())
for j in range(nb_iteration):
sol, fit = results_storage.get_best_solution_fit()
sol: MisSolution
nb_chosen = sum(sol.chosen)
idx_chosen = list(np.where(sol.chosen)[0])
subpart_chosen = set(
random.sample(
idx_chosen,
int(proportion_to_remove * nb_chosen),
)
)
nodes_to_remove_from_subproblem = set()
for index_node in subpart_chosen:
n = self.problem.index_to_nodes[index_node]
neighbors = self.problem.graph_nx.neighbors(n)
nodes_to_remove_from_subproblem.add(n)
for nn in neighbors:
nodes_to_remove_from_subproblem.add(nn)
subgraph = self.problem.graph_nx.subgraph(
all_nodes.difference(nodes_to_remove_from_subproblem)
)
new_problem = MisProblem(
graph=subgraph, attribute_aggregate=self.problem.attribute_aggregate
)
res = solve(
method_solver=root_solver_cls, problem=new_problem, **root_solver_kwargs
)
best_sol, fit = res.get_best_solution_fit()
if best_sol is not None:
reb_sol = self.rebuild_sol(
sol=best_sol,
original_solution=sol,
original_mis_model=self.problem,
indexes_to_remove=None,
)
fit = self.aggreg_from_sol(reb_sol)
logger.info(f"Iteration {j}/{nb_iteration} : --- Current fitness {fit}")
results_storage.append((reb_sol, fit))
stopping = callbacks_list.on_step_end(
step=j, res=results_storage, solver=self
)
if stopping:
break
# end of solve callback
callbacks_list.on_solve_end(res=results_storage, solver=self)
return results_storage