Source code for discrete_optimization.rcpsp.solvers.optal

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import datetime
import json
import os
from typing import Any, Optional

from discrete_optimization.generic_tools.cp_tools import CpSolver, ParametersCp
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.hub_solver.optal.generic_optal import (
    OptalSolver,
)
from discrete_optimization.generic_tools.hub_solver.optal.model_collections import (
    DoProblemEnum,
    problem_to_script_path,
)
from discrete_optimization.rcpsp.problem import RcpspProblem, RcpspSolution

script_single_mode = problem_to_script_path[DoProblemEnum.RCPSP]
script_multi_mode = problem_to_script_path[DoProblemEnum.MRCPSP]


[docs] def dump_to_json(problem: RcpspProblem): mode_details_str_keys = { str(task): {str(mode): details for mode, details in modes.items()} for task, modes in problem.mode_details.items() } successors_str_keys = { str(task): [str(s) for s in succ] for task, succ in problem.successors.items() } problem_data = { "nbJobs": problem.n_jobs, "nbResources": len(problem.resources_list), "sourceTask": str(problem.source_task), "sinkTask": str(problem.sink_task), "tasksList": [str(t) for t in problem.tasks_list], "resources": problem.resources, "nonRenewableResources": problem.non_renewable_resources, "modeDetails": mode_details_str_keys, "successors": successors_str_keys, "horizon": problem.horizon, } return problem_data
[docs] class OptalRcpspSolver(OptalSolver): problem: RcpspProblem def __init__( self, problem: RcpspProblem, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): super().__init__(problem, params_objective_function, **kwargs) if self.problem.is_multimode: self._script_model = script_multi_mode else: self._script_model = script_single_mode
[docs] def init_model(self, **args: Any) -> None: output = dump_to_json(self.problem) d = datetime.datetime.now().timestamp() file_input_path = os.path.join(self.temp_directory, f"tmp-{d}.json") logs_path = os.path.join(self.temp_directory, f"tmp-stats-{d}.json") result_path = os.path.join(self.temp_directory, f"solution-{d}.json") self._logs_path = logs_path self._result_path = result_path with open(file_input_path, "w") as f: json.dump(output, f, indent=4) self._file_input = file_input_path
[docs] def build_command( self, parameters_cp: Optional[ParametersCp] = None, time_limit: int = 10, **args: Any, ): command = super().build_command( parameters_cp=parameters_cp, time_limit=time_limit, **args ) command += ["--output-json", self._result_path] return command
[docs] def retrieve_current_solution(self, dict_results: dict) -> Solution: start_times = dict_results["startTimes"] end_times = dict_results["endTimes"] rcpsp_schedule = {} modes_dict = {} tasks = self.problem.get_tasks_list() str_to_task = {str(t): t for t in self.problem.tasks_list} for key in start_times: rcpsp_schedule[str_to_task[key]] = { "start_time": start_times[key], "end_time": end_times[key], } if "modes" in dict_results: modes_dict[str_to_task[key]] = int(dict_results["modes"][key]) else: modes_dict[str_to_task[key]] = 1 rcpsp_schedule[self.problem.source_task] = {"start_time": 0, "end_time": 0} max_time = max(end_times.values()) rcpsp_schedule[self.problem.sink_task] = { "start_time": max_time, "end_time": max_time, } return RcpspSolution( problem=self.problem, rcpsp_schedule=rcpsp_schedule, rcpsp_modes=[modes_dict[t] for t in self.problem.tasks_list_non_dummy], )