# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from collections.abc import Sequence
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Optional, Union, cast
import numpy as np
from discrete_optimization.generic_tasks_tools.allocation import (
AllocationProblem,
AllocationSolution,
)
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
MethodAggregating,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
RobustProblem,
Solution,
TupleFitness,
TypeAttribute,
TypeObjective,
)
[docs]
@dataclass(frozen=True)
class Item:
index: int
value: float
weight: float
def __str__(self) -> str:
return (
"ind: "
+ str(self.index)
+ " weight: "
+ str(self.weight)
+ " value: "
+ str(self.value)
)
Knapsack = bool # unary resource type
KNAPSACK_RESOURCE = True # single resource "knapsack"
[docs]
class KnapsackSolution(AllocationSolution[Item, Knapsack]):
problem: KnapsackProblem
def __init__(
self,
problem: KnapsackProblem,
list_taken: list[int],
value: Optional[float] = None,
weight: Optional[float] = None,
):
self.problem = problem
self.value = value
self.weight = weight
self.list_taken = list_taken
[docs]
def is_allocated(self, task: Item, unary_resource: Knapsack) -> bool:
if unary_resource == KNAPSACK_RESOURCE:
i_item = self.problem.item_to_index_list[task]
return self.list_taken[i_item] > 0
else:
return False
[docs]
def copy(self) -> KnapsackSolution:
return KnapsackSolution(
problem=self.problem,
value=self.value,
weight=self.weight,
list_taken=list(self.list_taken),
)
[docs]
def lazy_copy(self) -> KnapsackSolution:
return KnapsackSolution(
problem=self.problem,
value=self.value,
weight=self.weight,
list_taken=self.list_taken,
)
[docs]
def change_problem(self, new_problem: Problem) -> None:
if not isinstance(new_problem, KnapsackProblem):
raise ValueError("new_problem must a KnapsackModel for a KnapsackSolution.")
self.problem = new_problem
self.list_taken = list(self.list_taken)
def __str__(self) -> str:
s = "Value=" + str(self.value) + "\n"
s += "Weight=" + str(self.weight) + "\n"
s += "Taken : " + str(self.list_taken)
return s
def __hash__(self) -> int:
return hash(str(self))
def __eq__(self, other: object) -> bool:
return (
isinstance(other, KnapsackSolution) and self.list_taken == other.list_taken
)
[docs]
class KnapsackProblem(AllocationProblem[Item, Knapsack]):
def __init__(
self,
list_items: list[Item],
max_capacity: float,
force_recompute_values: bool = False,
):
self.list_items = list_items
self.nb_items = len(list_items)
self.max_capacity = max_capacity
self.index_to_item = {
list_items[i].index: list_items[i] for i in range(self.nb_items)
}
self.index_to_index_list = {
list_items[i].index: i for i in range(self.nb_items)
}
self.item_to_index_list = {item: i for i, item in enumerate(self.list_items)}
self.force_recompute_values = force_recompute_values
@property
def unary_resources_list(self) -> list[Knapsack]:
return [KNAPSACK_RESOURCE]
@property
def tasks_list(self) -> list[Item]:
return self.list_items
[docs]
def get_attribute_register(self) -> EncodingRegister:
dict_register = {
"list_taken": {
"name": "list_taken",
"type": [TypeAttribute.LIST_BOOLEAN, TypeAttribute.LIST_BOOLEAN_KNAP],
"n": self.nb_items,
}
}
return EncodingRegister(dict_register)
[docs]
def get_objective_register(self) -> ObjectiveRegister:
dict_objective = {
"weight_violation": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100000.0
),
"value": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1.0),
}
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=ObjectiveHandling.AGGREGATE,
dict_objective_to_doc=dict_objective,
)
[docs]
def evaluate_from_encoding(
self, int_vector: list[int], encoding_name: str
) -> dict[str, float]:
if encoding_name == "list_taken":
kp_sol = KnapsackSolution(problem=self, list_taken=int_vector)
else:
raise NotImplementedError("encoding_name must be 'list_taken'")
objectives = self.evaluate(kp_sol)
return objectives
[docs]
def evaluate(self, knapsack_solution: KnapsackSolution) -> dict[str, float]: # type: ignore # avoid isinstance checks for efficiency
if knapsack_solution.value is None or self.force_recompute_values:
val = self.evaluate_value(knapsack_solution)
else:
val = knapsack_solution.value
w_violation = self.evaluate_weight_violation(knapsack_solution)
return {"value": val, "weight_violation": w_violation}
[docs]
def evaluate_value(self, knapsack_solution: KnapsackSolution) -> float:
s = sum(
[
knapsack_solution.list_taken[i] * self.list_items[i].value
for i in range(self.nb_items)
]
)
w = sum(
[
knapsack_solution.list_taken[i] * self.list_items[i].weight
for i in range(self.nb_items)
]
)
knapsack_solution.value = s
knapsack_solution.weight = w
return sum(
[
knapsack_solution.list_taken[i] * self.list_items[i].value
for i in range(self.nb_items)
]
)
[docs]
def evaluate_weight_violation(self, knapsack_solution: KnapsackSolution) -> float:
return max(0.0, knapsack_solution.weight - self.max_capacity) # type: ignore # avoid is None check for efficiency
[docs]
def satisfy(self, knapsack_solution: KnapsackSolution) -> bool: # type: ignore # avoid isinstance checks for efficiency
if knapsack_solution.value is None:
self.evaluate(knapsack_solution)
return knapsack_solution.weight <= self.max_capacity # type: ignore # avoid is None check for efficiency
def __str__(self) -> str:
s = (
"Knapsack model with "
+ str(self.nb_items)
+ " items and capacity "
+ str(self.max_capacity)
+ "\n"
)
s += "\n".join([str(item) for item in self.list_items])
return s
[docs]
def get_dummy_solution(self) -> KnapsackSolution:
kp_sol = KnapsackSolution(problem=self, list_taken=[0] * self.nb_items)
self.evaluate(kp_sol)
return kp_sol
[docs]
def get_solution_type(self) -> type[Solution]:
return KnapsackSolution
[docs]
def create_subknapsack_problem(
knapsack_problem: KnapsackProblem,
solution: KnapsackSolution,
indexes_to_remove: set[int],
indexes_to_keep: set[int] = None,
):
if indexes_to_keep is None:
indexes_to_keep = set(range(knapsack_problem.nb_items)).difference(
indexes_to_remove
)
weight = sum(
solution.list_taken[ind] * knapsack_problem.list_items[ind].weight
for ind in indexes_to_remove
)
return KnapsackProblem(
list_items=[
knapsack_problem.list_items[ind] for ind in sorted(indexes_to_keep)
],
max_capacity=knapsack_problem.max_capacity - weight,
)
[docs]
class MobjKnapsackModel(KnapsackProblem):
[docs]
@staticmethod
def from_knapsack(knapsack_problem: KnapsackProblem) -> "MobjKnapsackModel":
return MobjKnapsackModel(
list_items=knapsack_problem.list_items,
max_capacity=knapsack_problem.max_capacity,
force_recompute_values=knapsack_problem.force_recompute_values,
)
[docs]
def get_objective_register(self) -> ObjectiveRegister:
dict_objective = {
"weight_violation": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100000.0
),
"heaviest_item": ObjectiveDoc(
type=TypeObjective.OBJECTIVE, default_weight=1.0
),
"weight": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=-1.0),
"value": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1.0),
}
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=ObjectiveHandling.MULTI_OBJ,
dict_objective_to_doc=dict_objective,
)
[docs]
def evaluate(self, knapsack_solution: KnapsackSolution) -> dict[str, float]: # type: ignore # avoid isinstance checks for efficiency
res = super().evaluate(knapsack_solution)
heaviest = 0.0
weight = 0.0
for i in range(self.nb_items):
if knapsack_solution.list_taken[i] == 1:
heaviest = max(heaviest, self.list_items[i].weight)
weight += self.list_items[i].weight
res["heaviest_item"] = heaviest
res["weight"] = weight
return res
[docs]
def evaluate_mobj_from_dict(self, dict_values: dict[str, float]) -> TupleFitness:
return TupleFitness(
np.array([dict_values["value"], -dict_values["heaviest_item"]]), 2
)
[docs]
def evaluate_mobj(self, solution: KnapsackSolution) -> TupleFitness: # type: ignore # avoid isinstance checks for efficiency
return self.evaluate_mobj_from_dict(self.evaluate(solution))
[docs]
class MultidimensionalKnapsackSolution(Solution):
def __init__(
self,
problem: Union[
MultidimensionalKnapsackProblem,
MultiScenarioMultidimensionalKnapsackProblem,
],
list_taken: list[int],
value: Optional[float] = None,
weights: Optional[list[float]] = None,
):
self.problem = problem
self.value = value
self.weights = weights
self.list_taken = list_taken
[docs]
def copy(self) -> MultidimensionalKnapsackSolution:
return MultidimensionalKnapsackSolution(
problem=self.problem,
value=self.value,
weights=self.weights,
list_taken=list(self.list_taken),
)
[docs]
def lazy_copy(self) -> MultidimensionalKnapsackSolution:
return MultidimensionalKnapsackSolution(
problem=self.problem,
value=self.value,
weights=self.weights,
list_taken=self.list_taken,
)
[docs]
def change_problem(self, new_problem: Problem) -> None:
if not isinstance(new_problem, MultidimensionalKnapsackProblem):
raise ValueError(
"new_problem must a MultidimensionalKnapsack for a KnapsackSolutionMultidimensional."
)
self.problem = new_problem
self.list_taken = list(self.list_taken)
def __str__(self) -> str:
s = "Value=" + str(self.value) + "\n"
s += "Weights=" + str(self.weights) + "\n"
s += "Taken : " + str(self.list_taken)
return s
def __hash__(self) -> int:
return hash(str(self))
def __eq__(self, other: object) -> bool:
return (
isinstance(other, MultidimensionalKnapsackSolution)
and self.list_taken == other.list_taken
)
[docs]
@dataclass(frozen=True)
class ItemMultidimensional:
index: int
value: float
weights: list[float]
def __str__(self) -> str:
return (
"ind: "
+ str(self.index)
+ " weights: "
+ str(self.weights)
+ " value: "
+ str(self.value)
)
[docs]
class MultidimensionalKnapsackProblem(Problem):
def __init__(
self,
list_items: list[ItemMultidimensional],
max_capacities: list[float],
force_recompute_values: bool = False,
):
self.list_items = list_items
self.nb_items = len(list_items)
self.max_capacities = max_capacities
self.index_to_item = {
list_items[i].index: list_items[i] for i in range(self.nb_items)
}
self.index_to_index_list = {
list_items[i].index: i for i in range(self.nb_items)
}
self.force_recompute_values = force_recompute_values
[docs]
def get_attribute_register(self) -> EncodingRegister:
dict_register = {
"list_taken": {
"name": "list_taken",
"type": [TypeAttribute.LIST_BOOLEAN],
"n": self.nb_items,
}
}
return EncodingRegister(dict_register)
[docs]
def get_objective_register(self) -> ObjectiveRegister:
dict_objective = {
"weight_violation": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100000.0
),
"value": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1.0),
}
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=ObjectiveHandling.AGGREGATE,
dict_objective_to_doc=dict_objective,
)
[docs]
def evaluate_from_encoding(
self, int_vector: list[int], encoding_name: str
) -> dict[str, float]:
if encoding_name == "list_taken":
kp_sol = MultidimensionalKnapsackSolution(
problem=self, list_taken=int_vector
)
elif encoding_name == "custom":
kwargs: dict[str, Any] = {encoding_name: int_vector}
kp_sol = MultidimensionalKnapsackSolution(problem=self, **kwargs)
else:
raise NotImplementedError("encoding_name must be 'list_taken' or 'custom'")
objectives = self.evaluate(kp_sol)
return objectives
[docs]
def evaluate(
self, knapsack_solution: MultidimensionalKnapsackSolution
) -> dict[str, float]: # type: ignore # avoid isinstance checks for efficiency
if knapsack_solution.value is None or self.force_recompute_values:
val = self.evaluate_value(knapsack_solution)
else:
val = knapsack_solution.value
w_violation = self.evaluate_weight_violation(knapsack_solution)
return {"value": val, "weight_violation": w_violation}
[docs]
def evaluate_value(
self, knapsack_solution: MultidimensionalKnapsackSolution
) -> float:
s = sum(
[
knapsack_solution.list_taken[i] * self.list_items[i].value
for i in range(self.nb_items)
]
)
w = [
sum(
[
knapsack_solution.list_taken[i] * self.list_items[i].weights[j]
for i in range(self.nb_items)
]
)
for j in range(len(self.max_capacities))
]
knapsack_solution.value = s
knapsack_solution.weights = w
return s
[docs]
def evaluate_weight_violation(
self, knapsack_solution: MultidimensionalKnapsackSolution
) -> float:
if knapsack_solution.weights is None:
raise RuntimeError(
"knapsack_solution.weights should not be None when calling evaluate_weight_violation."
)
return sum(
[
max(0.0, knapsack_solution.weights[j] - self.max_capacities[j])
for j in range(len(self.max_capacities))
]
)
[docs]
def satisfy(self, knapsack_solution: MultidimensionalKnapsackSolution) -> bool: # type: ignore # avoid isinstance checks for efficiency
if knapsack_solution.value is None or knapsack_solution.weights is None:
self.evaluate(knapsack_solution)
if knapsack_solution.value is None or knapsack_solution.weights is None:
raise RuntimeError(
"knapsack_solution.value and knapsack_solution.weights should not be None now."
)
return all(
knapsack_solution.weights[j] <= self.max_capacities[j]
for j in range(len(self.max_capacities))
)
def __str__(self) -> str:
s = (
"Knapsack model with "
+ str(self.nb_items)
+ " items and capacity "
+ str(self.max_capacities)
+ "\n"
)
s += "\n".join([str(item) for item in self.list_items])
return s
[docs]
def get_dummy_solution(self) -> MultidimensionalKnapsackSolution:
kp_sol = MultidimensionalKnapsackSolution(
problem=self, list_taken=[0] * self.nb_items
)
self.evaluate(kp_sol)
return kp_sol
[docs]
def get_solution_type(self) -> type[Solution]:
return MultidimensionalKnapsackSolution
[docs]
def copy(self) -> MultidimensionalKnapsackProblem:
return MultidimensionalKnapsackProblem(
list_items=[deepcopy(x) for x in self.list_items],
max_capacities=list(self.max_capacities),
force_recompute_values=self.force_recompute_values,
)
[docs]
class MultiScenarioMultidimensionalKnapsackProblem(RobustProblem):
list_problem: Sequence[MultidimensionalKnapsackProblem]
def __init__(
self,
list_problem: Sequence[MultidimensionalKnapsackProblem],
method_aggregating: MethodAggregating,
):
super().__init__(list_problem, method_aggregating)
[docs]
def get_dummy_solution(self) -> MultidimensionalKnapsackSolution:
return cast(
MultidimensionalKnapsackProblem, self.list_problem[0]
).get_dummy_solution()
[docs]
def from_kp_to_multi(
knapsack_problem: KnapsackProblem,
) -> MultidimensionalKnapsackProblem:
return MultidimensionalKnapsackProblem(
list_items=[
ItemMultidimensional(index=x.index, value=x.value, weights=[x.weight])
for x in knapsack_problem.list_items
],
max_capacities=[knapsack_problem.max_capacity],
)
[docs]
def create_noised_scenario(
problem: MultidimensionalKnapsackProblem, nb_scenarios: int = 20
) -> list[MultidimensionalKnapsackProblem]:
scenarios = [problem.copy() for i in range(nb_scenarios)]
for p in scenarios:
litem = []
for item in p.list_items:
litem += [
ItemMultidimensional(
index=item.index,
value=np.random.randint(
max(0, int(item.value * 0.9)), int(item.value * 1.1)
),
weights=item.weights,
)
]
p.list_items = litem
return scenarios