Source code for discrete_optimization.jsp.problem

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
#  Job shop model, initially implemented as part of the course material available at
#  https://github.com/erachelson/seq_dec_mak/blob/main/scheduling_newcourse/correction/nb2_jobshopsolver.py
from __future__ import annotations

from discrete_optimization.generic_tasks_tools.precedence import PrecedenceProblem
from discrete_optimization.generic_tasks_tools.scheduling import (
    SchedulingProblem,
    SchedulingSolution,
)
from discrete_optimization.generic_tools.do_problem import (
    EncodingRegister,
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Solution,
    TypeObjective,
)

# A task identifies one subjob of one job. Throughout this module the pair
# (j, k) is used both to index the problem data (``list_jobs[j][k]``) and a
# solution's schedule (``schedule[j][k]``).
Task = tuple[int, int]
"""Task representation: (job index, subjob index)."""


class JobShopSolution(SchedulingSolution[Task]):
    """Schedule for a job shop problem.

    ``schedule[j][k]`` holds the ``(start, end)`` pair of subjob ``k`` of
    job ``j``.
    """

    problem: JobShopProblem

    def __init__(self, problem: JobShopProblem, schedule: list[list[tuple[int, int]]]):
        """Store the problem and the schedule.

        Args:
            problem: problem this solution refers to.
            schedule: for each job and sub-job, start and end time given as
                a tuple of int.
        """
        self.problem = problem
        self.schedule = schedule

    def copy(self) -> JobShopSolution:
        """Return a new solution on the same problem with an independent schedule.

        The outer list and each per-job list are duplicated so that editing
        the copy's schedule does not alter this solution. The ``(start, end)``
        tuples themselves are immutable and can safely be shared.
        """
        return JobShopSolution(
            problem=self.problem,
            schedule=[list(job) for job in self.schedule],
        )

    def change_problem(self, new_problem: JobShopProblem) -> None:
        """Attach this solution to ``new_problem``."""
        self.problem = new_problem

    def get_end_time(self, task: Task) -> int:
        """Return the end time of ``task`` given as (job index, subjob index)."""
        j, k = task
        return self.schedule[j][k][1]

    def get_start_time(self, task: Task) -> int:
        """Return the start time of ``task`` given as (job index, subjob index)."""
        j, k = task
        return self.schedule[j][k][0]

    def get_max_end_time(self) -> int:
        """Return the largest end time over all scheduled subjobs.

        Every subjob is inspected (not only the last one of each job), so the
        value is still the true maximum when the schedule does not respect
        the in-job ordering. Returns 0 when nothing is scheduled.
        """
        return max(
            (interval[1] for job in self.schedule for interval in job),
            default=0,
        )
class Subjob:
    """One operation of a job: the machine it runs on and how long it takes."""

    machine_id: int
    processing_time: int

    def __init__(self, machine_id: int, processing_time: int):
        """Record the machine id and the processing time of the subjob."""
        self.machine_id, self.processing_time = machine_id, processing_time
