Source code for discrete_optimization.knapsack.solvers.dp

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import time
from typing import Any, Optional

import didppy as dp
import numpy as np

from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.dyn_prog_tools import DpSolver
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.knapsack.problem import KnapsackProblem, KnapsackSolution
from discrete_optimization.knapsack.solvers import KnapsackSolver

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class DpKnapsackSolver(KnapsackSolver, DpSolver, WarmstartMixin):
    """Knapsack solver that expresses the problem as a didppy model.

    Two model variants are available, selected by the ``float_cost``
    hyperparameter: an integer-cost model and a float-cost model. The
    float-cost model can optionally register dual bounds (``dual_bound``
    hyperparameter, only relevant when ``float_cost`` is True).

    Both variants use the same two transitions, stored in
    ``self.transitions`` under the keys ``"pack"`` and ``"ignore"``.
    """

    hyperparameters = DpSolver.hyperparameters + [
        CategoricalHyperparameter(
            name="float_cost", choices=[True, False], default=False
        ),
        CategoricalHyperparameter(
            name="dual_bound",
            choices=[True, False],
            default=True,
            depends_on=("float_cost", True),
        ),
    ]
    # didppy model built by one of the init_model_* methods.
    model: dp.Model
    # Maps transition name ("pack" / "ignore") to the dp.Transition object.
    transitions: dict

    def init_model(self, **kwargs):
        """Build ``self.model``, dispatching on the ``float_cost`` hyperparameter.

        Missing hyperparameters are filled in with their defaults before the
        float or integer builder is called with the completed kwargs.
        """
        kwargs = self.complete_with_default_hyperparameters(kwargs)
        builder = self.init_model_float if kwargs["float_cost"] else self.init_model_int
        builder(**kwargs)

    def init_model_int(self, **kwargs):
        """Build the integer-cost model.

        Adapted from https://didppy.readthedocs.io/en/stable/quickstart.html

        The state is made of the remaining capacity and the index of the next
        item to decide on. "pack" consumes the item's weight and earns its
        value, "ignore" just moves to the next item. The base case is reached
        once every item has been decided.
        """
        items = self.problem.list_items
        nb_items = self.problem.nb_items
        item_weights = [items[k].weight for k in range(nb_items)]
        item_values = [items[k].value for k in range(nb_items)]

        model = dp.Model(maximize=True, float_cost=False)
        item_type = model.add_object_type(number=nb_items)
        remaining = model.add_int_var(target=self.problem.max_capacity)
        current = model.add_element_var(object_type=item_type, target=0)
        weight_table = model.add_int_table(item_weights)
        value_table = model.add_int_table(item_values)

        # Take the current item: only allowed while it still fits.
        pack = dp.Transition(
            name="pack",
            cost=value_table[current] + dp.IntExpr.state_cost(),
            effects=[
                (remaining, remaining - weight_table[current]),
                (current, current + 1),
            ],
            preconditions=[current < nb_items, remaining >= weight_table[current]],
        )
        # Skip the current item: always allowed while items remain.
        ignore = dp.Transition(
            name="ignore",
            cost=dp.IntExpr.state_cost(),
            effects=[(current, current + 1)],
            preconditions=[current < nb_items],
        )
        model.add_transition(pack)
        model.add_transition(ignore)
        model.add_base_case([current == nb_items])

        self.transitions = {"pack": pack, "ignore": ignore}
        self.model = model

    def init_model_float(self, **kwargs):
        """Build the float-cost model, optionally with dual bounds.

        Same structure as the integer model, with float variables/tables and
        an extra set variable tracking the items not decided yet. When
        ``kwargs["dual_bound"]`` is true, two dual bounds are registered:
        the value table indexed by the set of undecided items, and the best
        value-per-weight ratio among undecided items multiplied by the
        remaining capacity (0 when no item is left).
        """
        items = self.problem.list_items
        nb_items = self.problem.nb_items
        item_weights = [items[k].weight for k in range(nb_items)]
        item_values = [items[k].value for k in range(nb_items)]

        model = dp.Model(maximize=True, float_cost=True)
        item_type = model.add_object_type(number=nb_items)
        remaining = model.add_float_var(target=self.problem.max_capacity)
        current = model.add_element_var(object_type=item_type, target=0)
        weight_table = model.add_float_table(item_weights)
        value_table = model.add_float_table(item_values)
        undone = model.add_set_var(object_type=item_type, target=range(nb_items))

        # Take the current item: only allowed while it still fits.
        pack = dp.Transition(
            name="pack",
            cost=value_table[current] + dp.FloatExpr.state_cost(),
            effects=[
                (remaining, remaining - weight_table[current]),
                (current, current + 1),
                (undone, undone.remove(current)),
            ],
            preconditions=[current < nb_items, remaining >= weight_table[current]],
        )
        # Skip the current item: always allowed while items remain.
        ignore = dp.Transition(
            name="ignore",
            cost=dp.FloatExpr.state_cost(),
            effects=[(current, current + 1), (undone, undone.remove(current))],
            preconditions=[current < nb_items],
        )
        model.add_transition(pack)
        model.add_transition(ignore)
        model.add_base_case([current == nb_items])

        if kwargs["dual_bound"]:
            ratios = [
                value / weight for value, weight in zip(item_values, item_weights)
            ]
            ratio_table = model.add_float_table(ratios)
            model.add_dual_bound(value_table[undone])
            model.add_dual_bound(
                (undone.is_empty()).if_then_else(
                    0, ratio_table.max(undone) * remaining
                )
            )

        self.transitions = {"pack": pack, "ignore": ignore}
        self.model = model

    def retrieve_solution(self, sol: dp.Solution) -> Solution:
        """Convert a didppy solution into a ``KnapsackSolution``.

        The k-th transition of ``sol`` is read as the decision for item k:
        the item is marked as taken when the transition is named "pack".
        """
        taken = [0] * self.problem.nb_items
        for position, transition in enumerate(sol.transitions):
            if transition.name != "pack":
                continue
            taken[position] = 1
            logger.debug(f"pack {position}")
        return KnapsackSolution(problem=self.problem, list_taken=taken)

    def set_warm_start(self, solution: KnapsackSolution) -> None:
        """Store ``solution`` as an initial sequence of transitions.

        Each entry of ``solution.list_taken`` equal to 1 becomes a "pack"
        transition, any other value becomes an "ignore" transition. The
        transitions come from ``self.transitions``, so a model must have been
        initialized beforehand.
        """
        self.initial_solution = [
            self.transitions["pack"] if flag == 1 else self.transitions["ignore"]
            for flag in solution.list_taken
        ]
