# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# Job shop model. It was initially implemented as course material, available here:
# https://github.com/erachelson/seq_dec_mak/blob/main/scheduling_newcourse/correction/nb2_jobshopsolver.py
from __future__ import annotations
from discrete_optimization.generic_tasks_tools.precedence import PrecedenceProblem
from discrete_optimization.generic_tasks_tools.scheduling import (
SchedulingProblem,
SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Solution,
TypeObjective,
)
# A task is addressed by two indices: the position of its job in
# ``JobShopProblem.list_jobs`` and its position inside that job's list of
# sub-jobs. The same pair indexes ``JobShopSolution.schedule``.
Task = tuple[int, int]
"""Task representation: (job index, subjob index)."""
[docs]
class JobShopSolution(SchedulingSolution[Task]):
    """Schedule attached to a :class:`JobShopProblem`.

    Attributes:
        problem: the problem instance this schedule refers to.
        schedule: ``schedule[j][k]`` is the ``(start, end)`` pair of sub-job
            ``k`` of job ``j``.

    """

    problem: JobShopProblem

    def __init__(self, problem: JobShopProblem, schedule: list[list[tuple[int, int]]]):
        """Store the problem and the schedule.

        Args:
            problem: the job shop problem the schedule belongs to.
            schedule: for each job and sub-job, start and end time given as
                a tuple of int. The list is stored as given (not copied).

        """
        self.problem = problem
        self.schedule = schedule

    def copy(self) -> JobShopSolution:
        """Return a copy whose schedule can be modified independently.

        The outer list and each per-job list are duplicated, so replacing an
        entry in the copy's schedule does not alter this solution. The
        ``(start, end)`` tuples are immutable and are shared. The problem
        object is shared as well.

        """
        return JobShopSolution(
            problem=self.problem,
            schedule=[list(job) for job in self.schedule],
        )

    def change_problem(self, new_problem: JobShopProblem) -> None:
        """Attach this solution to ``new_problem``; the schedule is untouched."""
        self.problem = new_problem

    def get_end_time(self, task: Task) -> int:
        """Return the end time stored for ``task`` (job index, sub-job index)."""
        j, k = task
        return self.schedule[j][k][1]

    def get_start_time(self, task: Task) -> int:
        """Return the start time stored for ``task`` (job index, sub-job index)."""
        j, k = task
        return self.schedule[j][k][0]

    def get_max_end_time(self) -> int:
        """Return the largest end time among the last sub-job of every job.

        Jobs with no scheduled sub-job are ignored, and 0 is returned when no
        job has any sub-job, instead of raising on ``job[-1]`` or on an empty
        ``max``.

        """
        return max(
            (job[-1][1] for job in self.schedule if len(job) > 0),
            default=0,
        )
[docs]
class Subjob:
    """Data of one sub-job: the id of its machine and its processing time."""

    machine_id: int
    processing_time: int

    def __init__(self, machine_id: int, processing_time: int):
        """Record the machine id and the processing time of the sub-job."""
        self.processing_time = processing_time
        self.machine_id = machine_id
