Source code for discrete_optimization.fjsp.problem

#  Copyright (c) 2024 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

import logging
from dataclasses import dataclass

from discrete_optimization.generic_tasks_tools.allocation import (
    NoUnaryResource,
    WithoutAllocationProblem,
    WithoutAllocationSolution,
)
from discrete_optimization.generic_tasks_tools.calendar_resource import (
    WithoutCalendarResourceProblem,
    WithoutCalendarResourceSolution,
)
from discrete_optimization.generic_tasks_tools.cumulative_resource import (
    NoCumulativeResource,
    WithoutCumulativeResourceProblem,
    WithoutCumulativeResourceSolution,
)
from discrete_optimization.generic_tasks_tools.generic_scheduling import (
    GenericSchedulingProblem,
    GenericSchedulingSolution,
)
from discrete_optimization.generic_tasks_tools.non_renewable_resource import (
    NoNonRenewableResource,
    WithoutNonRenewableResourceProblem,
    WithoutNonRenewableResourceSolution,
)
from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Solution,
    TypeObjective,
)
from discrete_optimization.jsp.problem import Subjob, Task

logger = logging.getLogger(__name__)


[docs] class FJobShopSolution( GenericSchedulingSolution[ Task, NoUnaryResource, NoCumulativeResource, NoNonRenewableResource ], WithoutCumulativeResourceSolution[Task, NoUnaryResource], WithoutNonRenewableResourceSolution[Task], WithoutAllocationSolution[Task], WithoutCalendarResourceSolution[Task], ): problem: FJobShopProblem def __init__( self, problem: FJobShopProblem, schedule: list[list[tuple[int, int, int, int]]] ): # For each job and sub-job, start, end time, machine id, and option choice given as tuple of int. super().__init__(problem=problem) self.schedule = schedule
[docs] def copy(self) -> FJobShopSolution: return FJobShopSolution(problem=self.problem, schedule=self.schedule)
[docs] def get_end_time(self, task: Task) -> int: j, k = task return self.schedule[j][k][1]
[docs] def get_start_time(self, task: Task) -> int: j, k = task return self.schedule[j][k][0]
[docs] def get_machine(self, task: Task) -> int: j, k = task return self.schedule[j][k][2]
[docs] def get_mode(self, task: Task) -> int: """Get 'mode' of given task, aka chosen machine.""" j, k = task return self.schedule[j][k][-1]
SubjobOptions = list[Subjob]
[docs] @dataclass class Job: job_id: int sub_jobs: list[SubjobOptions]
[docs] class FJobShopProblem( GenericSchedulingProblem[ Task, NoUnaryResource, NoCumulativeResource, NoNonRenewableResource ], WithoutCumulativeResourceProblem[Task, NoUnaryResource], WithoutNonRenewableResourceProblem[Task], WithoutAllocationProblem[Task], WithoutCalendarResourceProblem[Task], ): n_jobs: int n_machines: int list_jobs: list[Job] def __init__( self, list_jobs: list[Job], n_jobs: int = None, n_machines: int = None, horizon: int = None, ): self.list_jobs = list_jobs self.n_jobs = n_jobs self.n_machines = n_machines self.list_jobs = list_jobs if self.n_jobs is None: self.n_jobs = len(list_jobs) if self.n_machines is None: self.n_machines = len( set( [ option.machine_id for job in self.list_jobs for options in job.sub_jobs for option in options ] ) ) self.n_all_jobs = sum(len(subjob.sub_jobs) for subjob in self.list_jobs) self.job_per_machines = {i: [] for i in range(self.n_machines)} for k in range(self.n_jobs): for sub_k in range(len(list_jobs[k].sub_jobs)): for option in range(len(list_jobs[k].sub_jobs[sub_k])): self.job_per_machines[ list_jobs[k].sub_jobs[sub_k][option].machine_id ] += [(k, sub_k, option)] self.horizon = horizon if self.horizon is None: self.horizon = sum( sum( max(subjob.processing_time for subjob in subjob_opt) for subjob_opt in job.sub_jobs ) for job in self.list_jobs ) self.nb_subjob_per_job = { i: len(self.list_jobs[i].sub_jobs) for i in range(self.n_jobs) } self.subjob_possible_machines = { (i, j): set(x.machine_id for x in self.list_jobs[i].sub_jobs[j]) for i in range(self.n_jobs) for j in range(self.nb_subjob_per_job[i]) } self.duration_per_machines = { (i, j): { x.machine_id: x.processing_time for x in self.list_jobs[i].sub_jobs[j] } for (i, j) in self.subjob_possible_machines } self.mode2machine = { (j, k): { mode: subjob.machine_id for mode, subjob in enumerate(sub_job_options) } for j, job in enumerate(self.list_jobs) for k, sub_job_options in enumerate(job.sub_jobs) } self.machine2mode = { (j, k): { subjob.machine_id: mode for mode, subjob in enumerate(sub_job_options) } for j, job in enumerate(self.list_jobs) for k, sub_job_options in enumerate(job.sub_jobs) }
[docs] def get_makespan_upper_bound(self) -> int: return self.horizon
[docs] def get_task_mode_duration(self, task: Task, mode: int) -> int: return self.duration_per_machines[task][self.mode2machine[task][mode]]
@property def tasks_list(self) -> list[Task]: return [ (j, k) for j, job in enumerate(self.list_jobs) for k in range(len(job.sub_jobs)) ]
[docs] def get_precedence_constraints(self) -> dict[Task, list[Task]]: return { (j, k): [(j, k + 1)] if k + 1 < len(job.sub_jobs) else [] for j, job in enumerate(self.list_jobs) for k in range(len(job.sub_jobs)) }
[docs] def get_task_modes(self, task: Task) -> set[int]: j, k = task return set(range(len(self.list_jobs[j].sub_jobs[k])))
[docs] def get_last_tasks(self) -> list[Task]: return [(j, len(job.sub_jobs) - 1) for j, job in enumerate(self.list_jobs)]
[docs] def evaluate(self, variable: FJobShopSolution) -> dict[str, float]: return {"makespan": variable.get_max_end_time()}
[docs] def satisfy(self, variable: FJobShopSolution) -> bool: for task, machines in self.subjob_possible_machines.items(): if not variable.get_machine(task=task) in machines: logger.debug(f"Unallowed machine used for task {task}") return False for m in self.job_per_machines: sorted_ = sorted( [ variable.schedule[x[0]][x[1]] for x in self.job_per_machines[m] if variable.schedule[x[0]][x[1]][2] == m ], key=lambda y: y[0], ) len_ = len(sorted_) for i in range(1, len_): if sorted_[i][0] < sorted_[i - 1][1]: logger.debug(f"Overlapping task on machine {m}") return False # Check mapping mode <-> machine_id for task in self.tasks_list: if ( variable.get_machine(task) != self.mode2machine[task][variable.get_mode(task)] ): logger.debug( f"Machine choice and option choice does not match for task {task}." ) return False # Check tasks duration if not variable.check_duration_constraints(): return False # Check precedence constraints if not variable.check_precedence_constraints(): return False return True
[docs] def get_solution_type(self) -> type[Solution]: return FJobShopSolution
[docs] def get_objective_register(self) -> ObjectiveRegister: return ObjectiveRegister( dict_objective_to_doc={ "makespan": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=1) }, objective_sense=ModeOptim.MINIMIZATION, objective_handling=ObjectiveHandling.AGGREGATE, )