class ExactDpKnapsackSolver(KnapsackSolver):
    """Knapsack solver based on the tabular dynamic program.

    ``self.table[k, c]`` stores the best value obtainable with the first
    ``k`` processed items under an integer capacity ``c``. The table is
    allocated once in ``__init__`` with shape
    ``(nb_items + 1, capacity + 1)`` and filled by ``solve``.

    Item weights and the problem capacity are truncated to integers with
    ``int()``, as required by the capacity-indexed table.
    """

    hyperparameters = [
        CategoricalHyperparameter(
            name="greedy_start", default=False, choices=[True, False]
        ),
    ]

    def __init__(
        self,
        problem: KnapsackProblem,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs,
    ):
        """Store problem dimensions and allocate the (zero-filled) DP table.

        Args:
            problem: knapsack instance to solve.
            params_objective_function: forwarded to the parent solver.
            **kwargs: accepted for interface compatibility, not used here.
        """
        super().__init__(
            problem=problem, params_objective_function=params_objective_function
        )
        self.nb_items = self.problem.nb_items
        capacity = int(self.problem.max_capacity)
        if capacity != self.problem.max_capacity:
            logger.warning(
                "knapsack_problem.max_capacity should be an integer for dynamic programming. "
                "Coercing it to an integer for the solver."
            )
        self.capacity: int = capacity
        self.table = np.zeros((self.nb_items + 1, self.capacity + 1))

    def solve(self, **kwargs: Any) -> ResultStorage:
        """Fill the DP table and rebuild the corresponding solution.

        Keyword Args:
            greedy_start: hyperparameter; when true, items that fit in the
                knapsack are processed by decreasing value/weight ratio and
                items heavier than the capacity are left out.
            time_limit: optional limit in seconds. When it is exceeded the
                table filling stops and the solution is rebuilt from the
                rows completed so far.

        Returns:
            A result storage holding the single reconstructed solution and
            its fitness.

        Raises:
            ValueError: if a processed item has a negative (truncated) weight.
        """
        kwargs = self.complete_with_default_hyperparameters(kwargs)
        positions = self._item_positions(greedy_start=kwargs["greedy_start"])
        cur_indexes = self._fill_table(positions, kwargs.get("time_limit", None))
        taken, value, weight = self._backtrack(cur_indexes, positions)
        sol = KnapsackSolution(
            problem=self.problem, value=value, weight=weight, list_taken=taken
        )
        fit = self.aggreg_from_sol(sol)
        return self.create_result_storage(
            [(sol, fit)],
        )

    def _item_positions(self, greedy_start: bool) -> list:
        """Return positions in ``problem.list_items`` in processing order."""
        items = self.problem.list_items
        if greedy_start:

            def ratio(item):
                # A zero-weight item costs no capacity: rank it first instead
                # of dividing by zero.
                if item.weight == 0:
                    return float("inf")
                return item.value / item.weight

            items = sorted(
                (
                    item
                    for item in items
                    if item.weight <= self.problem.max_capacity
                ),
                key=ratio,
                reverse=True,
            )
        return [self.problem.index_to_index_list[item.index] for item in items]

    def _fill_table(self, positions: list, time_limit: Optional[float]) -> tuple:
        """Fill one table row per item and return the (row, capacity) to backtrack from."""
        # Monotonic clock: elapsed time is not affected by system clock changes.
        start = time.perf_counter()
        cur_indexes = (0, 0)
        for row, position in enumerate(positions, start=1):
            if time_limit is not None and time.perf_counter() - start > time_limit:
                break
            item = self.problem.list_items[position]
            weight = int(item.weight)  # weight should be an integer for this algo
            if weight < 0:
                raise ValueError(
                    f"Item weights must be non-negative for dynamic programming, got {item.weight}."
                )
            previous = self.table[row - 1]
            current = self.table[row]
            # Capacities below the item's weight simply inherit the previous row.
            current[:] = previous
            if weight > self.capacity:
                continue
            # For every capacity c >= weight:
            #   table[row, c] = max(table[row-1, c], table[row-1, c-weight] + value)
            np.maximum(
                previous[weight:],
                previous[: self.capacity + 1 - weight] + item.value,
                out=current[weight:],
            )
            cur_indexes = (row, self.capacity)
            logger.debug(f"Cur obj : {self.table[row, self.capacity]}")
        return cur_indexes

    def _backtrack(self, cur_indexes: tuple, positions: list) -> tuple:
        """Walk the table back from ``cur_indexes``; return (taken, value, weight)."""
        taken = [0] * self.nb_items
        total_value = 0
        total_weight = 0
        row, capacity = cur_indexes
        while row != 0:
            # A change of value between two consecutive rows means the item
            # of the current row was packed.
            if self.table[row, capacity] != self.table[row - 1, capacity]:
                position = positions[row - 1]
                item = self.problem.list_items[position]
                item_weight = int(item.weight)
                taken[position] = 1
                total_value += item.value
                total_weight += item_weight
                capacity -= item_weight
            row -= 1
        return taken, total_value, total_weight