# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
import random
from typing import Any, Hashable, Iterable, Optional
from ortools.sat.python.cp_model import Constraint
from discrete_optimization.fjsp.problem import FJobShopProblem, FJobShopSolution
from discrete_optimization.fjsp.solvers.cpsat import CpSatFjspSolver
from discrete_optimization.generic_tools.lns_cp import OrtoolsCpSatConstraintHandler
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
class NeighborBuilderSubPart:
    """Cut the schedule into successive sub-parts, by increasing start time.

    The (job, sub-job) index pairs of the problem are sorted by their start
    time in a given solution and split into ``nb_cut_part`` consecutive
    windows. Each call to :meth:`find_subtasks` selects the next window and
    wraps around to the first one after the last.
    """

    def __init__(self, problem: FJobShopProblem, nb_cut_part: int = 10):
        """Store the problem and enumerate all of its (job, sub-job) keys.

        Args:
            problem: problem providing ``n_jobs``, ``n_all_jobs`` and
                ``list_jobs[i].sub_jobs``.
            nb_cut_part: number of windows the sorted task list is cut into.

        Raises:
            ValueError: if ``nb_cut_part`` is lower than 1 (it is used as a
                divisor and as a modulus in :meth:`find_subtasks`).
        """
        if nb_cut_part < 1:
            raise ValueError(f"nb_cut_part must be >= 1, got {nb_cut_part}")
        self.problem = problem
        self.nb_cut_part = nb_cut_part
        # Index of the window the next find_subtasks() call will select.
        self.current_sub_part = 0
        # Every (job index, sub-job index) pair of the problem.
        self.keys = [
            (i, j)
            for i in range(self.problem.n_jobs)
            for j in range(len(self.problem.list_jobs[i].sub_jobs))
        ]
        self.set_keys = set(self.keys)

    def find_subtasks(
        self,
        current_solution: FJobShopSolution,
        subtasks: Optional[set[Hashable]] = None,
    ) -> tuple[set[tuple[int, int]], set[tuple[int, int]]]:
        """Select the next window of tasks and advance to the following one.

        Args:
            current_solution: solution whose ``schedule[i][j][0]`` value is
                used as the sort key of task ``(i, j)``.
            subtasks: optional set of tasks to extend. When given, it is
                updated in place with the selected window and returned.

        Returns:
            A pair ``(selected, others)``: ``selected`` is the set of tasks of
            the current window (plus the content of ``subtasks`` if provided)
            and ``others`` is every remaining key of the problem.
        """
        window_size = math.ceil(self.problem.n_all_jobs / self.nb_cut_part)
        ordered_keys = sorted(
            self.keys,
            key=lambda key: current_solution.schedule[key[0]][key[1]][0],
        )
        first = self.current_sub_part * window_size
        window = ordered_keys[first : first + window_size]
        if subtasks is None:
            # Build a real set so the return value matches the annotation.
            subtasks = set(window)
        else:
            subtasks.update(window)
        # Cycle through the windows on successive calls.
        self.current_sub_part = (self.current_sub_part + 1) % self.nb_cut_part
        return subtasks, self.set_keys.difference(subtasks)
class FjspConstraintHandler(OrtoolsCpSatConstraintHandler):
    """Keep a random subset of start times close to the best stored solution.

    On each call, a random fraction of the solver's start variables is
    restricted to a window of ``+/- delta_time`` around the start time those
    tasks have in the best solution of the result storage.
    """

    def __init__(
        self,
        problem: FJobShopProblem,
        fraction_segment_to_fix: float = 0.9,
        delta_time: int = 20,
    ):
        """Store the handler parameters.

        Args:
            problem: problem being solved.
            fraction_segment_to_fix: fraction of the start variables that get
                a window constraint on each call.
            delta_time: half-width of the window allowed around the start
                time taken from the reference solution.
        """
        self.problem = problem
        self.fraction_segment_to_fix = fraction_segment_to_fix
        self.delta_time = delta_time

    def adding_constraint_from_results_store(
        self, solver: CpSatFjspSolver, result_storage: ResultStorage, **kwargs: Any
    ) -> Iterable[Constraint]:
        """Add window constraints around the best solution's start times.

        Args:
            solver: solver whose ``cp_model`` and ``variables`` are updated.
            result_storage: storage queried for its best solution.
            **kwargs: unused.

        Returns:
            The list of constraints added to ``solver.cp_model`` by this call.
        """
        sol: FJobShopSolution
        sol, _ = result_storage.get_best_solution_fit()
        start_vars = solver.variables["starts"]
        keys = random.sample(
            list(start_vars),
            k=int(self.fraction_segment_to_fix * len(start_vars)),
        )
        constraints = []
        for key in keys:
            # First element of a schedule entry is the task start time.
            start = sol.schedule[key[0]][key[1]][0]
            constraints.append(
                solver.cp_model.Add(start_vars[key] <= start + self.delta_time)
            )
            constraints.append(
                solver.cp_model.Add(start_vars[key] >= start - self.delta_time)
            )
        solver.cp_model.Minimize(solver.variables["makespan"])
        solver.set_warm_start(sol)
        return constraints
class NeighFjspConstraintHandler(OrtoolsCpSatConstraintHandler):
    """Free one sub-part of the schedule and keep the rest close to the last solution.

    The tasks to release are chosen by a :class:`NeighborBuilderSubPart`; all
    other tasks have their start variable restricted to ``+/- delta_time``
    around their start time in the last stored solution.
    """

    def __init__(
        self,
        problem: FJobShopProblem,
        neighbor_builder: NeighborBuilderSubPart,
        delta_time: int = 2,
    ):
        """Store the handler parameters.

        Args:
            problem: problem being solved; used to evaluate the makespan of
                the reference solution.
            neighbor_builder: object choosing which tasks are left free.
            delta_time: half-width of the window allowed around the start
                time of the tasks that are not left free.
        """
        self.problem = problem
        self.neighbor_builder = neighbor_builder
        self.delta_time = delta_time

    def adding_constraint_from_results_store(
        self, solver: CpSatFjspSolver, result_storage: ResultStorage, **kwargs: Any
    ) -> Iterable[Constraint]:
        """Add neighborhood constraints built from the last stored solution.

        The reference is the last entry of ``result_storage`` (index ``-1``).
        No objective is set by this method.

        Args:
            solver: solver whose ``cp_model`` and ``variables`` are updated.
            result_storage: storage whose last entry gives the reference
                solution.
            **kwargs: unused.

        Returns:
            The list of constraints added to ``solver.cp_model`` by this call.
        """
        sol: FJobShopSolution
        sol, _ = result_storage[-1]
        makespan = self.problem.evaluate(sol)["makespan"]
        # Only the complement of the released sub-part gets window constraints.
        _, keys_to_fix = self.neighbor_builder.find_subtasks(current_solution=sol)
        start_vars = solver.variables["starts"]
        end_vars = solver.variables["ends"]
        constraints = []
        for key in keys_to_fix:
            # First element of a schedule entry is the task start time.
            start = sol.schedule[key[0]][key[1]][0]
            constraints.append(
                solver.cp_model.Add(start_vars[key] <= start + self.delta_time)
            )
            constraints.append(
                solver.cp_model.Add(start_vars[key] >= start - self.delta_time)
            )
        # Every task must end no later than the reference solution's makespan.
        for key in end_vars:
            constraints.append(solver.cp_model.Add(end_vars[key] <= makespan))
        solver.set_warm_start(sol)
        return constraints