Source code for discrete_optimization.fjsp.solvers.cpsat_auto

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from typing import Any

from discrete_optimization.fjsp.problem import (
    CumulativeResource,
    FJobShopProblem,
    FJobShopSolution,
    Task,
)
from discrete_optimization.generic_tasks_tools.allocation import (
    NoUnaryResource,
    UnaryResource,
)
from discrete_optimization.generic_tasks_tools.non_renewable_resource import (
    NoNonRenewableResource,
)
from discrete_optimization.generic_tasks_tools.solvers.cpsat.auto import (
    GenericSchedulingAutoCpSatSolver,
    TemporarySolution,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
)

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class CpSatAutoFjspSolver(
    GenericSchedulingAutoCpSatSolver[
        Task, NoUnaryResource, CumulativeResource, NoNonRenewableResource
    ],
):
    """CP-SAT "auto" solver bound to a :class:`FJobShopProblem`.

    The model construction itself is inherited from
    ``GenericSchedulingAutoCpSatSolver``; this subclass only declares its
    hyperparameters, stores a few configuration flags before delegating to
    the parent ``init_model``, and converts the solver's temporary solution
    into an :class:`FJobShopSolution`.
    """

    # Two boolean switches, both disabled by default. Each entry gets its own
    # ``choices`` list.
    hyperparameters = [
        CategoricalHyperparameter(
            name=option_name, choices=[True, False], default=False
        )
        for option_name in ("duplicate_temporal_var", "add_cumulative_constraint")
    ]
    problem: FJobShopProblem

    def init_model(self, **kwargs: Any) -> None:
        """Store the solver configuration, then build the model via the parent.

        Args:
            **kwargs: optional parameters. Missing hyperparameters are filled
                in with their defaults. Keys read here:

                - ``max_time``: value later returned by
                  :meth:`get_makespan_upper_bound`; when absent, the
                  problem's own makespan upper bound is used.
                - ``duplicate_temporal_var``: stored as
                  ``duplicate_start_var_per_mode``.
                - ``add_cumulative_constraint``: whether to add a cumulative
                  constraint on top of the no_overlap constraint.

                The completed kwargs are forwarded unchanged to the parent
                ``init_model``.
        """
        settings = self.complete_with_default_hyperparameters(kwargs)

        # The problem-derived bound is computed unconditionally and only used
        # when the caller did not provide ``max_time``.
        problem_horizon = self.problem.get_makespan_upper_bound()
        self._max_time = settings.get("max_time", problem_horizon)

        self.duplicate_start_var_per_mode = settings["duplicate_temporal_var"]
        # Cumulative constraint in addition to the no_overlap constraint.
        self.use_cumulative_for_capa_1 = bool(settings["add_cumulative_constraint"])
        # Always use cpm to compute start/end bounds of the tasks.
        self.use_cpm_for_task_bounds = True

        super().init_model(**settings)

    def get_makespan_upper_bound(self) -> int:
        """Return the makespan upper bound recorded by :meth:`init_model`.

        The value is read from ``self._max_time``, which is only assigned in
        :meth:`init_model`.
        """
        return self._max_time

    def convert_task_variables_to_solution(
        self, temp_sol: TemporarySolution[Task, UnaryResource]
    ) -> FJobShopSolution:
        """Turn the solver's per-task values into an :class:`FJobShopSolution`.

        Args:
            temp_sol: temporary solution whose ``task_variables`` mapping is
                indexed by ``(job index, sub-job index)``; each entry exposes
                ``start``, ``end`` and ``mode``.

        Returns:
            A solution whose ``schedule`` holds, for every job and each of its
            sub-jobs, the tuple ``(start, end, machine, mode)``, where the
            machine is looked up in ``problem.mode2machine`` from the mode.
        """
        # NOTE(review): the class is parameterized with ``NoUnaryResource``
        # while this annotation uses ``UnaryResource`` -- confirm intended.
        problem = self.problem
        schedule = []
        for job_index, job in enumerate(problem.list_jobs):
            job_rows = []
            for sub_index, _ in enumerate(job.sub_jobs):
                key = (job_index, sub_index)
                task_var = temp_sol.task_variables[key]
                job_rows.append(
                    (
                        task_var.start,
                        task_var.end,
                        problem.mode2machine[key][task_var.mode],
                        task_var.mode,
                    )
                )
            schedule.append(job_rows)
        return FJobShopSolution(problem=problem, schedule=schedule)