[docs]
class JobShopProblem(SchedulingProblem[Task], PrecedenceProblem[Task]):
    """Job shop scheduling problem.

    Attributes:
        list_jobs: ``list_jobs[j][k]`` is sub-job ``k`` of job ``j``.
        n_jobs: number of jobs (defaults to ``len(list_jobs)``).
        n_machines: number of machines (defaults to the number of distinct
            ``machine_id`` found in ``list_jobs``).
        n_all_jobs: total number of sub-jobs over all jobs.
        job_per_machines: for each machine id, the list of tasks
            ``(job index, sub-job index)`` assigned to it.
        horizon: upper bound on the makespan (defaults to the sum of all
            processing times).

    """

    n_jobs: int
    n_machines: int
    list_jobs: list[list[Subjob]]

    def __init__(
        self,
        list_jobs: list[list[Subjob]],
        n_jobs: int | None = None,
        n_machines: int | None = None,
        horizon: int | None = None,
    ):
        """Build the problem and its derived data.

        Args:
            list_jobs: sub-jobs of each job, in the order they are listed.
            n_jobs: number of jobs; computed from ``list_jobs`` when None.
            n_machines: number of machines; computed from the distinct
                machine ids when None.
            horizon: makespan upper bound; sum of all processing times when
                None.

        """
        self.list_jobs = list_jobs
        self.n_jobs = len(list_jobs) if n_jobs is None else n_jobs
        if n_machines is None:
            n_machines = len({sub.machine_id for job in list_jobs for sub in job})
        self.n_machines = n_machines
        self.n_all_jobs = sum(len(job) for job in list_jobs)
        # Store for each machine the list of sub-job given as (index_job, index_sub-job)
        self.job_per_machines = {i: [] for i in range(self.n_machines)}
        for k in range(self.n_jobs):
            for sub_k, subjob in enumerate(list_jobs[k]):
                # setdefault rather than plain indexing: machine ids are not
                # guaranteed to be exactly 0..n_machines-1 (e.g. 1-based ids),
                # which previously raised KeyError.
                self.job_per_machines.setdefault(subjob.machine_id, []).append(
                    (k, sub_k)
                )
        if horizon is None:
            horizon = sum(sub.processing_time for job in list_jobs for sub in job)
        self.horizon = horizon

    @property
    def tasks_list(self) -> list[Task]:
        """All tasks as ``(job index, sub-job index)``, job by job."""
        return [(j, k) for j, job in enumerate(self.list_jobs) for k in range(len(job))]

    def get_precedence_constraints(self) -> dict[Task, list[Task]]:
        """Map each task to the task listed right after it in the same job.

        The last sub-job of a job is mapped to an empty list.

        """
        return {
            (j, k): [(j, k + 1)] if k + 1 < len(job) else []
            for j, job in enumerate(self.list_jobs)
            for k in range(len(job))
        }

    def get_makespan_upper_bound(self) -> int:
        """Return ``self.horizon``."""
        return self.horizon

    def get_last_tasks(self) -> list[Task]:
        """Return, for each job, the task of its last sub-job."""
        return [(j, len(job) - 1) for j, job in enumerate(self.list_jobs)]

    def evaluate(self, variable: JobShopSolution) -> dict[str, float]:
        """Return the ``"makespan"`` of ``variable`` (its max end time)."""
        return {"makespan": variable.get_max_end_time()}

    def satisfy(self, variable: JobShopSolution) -> bool:
        """Check that ``variable`` is a feasible schedule.

        Three conditions are verified:

        - on every machine, intervals sorted by start time do not overlap;
        - every scheduled interval lasts exactly the sub-job's
          ``processing_time``;
        - inside a job, a sub-job does not start before the previous one ends.

        Returns:
            True if all conditions hold, False otherwise.

        """
        schedule = variable.schedule
        # Machine capacity: after sorting by start time, checking neighbours
        # is enough to detect any overlap.
        for tasks in self.job_per_machines.values():
            intervals = sorted(
                (schedule[j][k] for j, k in tasks),
                key=lambda interval: interval[0],
            )
            for previous, current in zip(intervals, intervals[1:]):
                if current[0] < previous[1]:
                    return False
        for j in range(self.n_jobs):
            job_schedule = schedule[j]
            # Durations: without this check a schedule made of (0, 0)
            # intervals would be accepted with a makespan of 0.
            for interval, subjob in zip(job_schedule, self.list_jobs[j]):
                if interval[1] - interval[0] != subjob.processing_time:
                    return False
            # Order of the sub-jobs inside the job.
            for previous, current in zip(job_schedule, job_schedule[1:]):
                if current[0] < previous[1]:
                    return False
        return True

    def get_attribute_register(self) -> EncodingRegister:
        """Return an encoding register with no attribute declared."""
        return EncodingRegister(dict_attribute_to_type={})

    def get_solution_type(self) -> type[Solution]:
        """Return the solution class used by this problem."""
        return JobShopSolution

    def get_objective_register(self) -> ObjectiveRegister:
        """Declare the single ``"makespan"`` objective, to be minimized."""
        return ObjectiveRegister(
            dict_objective_to_doc={
                "makespan": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1)
            },
            objective_sense=ModeOptim.MINIMIZATION,
            objective_handling=ObjectiveHandling.AGGREGATE,
        )