# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from enum import Enum
from typing import Any, Optional, Union
from ortools.sat.python.cp_model import CpModel, CpSolverSolutionCallback, IntVar
from discrete_optimization.coloring.problem import ColoringSolution
from discrete_optimization.coloring.solvers.starting_solution import (
WithStartingSolutionColoringSolver,
)
from discrete_optimization.generic_tools.do_problem import (
ParamsObjectiveFunction,
Problem,
Solution,
)
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
EnumHyperparameter,
)
from discrete_optimization.generic_tools.ortools_cpsat_tools import OrtoolsCpSatSolver
[docs]
class ModelingCpSat(Enum):
BINARY = 0
INTEGER = 1
[docs]
class CpSatColoringSolver(
OrtoolsCpSatSolver, WithStartingSolutionColoringSolver, WarmstartMixin
):
hyperparameters = [
EnumHyperparameter(
name="modeling", enum=ModelingCpSat, default=ModelingCpSat.INTEGER
),
CategoricalHyperparameter(
name="do_warmstart", choices=[True, False], default=True
),
CategoricalHyperparameter(
name="value_sequence_chain",
choices=[True, False],
default=False,
depends_on=("modeling", [ModelingCpSat.INTEGER]),
),
CategoricalHyperparameter(
name="used_variable",
choices=[True, False],
default=False,
depends_on=("modeling", [ModelingCpSat.INTEGER]),
),
CategoricalHyperparameter(
name="symmetry_on_used",
choices=[True, False],
default=True,
depends_on=("modeling", [ModelingCpSat.INTEGER]),
),
] + WithStartingSolutionColoringSolver.hyperparameters
def __init__(
self,
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(problem, params_objective_function, **kwargs)
self.modeling: Optional[ModelingCpSat] = None
self.variables: dict[str, Union[list[IntVar], list[dict[int, IntVar]]]] = {}
[docs]
def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution:
if self.modeling == ModelingCpSat.INTEGER:
return ColoringSolution(
problem=self.problem,
colors=[
cpsolvercb.Value(self.variables["colors"][i])
for i in range(len(self.variables["colors"]))
],
)
if self.modeling == ModelingCpSat.BINARY:
colors = [None for i in range(len(self.variables["colors"]))]
for i in range(len(self.variables["colors"])):
c = next(
(
j
for j in self.variables["colors"][i]
if cpsolvercb.Value(self.variables["colors"][i][j]) == 1
),
None,
)
colors[i] = c
return ColoringSolution(problem=self.problem, colors=colors)
[docs]
def init_model_binary(self, nb_colors: int, **kwargs):
cp_model = CpModel()
allocation_binary = [
{
j: cp_model.NewBoolVar(name=f"allocation_{i}_{j}")
for j in range(nb_colors)
}
for i in range(self.problem.number_of_nodes)
]
for i in range(len(allocation_binary)):
cp_model.AddExactlyOne(
[allocation_binary[i][j] for j in allocation_binary[i]]
)
if self.problem.has_constraints_coloring:
for node in self.problem.constraints_coloring.color_constraint:
ind = self.problem.index_nodes_name[node]
col = self.problem.constraints_coloring.color_constraint[node]
cp_model.Add(allocation_binary[ind][col] == 1)
for c in allocation_binary[ind]:
if c != col:
# Could do it more efficiently
cp_model.Add(allocation_binary[ind][c] == 0)
for edge in self.problem.graph.edges:
ind1 = self.problem.index_nodes_name[edge[0]]
ind2 = self.problem.index_nodes_name[edge[1]]
for team in allocation_binary[ind1]:
if team in allocation_binary[ind2]:
cp_model.AddForbiddenAssignments(
[allocation_binary[ind1][team], allocation_binary[ind2][team]],
[(1, 1)],
)
used = [cp_model.NewBoolVar(f"used_{j}") for j in range(nb_colors)]
if self.problem.use_subset:
indexes_subset = self.problem.index_subset_nodes
else:
indexes_subset = range(len(allocation_binary))
for i in indexes_subset:
for j in allocation_binary[i]:
cp_model.Add(used[j] >= allocation_binary[i][j])
cp_model.Minimize(sum(used))
self.cp_model = cp_model
self.variables["colors"] = allocation_binary
self.variables["used"] = used
[docs]
def init_model_integer(self, nb_colors: int, **kwargs):
used_variable = kwargs["used_variable"]
value_sequence_chain = kwargs["value_sequence_chain"]
symmetry_on_used = kwargs["symmetry_on_used"]
cp_model = CpModel()
variables = [
cp_model.NewIntVar(0, nb_colors - 1, name=f"c_{i}")
for i in range(self.problem.number_of_nodes)
]
for edge in self.problem.graph.edges:
ind_0 = self.problem.index_nodes_name[edge[0]]
ind_1 = self.problem.index_nodes_name[edge[1]]
cp_model.Add(variables[ind_0] != variables[ind_1])
if self.problem.has_constraints_coloring:
for node in self.problem.constraints_coloring.color_constraint:
ind = self.problem.index_nodes_name[node]
cp_model.Add(
variables[ind]
== self.problem.constraints_coloring.color_constraint[node]
)
if value_sequence_chain:
vars = [variables[i] for i in self.problem.index_subset_nodes]
sliding_max = [
cp_model.NewIntVar(0, min(i, nb_colors), name=f"m_{i}")
for i in range(len(vars))
]
cp_model.Add(vars[0] == sliding_max[0])
self.variables["sliding_max"] = sliding_max
for k in range(1, len(vars)):
cp_model.AddMaxEquality(sliding_max[k], [sliding_max[k - 1], vars[k]])
cp_model.Add(sliding_max[k] <= sliding_max[k - 1] + 1)
used = [cp_model.NewBoolVar(name=f"used_{c}") for c in range(nb_colors)]
if used_variable:
def add_indicator(vars, value, presence_value, model):
bool_vars = []
for var in vars:
bool_var = model.NewBoolVar("")
model.Add(var == value).OnlyEnforceIf(bool_var)
model.Add(var != value).OnlyEnforceIf(bool_var.Not())
bool_vars.append(bool_var)
model.AddMaxEquality(presence_value, bool_vars)
for j in range(nb_colors):
if self.problem.use_subset:
indexes = self.problem.index_subset_nodes
vars = [variables[i] for i in indexes]
else:
vars = variables
add_indicator(vars, j, used[j], cp_model)
if symmetry_on_used:
for j in range(nb_colors - 1):
cp_model.Add(used[j] >= used[j + 1])
cp_model.Minimize(sum(used))
else:
nbc = cp_model.NewIntVar(0, nb_colors, name="nbcolors")
cp_model.AddMaxEquality(
nbc, [variables[i] for i in self.problem.index_subset_nodes]
)
cp_model.Minimize(nbc)
self.cp_model = cp_model
self.variables["colors"] = variables
self.variables["used"] = used
[docs]
def set_warm_start(self, solution: ColoringSolution) -> None:
"""Make the solver warm start from the given solution."""
self.cp_model.clear_hints()
if self.modeling == ModelingCpSat.INTEGER:
self.set_warm_start_integer(solution)
if self.modeling == ModelingCpSat.BINARY:
self.set_warm_start_binary(solution)
[docs]
def set_warm_start_integer(self, solution: ColoringSolution):
for i in range(len(solution.colors)):
self.cp_model.AddHint(self.variables["colors"][i], solution.colors[i])
[docs]
def set_warm_start_binary(self, solution: ColoringSolution):
for i in range(len(solution.colors)):
c = solution.colors[i]
for color in self.variables["colors"][i]:
self.cp_model.AddHint(self.variables["colors"][i][color], color == c)
[docs]
def init_model(self, **args: Any) -> None:
args = self.complete_with_default_hyperparameters(args)
modeling = args["modeling"]
do_warmstart = args["do_warmstart"]
assert isinstance(modeling, ModelingCpSat)
if "nb_colors" not in args or do_warmstart:
solution = self.get_starting_solution(**args)
nb_colors = self.problem.count_colors_all_index(solution.colors)
args["nb_colors"] = min(args.get("nb_colors", nb_colors), nb_colors)
if modeling == ModelingCpSat.BINARY:
self.init_model_binary(**args)
if modeling == ModelingCpSat.INTEGER:
self.init_model_integer(**args)
if do_warmstart:
self.set_warm_start(solution=solution)
self.modeling = modeling