# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from typing import Any, Optional
from discrete_optimization.coloring.problem import ColoringSolution
from discrete_optimization.coloring.solvers.starting_solution import (
WithStartingSolutionColoringSolver,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
IntegerHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
try:
import pytoulbar2
except ImportError:
toulbar_available = False
else:
toulbar_available = True
import logging
logger = logging.getLogger(__name__)
[docs]
class ToulbarColoringSolver(WithStartingSolutionColoringSolver):
    """Graph-coloring solver backed by a pytoulbar2 cost function network (CFN).

    The model built by :meth:`init_model` contains:

    * ``max_color``: a variable over ``range(nb_colors_on_subset)`` carrying a
      unary cost equal to its value (this is the quantity being minimised);
    * ``x_i``: one variable per node, whose domain comes from
      :meth:`get_range_value`;
    * optionally (``value_sequence_chain=True``) one ``max_x_j`` variable per
      node forming a "running maximum" chain used as symmetry breaking.

    Hyperparameters:
        value_sequence_chain: add the ``max_x_j`` chain.
        hard_value_sequence_chain: additionally force
            ``max_x_j == max(max_x_{j-1}, x_j)`` (only read when the chain is
            enabled).
        tolerance_delta_max: maximal increase allowed between two consecutive
            ``max_x`` values (only read when the chain is enabled).
    """

    hyperparameters = WithStartingSolutionColoringSolver.hyperparameters + [
        CategoricalHyperparameter(
            name="value_sequence_chain", choices=[True, False], default=False
        ),
        CategoricalHyperparameter(
            name="hard_value_sequence_chain",
            choices=[True, False],
            default=False,
            depends_on=("value_sequence_chain", [True]),
        ),
        IntegerHyperparameter(
            name="tolerance_delta_max",
            low=0,
            high=2,
            default=1,
            depends_on=("value_sequence_chain", [True]),
        ),
    ]
    model: Optional[pytoulbar2.CFN] = None

    # Cost attached to forbidden tuples. The CFN is created with ``nb_colors``
    # as its first argument (pytoulbar2's initial upper bound), so this
    # presumably behaves as a hard constraint whenever nb_colors < 10000.
    _CONFLICT_COST = 10000
    # Cost used by the anchoring functions of the value-sequence chain.
    _ANCHOR_COST = 1000

    def get_range_value(
        self, index_node: int, nb_colors_on_subset: int, nb_colors_all: int
    ) -> range:
        """Return the domain of colors allowed for the node at ``index_node``.

        Args:
            index_node: index of the node in the problem.
            nb_colors_on_subset: number of colors available to nodes for which
                ``problem.is_in_subset_index`` is true.
            nb_colors_all: number of colors available to the other nodes.

        Returns:
            A single-value range when the problem fixes the color of the node,
            otherwise ``range(nb_colors_on_subset)`` or ``range(nb_colors_all)``
            depending on whether the node is in the subset.
        """
        node_name = self.problem.nodes_name[index_node]
        if self.problem.has_constraints_coloring:
            if node_name in self.problem.constraints_coloring.nodes_fixed():
                fixed = self.problem.constraints_coloring.color_constraint[node_name]
                return range(fixed, fixed + 1)
        if self.problem.is_in_subset_index(index_node):
            return range(nb_colors_on_subset)
        return range(nb_colors_all)

    def init_model(self, **kwargs: Any) -> None:
        """Build the CFN and store it in ``self.model``.

        Args:
            **kwargs: hyperparameters of the solver (missing ones are filled
                with their defaults), plus optionally:

                - ``nb_colors``: number of colors to allow. When absent, the
                  starting solution returned by ``get_starting_solution`` is
                  used to derive the color counts.
                - ``nb_colors_on_subset``: number of colors for the nodes of
                  the subset (defaults to ``nb_colors``; overwritten when
                  ``nb_colors`` is absent).
        """
        problem = self.problem
        number_nodes = problem.number_of_nodes
        kwargs = self.complete_with_default_hyperparameters(kwargs)
        nb_colors = kwargs.get("nb_colors", None)
        nb_colors_on_subset = kwargs.get("nb_colors_on_subset", nb_colors)
        if nb_colors is None:
            # Use the starting (greedy) solution to get upper bounds on the
            # number of colors; the bound does not need to be very tight.
            sol = self.get_starting_solution(**kwargs)
            nb_colors = int(problem.evaluate(sol)["nb_colors"])
            nb_colors_all = problem.count_colors_all_index(sol.colors)
            nb_colors_on_subset = problem.count_colors(sol.colors)
            logger.info(f"{nb_colors_on_subset} colors found by the greedy method ")
        else:
            nb_colors_all = nb_colors

        model = pytoulbar2.CFN(nb_colors)
        # Objective: the unary cost of max_color is its own value.
        model.AddVariable("max_color", range(nb_colors_on_subset))
        model.AddFunction(["max_color"], range(nb_colors_on_subset))

        range_map: dict[int, Any] = {}
        for i in range(number_nodes):
            domain = self.get_range_value(
                index_node=i,
                nb_colors_on_subset=nb_colors_on_subset,
                nb_colors_all=nb_colors_all,
            )
            range_map[i] = domain
            model.AddVariable(f"x_{i}", domain)
            if problem.is_in_subset_index(i):
                # Encode x_i <= max_color for the nodes counted in the objective.
                # (A linear-constraint formulation might be an alternative.)
                model.AddFunction(
                    [f"x_{i}", "max_color"],
                    [
                        self._CONFLICT_COST if color > max_value else 0
                        for color in domain
                        for max_value in range(nb_colors_on_subset)
                    ],
                )

        if kwargs["value_sequence_chain"]:
            if problem.has_constraints_coloring:
                # The chain pins x_0 to color 0 and limits how fast colors may
                # grow along the node order, which can clash with fixed colors.
                logger.warning(
                    "value_sequence_chain should not be used together with "
                    "coloring constraints: the model may become over-constrained."
                )
            self._add_value_sequence_chain(
                model=model,
                range_map=range_map,
                nb_colors_all=nb_colors_all,
                nb_colors_on_subset=nb_colors_on_subset,
                hard=kwargs["hard_value_sequence_chain"],
                tolerance_delta_max=kwargs["tolerance_delta_max"],
            )

        # One binary "different colors" function per edge.
        edges = problem.graph.edges
        len_edges = len(edges)
        index_nodes_name = problem.index_nodes_name
        costs_dict = self.default_costs_matrix(
            nb_colors_all=nb_colors_all, nb_colors_on_subset=nb_colors_on_subset
        )
        for index, edge in enumerate(edges):
            if index % 100 == 0:
                logger.info(f"Nb edges introduced {index} / {len_edges}")
            index1 = index_nodes_name[edge[0]]
            index2 = index_nodes_name[edge[1]]
            model.AddFunction(
                [f"x_{index1}", f"x_{index2}"],
                self.get_costs_matrix(
                    index1=index1, index2=index2, costs=costs_dict, range_map=range_map
                ),
            )
        self.model = model

    def _add_value_sequence_chain(
        self,
        model: Any,
        range_map: dict[int, Any],
        nb_colors_all: int,
        nb_colors_on_subset: int,
        hard: bool,
        tolerance_delta_max: int,
    ) -> None:
        """Add the ``max_x_j`` running-maximum chain to ``model``.

        Every cost table is enumerated over the domains the variables were
        declared with (``range(nb_colors_all)`` for ``max_x_j``,
        ``range(nb_colors_on_subset)`` for ``max_color`` and ``range_map[j]``
        for ``x_j``) so that its length always matches the product of the
        domain sizes.

        NOTE(review): the chain ties ``max_color`` to the maximum over *all*
        nodes and pins ``x_0`` to color 0; it is presumably only meaningful
        without subset/fixed-color constraints.
        """
        number_nodes = self.problem.number_of_nodes
        if number_nodes == 0:
            return
        forbidden = self._CONFLICT_COST
        anchor = self._ANCHOR_COST
        max_domain = range(nb_colors_all)

        for j in range(number_nodes):
            model.AddVariable(f"max_x_{j}", max_domain)

        # Anchor the chain: the first node and its running maximum use color 0.
        model.AddFunction(
            ["max_x_0"], [0 if value == 0 else anchor for value in max_domain]
        )
        model.AddFunction(
            ["x_0"], [0 if value == 0 else anchor for value in range_map[0]]
        )
        # The last running maximum has to agree with max_color.
        model.AddFunction(
            ["max_color", f"max_x_{number_nodes - 1}"],
            [
                anchor if max_color != last_max else 0
                for max_color in range(nb_colors_on_subset)
                for last_max in max_domain
            ],
        )

        # max_x is non-decreasing and grows by at most tolerance_delta_max per
        # step; this table is the same for every j, so build it once.
        step_costs = [
            forbidden if prev > cur or cur > prev + tolerance_delta_max else 0
            for prev in max_domain
            for cur in max_domain
        ]
        # Tables involving x_j only depend on the domain of x_j: cache them.
        tables_by_domain: dict[tuple, list[list[int]]] = {}
        for j in range(1, number_nodes):
            domain = tuple(range_map[j])
            tables = tables_by_domain.get(domain)
            if tables is None:
                tables = self._chain_node_tables(
                    max_domain=max_domain,
                    node_domain=domain,
                    tolerance_delta_max=tolerance_delta_max,
                    hard=hard,
                )
                tables_by_domain[domain] = tables
            prev_max, cur_max, node = f"max_x_{j - 1}", f"max_x_{j}", f"x_{j}"
            model.AddFunction([prev_max, cur_max], step_costs)
            model.AddFunction([cur_max, node], tables[0])
            model.AddFunction([prev_max, node], tables[1])
            if hard:
                model.AddFunction([cur_max, prev_max, node], tables[2])
                model.AddFunction([prev_max, node], tables[3])

    def _chain_node_tables(
        self,
        max_domain: Any,
        node_domain: Any,
        tolerance_delta_max: int,
        hard: bool,
    ) -> list[list[int]]:
        """Build the chain cost tables linking ``x_j`` to the ``max_x`` variables.

        Returns, in order: ``x_j <= max_x_j``;
        ``x_j <= max_x_{j-1} + tolerance_delta_max``; and, when ``hard`` is
        true, ``max_x_j == max(max_x_{j-1}, x_j)`` followed by
        ``x_j <= max_x_{j-1} + 1``.
        """
        forbidden = self._CONFLICT_COST
        tables = [
            [
                forbidden if color > cur_max else 0
                for cur_max in max_domain
                for color in node_domain
            ],
            [
                forbidden if color > prev_max + tolerance_delta_max else 0
                for prev_max in max_domain
                for color in node_domain
            ],
        ]
        if hard:
            tables.append(
                [
                    0 if cur_max == max(prev_max, color) else forbidden
                    for cur_max in max_domain
                    for prev_max in max_domain
                    for color in node_domain
                ]
            )
            tables.append(
                [
                    forbidden if color > prev_max + 1 else 0
                    for prev_max in max_domain
                    for color in node_domain
                ]
            )
        return tables

    def _conflict_costs(self, domain1: Any, domain2: Any) -> list[int]:
        """Return the "different colors" cost table over ``domain1 x domain2``.

        Tuples are enumerated with the first domain as the outer loop.
        """
        return [
            self._CONFLICT_COST if color1 == color2 else 0
            for color1 in domain1
            for color2 in domain2
        ]

    def default_costs_matrix(
        self, nb_colors_all: int, nb_colors_on_subset: int
    ) -> dict[str, list[int]]:
        """Precompute the edge cost tables for every in/out-of-subset combination.

        Args:
            nb_colors_all: domain size of the nodes outside the subset.
            nb_colors_on_subset: domain size of the nodes inside the subset.

        Returns:
            A dict keyed by ``"out-out"``, ``"in-out"``, ``"out-in"`` and
            ``"in-in"`` (first endpoint, then second endpoint) whose values are
            the flattened cost tables penalising equal colors.
        """
        domains = {"in": range(nb_colors_on_subset), "out": range(nb_colors_all)}
        combinations = (("out", "out"), ("in", "out"), ("out", "in"), ("in", "in"))
        return {
            f"{first}-{second}": self._conflict_costs(domains[first], domains[second])
            for first, second in combinations
        }

    def get_costs_matrix(
        self,
        index1: int,
        index2: int,
        costs: dict[str, list],
        range_map: dict[int, Any],
    ):
        """Return the cost table of the edge between nodes ``index1`` and ``index2``.

        Args:
            index1: index of the first endpoint.
            index2: index of the second endpoint.
            costs: precomputed tables, as returned by :meth:`default_costs_matrix`.
            range_map: domain of each node variable, keyed by node index.

        Returns:
            The precomputed table matching the subset membership of both
            endpoints, or a table built from the actual domains when at least
            one endpoint has a fixed color.
        """
        problem = self.problem
        side1 = "in" if problem.is_in_subset_index(index1) else "out"
        side2 = "in" if problem.is_in_subset_index(index2) else "out"
        default_table = costs[f"{side1}-{side2}"]
        if not problem.has_constraints_coloring:
            return default_table
        nodes_fixed = problem.constraints_coloring.nodes_fixed()
        name1 = problem.index_to_nodes_name[index1]
        name2 = problem.index_to_nodes_name[index2]
        if name1 in nodes_fixed or name2 in nodes_fixed:
            # A fixed node has a single-value domain: the shared tables do not
            # have the right size, so build one from the real domains.
            return self._conflict_costs(range_map[index1], range_map[index2])
        return default_table

    def solve(self, time_limit: Optional[int] = 20, **kwargs: Any) -> ResultStorage:
        """Solve the coloring problem with toulbar2.

        Args:
            time_limit: the solve process stops after this time limit (in seconds).
                If None, no time limit is applied.
            **kwargs: forwarded to :meth:`init_model` when the model has not
                been built yet (ignored otherwise).

        Returns:
            A result storage holding the solution found and its fitness, or an
            empty result storage when toulbar2 returns no solution.

        Raises:
            RuntimeError: if ``self.model`` is still None after ``init_model``.
        """
        if self.model is None:
            # Forward the caller's hyperparameters instead of silently
            # building the model with defaults only.
            self.init_model(**kwargs)
            if self.model is None:
                raise RuntimeError(
                    "self.model must not be None after self.init_model()."
                )
            logger.info("CFN Model init.")
        if time_limit is not None:
            self.model.CFN.timer(time_limit)
        solution = self.model.Solve(showSolutions=1)
        logger.info(f"=== Solution === \n {solution}")
        if solution is None:
            return self.create_result_storage()
        # Variable 0 is max_color; the node variables x_0..x_{n-1} follow it.
        coloring = ColoringSolution(
            problem=self.problem,
            colors=solution[0][1 : 1 + self.problem.number_of_nodes],
        )
        fit = self.aggreg_from_sol(coloring)
        return self.create_result_storage(
            [(coloring, fit)],
        )