class JobShopProblem(SchedulingProblem[Task], PrecedenceProblem[Task]):
    """Job shop scheduling problem.

    Each job is an ordered list of :class:`Subjob`; every subjob runs on one
    machine for ``processing_time`` time units. The objective registered by
    this class is the makespan, to be minimized.
    """

    n_jobs: int
    n_machines: int
    list_jobs: list[list[Subjob]]

    def __init__(
        self,
        list_jobs: list[list[Subjob]],
        n_jobs: int | None = None,
        n_machines: int | None = None,
        horizon: int | None = None,
    ):
        """Build the problem from its jobs.

        Args:
            list_jobs: for each job, the ordered list of its subjobs.
            n_jobs: number of jobs; defaults to ``len(list_jobs)``.
            n_machines: number of machines; defaults to the number of
                distinct ``machine_id`` values found in ``list_jobs``.
            horizon: upper bound on the makespan; defaults to the sum of all
                processing times.
        """
        self.list_jobs = list_jobs
        self.n_jobs = len(list_jobs) if n_jobs is None else n_jobs
        if n_machines is None:
            n_machines = len(
                {subjob.machine_id for job in self.list_jobs for subjob in job}
            )
        self.n_machines = n_machines
        self.n_all_jobs = sum(len(job) for job in self.list_jobs)
        # Store for each machine the list of sub-job given as (index_job, index_sub-job)
        self.job_per_machines = {i: [] for i in range(self.n_machines)}
        for k in range(self.n_jobs):
            for sub_k, subjob in enumerate(list_jobs[k]):
                # setdefault: machine ids are not guaranteed to be exactly
                # 0..n_machines-1, so create the entry on first use instead
                # of failing with a KeyError.
                self.job_per_machines.setdefault(subjob.machine_id, []).append(
                    (k, sub_k)
                )
        self.horizon = horizon
        if self.horizon is None:
            self.horizon = sum(
                sum(subjob.processing_time for subjob in job)
                for job in self.list_jobs
            )

    @property
    def tasks_list(self) -> list[Task]:
        """All tasks as (job index, subjob index), job by job."""
        return [(j, k) for j, job in enumerate(self.list_jobs) for k in range(len(job))]

    def get_precedence_constraints(self) -> dict[Task, list[Task]]:
        """Map each task to its successors: the next subjob of the same job, if any."""
        return {
            (j, k): [(j, k + 1)] if k + 1 < len(job) else []
            for j, job in enumerate(self.list_jobs)
            for k in range(len(job))
        }

    def get_makespan_upper_bound(self) -> int:
        """Return the horizon, used as upper bound on the makespan."""
        return self.horizon

    def get_last_tasks(self) -> list[Task]:
        """Return, for each job, the task at index ``len(job) - 1``."""
        return [(j, len(job) - 1) for j, job in enumerate(self.list_jobs)]

    def evaluate(self, variable: JobShopSolution) -> dict[str, float]:
        """Return ``{"makespan": ...}`` using the solution's maximum end time."""
        return {"makespan": variable.get_max_end_time()}

    def satisfy(self, variable: JobShopSolution) -> bool:
        """Check the feasibility of a schedule.

        A schedule is accepted when:

        * every subjob is given at least its processing time,
        * subjobs sharing a machine do not overlap,
        * inside a job, a subjob does not start before the previous one ends.
        """
        schedule = variable.schedule
        # Duration: an interval shorter than the processing time cannot hold the subjob.
        for j in range(self.n_jobs):
            for k, subjob in enumerate(self.list_jobs[j]):
                interval = schedule[j][k]
                if interval[1] - interval[0] < subjob.processing_time:
                    return False
        # Machine capacity: once ordered by start time, consecutive intervals
        # on the same machine must not overlap.
        for tasks in self.job_per_machines.values():
            intervals = sorted(
                (schedule[j][k] for j, k in tasks), key=lambda y: y[0]
            )
            for previous, current in zip(intervals, intervals[1:]):
                if current[0] < previous[1]:
                    return False
        # Precedence inside each job.
        for j in range(self.n_jobs):
            job_intervals = schedule[j]
            for previous, current in zip(job_intervals, job_intervals[1:]):
                if current[0] < previous[1]:
                    return False
        return True

    def get_attribute_register(self) -> EncodingRegister:
        """Return an empty encoding register."""
        return EncodingRegister(dict_attribute_to_type={})

    def get_solution_type(self) -> type[Solution]:
        """Return the solution class associated with this problem."""
        return JobShopSolution

    def get_objective_register(self) -> ObjectiveRegister:
        """Declare the single "makespan" objective, aggregated and minimized."""
        return ObjectiveRegister(
            dict_objective_to_doc={
                "makespan": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1)
            },
            objective_sense=ModeOptim.MINIMIZATION,
            objective_handling=ObjectiveHandling.AGGREGATE,
        )