Source code for discrete_optimization.flex_scheduling.problem

#  Copyright (c) 2026 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from copy import deepcopy
from dataclasses import dataclass, field
from functools import cache
from typing import Dict, Hashable, List, Set, Tuple, Type

from discrete_optimization.generic_tasks_tools.allocation import (
    NoUnaryResource,
    WithoutAllocationProblem,
    WithoutAllocationSolution,
)
from discrete_optimization.generic_tasks_tools.calendar_resource import (
    convert_calendar_to_availability_intervals,
)
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.generic_scheduling import (
    GenericSchedulingProblem,
    GenericSchedulingSolution,
)
from discrete_optimization.generic_tasks_tools.multimode import MultimodeSolution
from discrete_optimization.generic_tasks_tools.no_overlap_scheduling import (
    WithoutNoOverlapProblem,
)
from discrete_optimization.generic_tasks_tools.scheduling import SchedulingSolution
from discrete_optimization.generic_tasks_tools.skill import (
    NonSkillCumulativeResource,
    NoSkill,
    Skill,
    WithoutSkillProblem,
    WithoutSkillSolution,
)
from discrete_optimization.generic_tools.do_problem import *

RESOURCE_KEY = Hashable
TASK_KEY = Hashable
GROUP_KEY = Hashable
Resource = RESOURCE_KEY
CumulativeResource = RESOURCE_KEY
NonRenewableResource = RESOURCE_KEY
Task = TASK_KEY


[docs] @dataclass class TaskData: duration: int resource_consumption: Dict[RESOURCE_KEY, int] # Specify if the task can be paused during a resource break and resumed after, # this is a simplified preemption which is quite managable to deal with in mathematical models, in practice the task # will span over a longer duration than expected when there is a break inside. preemptive_on_resource_break: bool
[docs] def get_res_consumption(self, res: RESOURCE_KEY): return self.resource_consumption.get(res, 0)
[docs] @dataclass class TaskObject: id: TASK_KEY # more natural name if existing name: str # mode of execution of the task, it specifies different ways of doing the same task. modes: Dict[int, TaskData] # release date min_starting_date: Optional[int] = None # deadline on starting date max_starting_date: Optional[int] = None # deadline date max_ending_date: Optional[int] = None # "release" date of ending min_ending_date: Optional[int] = None # if max_ending_date should be considered really as hard. soft_max_end_date: bool = False # Price per unit time after finishing the task. Main parameter # to calculate WIP Objective as this price times the delta # of user end date and actual solution end date will be accounted # as wip. # To be improved: define this price per mode # Breakdown of the price: # - Initial price: Price if the task has been already started # - Delivery price: Fixed price to add at the end of the task # - Resource price: This price should be variable, taking into account # actual subintervals where the task runs. Simplified # as the total price for the resources price: int = 1 modes_id_to_index: dict[int, int] = field(init=False, default=None) def __post_init__(self): sorted_modes = sorted(list(self.modes.keys())) self.modes_id_to_index = {sorted_modes[i]: i for i in range(len(sorted_modes))}
[docs] class GroupType(Enum): SUBGROUP_TASK_FOR_OBJECTIVE = 0 GROUP_TASK_NON_RELEASED_RESOURCE = 1 TASK_RELEASE_MODE = 2 GROUP_GENERIC_RELEASE = 3
[docs] @dataclass class TasksGroups: # Typically a structure to group task of the same MSN aircraft, that could intervene in the objective functions ! id: GROUP_KEY name: str tasks_group: Set[TASK_KEY] # if by chance we know the first task of this set of tasks first_task_if_any: Optional[TASK_KEY] = None # if by chance we know the last task of this set of tasks last_task_if_any: Optional[TASK_KEY] = None type_of_group: GroupType = GroupType.SUBGROUP_TASK_FOR_OBJECTIVE # if the group is of time "GROUP_TASK_NON_RELEASED_RESOURCE", then the span of group of tasks # consumes a given quantity of resource in this dictionary ! res_not_released: Optional[Dict[RESOURCE_KEY, int]] = None # it the corresponding task should not overlap no_overlap: Optional[bool] = False # release date of the group min_starting_date: Optional[int] = None # "deadline" on starting date of the group max_starting_date: Optional[int] = None # deadline date of the task group max_ending_date: Optional[int] = None # "release" date of ending of the task group min_ending_date: Optional[int] = None # soft_max_end_date: bool = False
[docs] @dataclass class TaskGroupAbstraction: is_a_task: bool task_id: TASK_KEY group_id: GROUP_KEY
[docs] @dataclass class ResourceData: id: RESOURCE_KEY name: Optional[str] calendar_availability: np.ndarray renewable: bool max_capacity: int is_disjunctive: bool is_station: bool is_operator: bool child_resource: Set[RESOURCE_KEY] = None # field(init=False, default=None)
[docs] @dataclass class ConstraintsTask: # for each s in successors, for each succ in successors[s], start[succ] >= end[s] successors: Dict[TASK_KEY, Set[TASK_KEY]] # successors + non releasable resource relation : the resource of task is released at the start of the successor. # (t1, t2, {res: 1}) : res is consumed from end of task t1 to start of task t2 successor_with_res_release_at_start_of_successor: list[ Tuple[TASK_KEY, TASK_KEY, dict[RESOURCE_KEY, int]] ] = None # Initially, this could be # ((t1, mode1), t2, {res:1}) : if t1 is in mode1, then the res is consumed from end of t1 to start of task t2 successor_with_res_release_at_start_of_successor_mode: list[ Tuple[Tuple[TASK_KEY, int], TASK_KEY, dict[RESOURCE_KEY, int]] ] = None # object to handle resource blocked between end of task (or group of task) # and start of another task (or group of task) successor_generic_with_res_release_at_start_of_successor_generic: list[ Tuple[TaskGroupAbstraction, TaskGroupAbstraction, dict[RESOURCE_KEY, int]] ] = None # successors link between group of Tasks successors_group_tasks: Optional[Dict[GROUP_KEY, Set[GROUP_KEY]]] = None # grouped task sharing some same resource that should not be released on the all span of the group of task # The set of resource that is specified will not be released over the set of task. # grouped_tasks: Optional[List[Tuple[Set[TASK_KEY], Set[RESOURCE_KEY]]]] = None # start at start data start_at_start: Optional[List[Tuple[TASK_KEY, TASK_KEY]]] = None # start at end start_at_end: Optional[List[Tuple[TASK_KEY, TASK_KEY]]] = None # st[e[0]] >= end[e[1]]+e[2] start_after_end_plus_offset: Optional[List[Tuple[TASK_KEY, TASK_KEY, int]]] = None # st[e[0]] == end[e[1]]+e[2] start_at_end_plus_offset: Optional[List[Tuple[TASK_KEY, TASK_KEY, int]]] = None # st[e[0]] >= st[e[1]]+e[2] start_after_start_plus_offset: Optional[List[Tuple[TASK_KEY, TASK_KEY, int]]] = None # st[e[0]] == st[e[1]]+e[2] start_at_start_plus_offset: Optional[List[Tuple[TASK_KEY, TASK_KEY, int]]] = None
[docs] class ObjectivesEnum(Enum): MAKESPAN = 0 RESOURCE_COST = 1 WORK_IN_PROGRESS = 2 TARDINESS = 3 EARLINESS = 4 NON_RELEASE_DURATION = 5
[docs] @dataclass class ObjectiveParamWIP: # Weight for the optimization weight: int weight_per_task: Dict[TASK_KEY, float] weights_per_group_task: Dict[GROUP_KEY, float] count_nb_group_in_progress: bool = True coefficient_on_nb_group_in_progress: float = 0
[docs] @dataclass class ObjectiveParamResource: # here we want to minimize the resource capacity of some resources, # the unit cost of 1 capacity can be given with this dictionary weight_per_resource_unit: Dict[RESOURCE_KEY, float] # Here to discard or not some resource from our optimisation. # Typically some resources in the data like the station, # can be ignored from the optim, we have 1 and it's not flexible. consider_in_objectives: Dict[RESOURCE_KEY, bool] # Weight for the optimization weight: int
[docs] @dataclass class ObjectiveParamTardiness: """Specify for meaningful task of group task some weight to put on the tardiness cost""" weight_per_task: Dict[TASK_KEY, float] weight_per_groups: Dict[GROUP_KEY, float]
[docs] @dataclass class ObjectiveParamEarliness: """Specify for meaningful task of group task some weight to put on the tardiness cost""" weight_per_task: Dict[TASK_KEY, float] weight_per_groups: Dict[GROUP_KEY, float]
[docs] @dataclass class ObjectiveParams: params_obj: Dict[ ObjectivesEnum, Union[ ObjectiveParamWIP, ObjectiveParamResource, ObjectiveParamEarliness, ObjectiveParamTardiness, float, ], ]
[docs] class ScheduleSolution( GenericSchedulingSolution[ Task, NoUnaryResource, NoSkill, NonSkillCumulativeResource, NonRenewableResource ], WithoutSkillSolution[ Task, NoUnaryResource, NonSkillCumulativeResource, NoUnaryResource ], WithoutAllocationSolution[Task], ): problem: "FlexProblem" def __init__(self, problem: "FlexProblem", schedule: np.ndarray, modes: np.ndarray): super().__init__(problem) self.schedule = schedule self.modes = modes
[docs] def get_end_time(self, task: Task) -> int: index = self.problem.task_id_to_index[task] return int(self.schedule[index, 1])
[docs] def get_start_time(self, task: Task) -> int: index = self.problem.task_id_to_index[task] return int(self.schedule[index, 0])
[docs] def get_mode(self, task: Task) -> int: index = self.problem.task_id_to_index[task] return int(self.modes[index])
[docs] def copy(self) -> "Solution": return ScheduleSolution( problem=self.problem, schedule=np.copy(self.schedule), modes=np.copy(self.modes), )
[docs] class ScheduleSolutionPreemptive(SchedulingSolution[Task], MultimodeSolution[Task]): problem: "FlexProblem" def __init__( self, problem: "FlexProblem", schedule: list[list[tuple[int, int]]], modes: np.ndarray, ): super().__init__(problem) self.schedule = schedule self.modes = modes
[docs] def get_mode(self, task: Task) -> int: index = self.problem.task_id_to_index[task] return self.modes[index]
[docs] def get_end_time(self, task: Task) -> int: index = self.problem.task_id_to_index[task] return self.schedule[index][-1][1]
[docs] def get_start_time(self, task: Task) -> int: index = self.problem.task_id_to_index[task] return self.schedule[index][0][0]
[docs] def copy(self) -> "Solution": return ScheduleSolutionPreemptive( problem=self.problem, schedule=deepcopy(self.schedule), modes=np.copy(self.modes), )
[docs] class FlexProblem( GenericSchedulingProblem[ Task, NoUnaryResource, NoSkill, NonSkillCumulativeResource, NonRenewableResource ], WithoutNoOverlapProblem[Task], WithoutSkillProblem[ Task, NoUnaryResource, NonSkillCumulativeResource, NoUnaryResource ], WithoutAllocationProblem[Task], ): @property def non_skill_cumulative_resources_list(self) -> list[Skill]: return [resource.id for resource in self.resources if resource.renewable]
[docs] def get_cumulative_resource_consumption( self, resource: CumulativeResource, task: Task, mode: int ) -> int: if mode in self.get_task_modes(task): return ( self.task_id_dict[task] .modes[mode] .resource_consumption.get(resource, 0) ) return 0
[docs] @cache def get_resource_availabilities( self, resource: Resource ) -> list[tuple[int, int, int]]: return convert_calendar_to_availability_intervals( calendar=self.resource_dict[resource].calendar_availability, horizon=self.horizon, )
[docs] def get_task_mode_duration(self, task: Task, mode: int) -> int: if mode in self.get_task_modes(task): return self.task_id_dict[task].modes[mode].duration return 0
@property def non_renewable_resources_list(self) -> list[NonRenewableResource]: return [r.id for r in self.resources if not r.renewable]
[docs] def get_non_renewable_resource_capacity( self, resource: NonRenewableResource ) -> int: return self.resource_dict[resource].max_capacity
[docs] def get_non_renewable_resource_consumption( self, resource: NonRenewableResource, task: Task, mode: int ) -> int: if mode in self.get_task_modes(task): return ( self.task_id_dict[task] .modes[mode] .resource_consumption.get(resource, 0) ) return 0
[docs] def get_precedence_constraints(self) -> dict[Task, Iterable[Task]]: return self.constraints.successors
[docs] def get_makespan_upper_bound(self) -> int: return self.horizon
[docs] def get_task_start_or_end_lower_bound( self, task: Task, start_or_end: StartOrEnd ) -> int: if start_or_end == StartOrEnd.START: return self.min_start_time[self.task_id_to_index[task]] if start_or_end == StartOrEnd.END: return self.min_end_time[self.task_id_to_index[task]] return 0
[docs] def get_task_start_or_end_upper_bound( self, task: Task, start_or_end: StartOrEnd ) -> int: if start_or_end == StartOrEnd.START: return self.max_start_time[self.task_id_to_index[task]] if start_or_end == StartOrEnd.END: return self.max_end_time[self.task_id_to_index[task]] return self.get_makespan_upper_bound()
[docs] def get_task_modes(self, task: Task) -> set[int]: return set(self.task_id_dict[task].modes)
@property def tasks_list(self) -> list[Task]: return self.tasks_ids def __init__( self, resources: List[ResourceData], tasks: List[TaskObject], tasks_group: List[TasksGroups], constraints: ConstraintsTask, objective_params: ObjectiveParams, horizon: int, ): from .fsp_utils import ( compute_duration_tasks_function_time, get_lb_ub_start_end_date, ) self.resources = resources self.tasks = tasks self.tasks_group = tasks_group self.objective_params = objective_params self.horizon = horizon self.tasks_ids = [self.tasks[i].id for i in range(len(self.tasks))] assert len(set(self.tasks_ids)) == len(self.tasks) self.constraints = constraints self.nb_tasks = len(self.tasks) self.task_id_to_index = {self.tasks[i].id: i for i in range(self.nb_tasks)} self.index_to_task_id = {i: self.tasks[i].id for i in range(self.nb_tasks)} self.index_task_dict = { index: self.tasks[index] for index in self.index_to_task_id } self.task_id_dict: dict[TASK_KEY, TaskObject] = { self.tasks[index].id: self.tasks[index] for index in self.index_to_task_id } assert len( set([self.resources[i].id for i in range(len(self.resources))]) ) == len(resources) self.nb_resources = len(resources) self.resource_dict: Dict[RESOURCE_KEY, ResourceData] = { r.id: r for r in self.resources } self.resource_id_to_index = { self.resources[i].id: i for i in range(self.nb_resources) } self.group_id_to_index = { self.tasks_group[i].id: i for i in range(len(self.tasks_group)) } self.durations_data, self.res_arrays_data = ( compute_duration_tasks_function_time(self) ) ( self.min_start_time, self.max_start_time, self.min_end_time, self.max_end_time, ) = get_lb_ub_start_end_date(self)
[docs] def update_data_placeholders(self): """If data has been changed in place, we should change the other utils attributes""" from .fsp_utils import compute_duration_tasks_function_time self.tasks_ids = [self.tasks[i].id for i in range(len(self.tasks))] assert len(set(self.tasks_ids)) == len(self.tasks) self.nb_tasks = len(self.tasks) self.task_id_to_index = {self.tasks[i].id: i for i in range(self.nb_tasks)} self.index_to_task_id = {i: self.tasks[i].id for i in range(self.nb_tasks)} self.index_task_dict = { index: self.tasks[index] for index in self.index_to_task_id } self.task_id_dict: dict[TASK_KEY, TaskObject] = { self.tasks[index].id: self.tasks[index] for index in self.index_to_task_id } self.nb_resources = len(self.resources) self.resource_dict: Dict[RESOURCE_KEY, ResourceData] = { r.id: r for r in self.resources } self.resource_id_to_index = { self.resources[i].id: i for i in range(self.nb_resources) } self.group_id_to_index = { self.tasks_group[i].id: i for i in range(len(self.tasks_group)) } self.get_resource_availabilities.cache_clear() self.durations_data, self.res_arrays_data = ( compute_duration_tasks_function_time(self) ) ( self.min_start_time, self.max_start_time, self.min_end_time, self.max_end_time, ) = get_lb_ub_start_end_date(self)
@staticmethod def _compute_resource_usage_preemptive( problem: "FlexProblem", schedule: list[list[tuple[int, int]]], modes: np.ndarray, ) -> Dict[RESOURCE_KEY, np.ndarray]: """Compute resource usage over time for a preemptive schedule.""" resource_usage = { r.id: np.zeros(problem.horizon, dtype=int) for r in problem.resources } for task_idx in range(problem.nb_tasks): mode = int(modes[task_idx]) task_data = problem.tasks[task_idx].modes[mode] # For each interval in the preemptive schedule for start, end in schedule[task_idx]: for t in range(start, end): for res_id, consumption in task_data.resource_consumption.items(): resource_usage[res_id][t] += consumption return resource_usage
[docs] def evaluate(self, variable: ScheduleSolution) -> Dict[str, Any]: """ Evaluate the solution to compute KPIs matching the CP solver objectives. Returns a dictionary containing: - makespan - resource_consumption - tardiness (if applicable) - earliness (if applicable) - resource_cost (if applicable) - wip_cost (if applicable) """ from .fsp_utils import SolutionDetails if isinstance(variable, ScheduleSolutionPreemptive): makespan = max( [variable.schedule[i][-1][1] for i in range(len(variable.schedule))] ) resource_consumption = self._compute_resource_usage_preemptive( self, variable.schedule, variable.modes ) # Initialize all objectives evaluation = { "makespan": makespan, "tardiness": 0.0, "earliness": 0.0, "resource_cost": 0.0, "wip_cost": 0.0, } # --- Tardiness --- if ObjectivesEnum.TARDINESS in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ObjectivesEnum.TARDINESS] tardiness = 0.0 for t_id, weight in obj_param.weight_per_task.items(): if weight > 0: idx = self.task_id_to_index[t_id] deadline = self.tasks[idx].max_ending_date if deadline is not None: end_time = variable.schedule[idx][-1][1] tardiness += max(0, end_time - deadline) * weight for g_id, weight in obj_param.weight_per_groups.items(): if weight > 0: g_idx = self.group_id_to_index[g_id] group = self.tasks_group[g_idx] deadline = group.max_ending_date if deadline is not None: task_indices = [ self.task_id_to_index[t] for t in group.tasks_group ] group_end = max( variable.schedule[idx][-1][1] for idx in task_indices ) tardiness += max(0, group_end - deadline) * weight evaluation["tardiness"] = tardiness # --- Earliness --- if ObjectivesEnum.EARLINESS in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ObjectivesEnum.EARLINESS] earliness = 0.0 for t_id, weight in obj_param.weight_per_task.items(): if weight > 0: idx = self.task_id_to_index[t_id] deadline = self.tasks[idx].max_ending_date if deadline is not None: end_time = variable.schedule[idx][-1][1] earliness += max(0, deadline - end_time) * weight for g_id, weight in obj_param.weight_per_groups.items(): if weight > 0: g_idx = self.group_id_to_index[g_id] group = self.tasks_group[g_idx] deadline = group.max_ending_date if deadline is not None: task_indices = [ self.task_id_to_index[t] for t in group.tasks_group ] group_end = max( variable.schedule[idx][-1][1] for idx in task_indices ) earliness += max(0, deadline - group_end) * weight evaluation["earliness"] = earliness # --- Resource Cost --- if ObjectivesEnum.RESOURCE_COST in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ ObjectivesEnum.RESOURCE_COST ] cost = 0.0 for res_id, weight in obj_param.weight_per_resource_unit.items(): if weight == 0: continue if ( res_id in obj_param.consider_in_objectives and not obj_param.consider_in_objectives[res_id] ): continue max_usage = np.max(resource_consumption[res_id]) cost += max_usage * weight evaluation["resource_cost"] = cost # --- WIP Cost --- # WIP (Work In Progress) = maximum number of concurrent groups in progress if ObjectivesEnum.WORK_IN_PROGRESS in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ ObjectivesEnum.WORK_IN_PROGRESS ] wip_cost = 0.0 if ( obj_param.count_nb_group_in_progress and obj_param.coefficient_on_nb_group_in_progress != 0 ): relevant_groups = [ g for g in self.tasks_group if g.type_of_group == GroupType.SUBGROUP_TASK_FOR_OBJECTIVE ] if relevant_groups: events = [] for g in relevant_groups: t_indices = [ self.task_id_to_index[t] for t in g.tasks_group ] g_start = min( variable.schedule[idx][0][0] for idx in t_indices ) g_end = max( variable.schedule[idx][-1][1] for idx in t_indices ) events.append((g_start, 1)) events.append((g_end, -1)) events.sort(key=lambda x: (x[0], x[1])) max_concurrent_groups = 0 current_concurrent = 0 for _, change in events: current_concurrent += change max_concurrent_groups = max( max_concurrent_groups, current_concurrent ) wip_cost = ( max_concurrent_groups * obj_param.coefficient_on_nb_group_in_progress ) evaluation["wip_cost"] = wip_cost return evaluation # 1. Base Metrics makespan = np.max(variable.schedule[:, 1]) sd = SolutionDetails( problem=self, solution=variable, durations_data=self.durations_data, res_arrays_data=self.res_arrays_data, ) resource_consumption = sd.resource_usage sd.compute_details() # Initialize all objectives declared in get_objective_register() evaluation = { "makespan": makespan, "tardiness": 0.0, "earliness": 0.0, "resource_cost": 0.0, "wip_cost": 0.0, } # 2. Objective-based Metrics # We iterate through the objective parameters to calculate specific costs # matching the logic in the CP solvers (e.g., fsp_cpsat_solver.py) # --- Tardiness --- if ObjectivesEnum.TARDINESS in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ObjectivesEnum.TARDINESS] tardiness = 0.0 # Task Tardiness: sum(max(0, end - deadline) * weight) for t_id, weight in obj_param.weight_per_task.items(): if weight > 0: idx = self.task_id_to_index[t_id] deadline = self.tasks[idx].max_ending_date if deadline is not None: end_time = variable.schedule[idx, 1] tardiness += max(0, end_time - deadline) * weight # Group Tardiness: sum(max(0, group_end - deadline) * weight) for g_id, weight in obj_param.weight_per_groups.items(): if weight > 0: g_idx = self.group_id_to_index[g_id] group = self.tasks_group[g_idx] deadline = group.max_ending_date if deadline is not None: # Group end is the max end time of all tasks in the group task_indices = [ self.task_id_to_index[t] for t in group.tasks_group ] group_end = np.max(variable.schedule[task_indices, 1]) tardiness += max(0, group_end - deadline) * weight evaluation["tardiness"] = tardiness # --- Earliness --- if ObjectivesEnum.EARLINESS in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ObjectivesEnum.EARLINESS] earliness = 0.0 # Task Earliness: sum(max(0, deadline - end) * weight) for t_id, weight in obj_param.weight_per_task.items(): if weight > 0: idx = self.task_id_to_index[t_id] deadline = self.tasks[idx].max_ending_date if deadline is not None: end_time = variable.schedule[idx, 1] earliness += max(0, deadline - end_time) * weight # Group Earliness for g_id, weight in obj_param.weight_per_groups.items(): if weight > 0: g_idx = self.group_id_to_index[g_id] group = self.tasks_group[g_idx] deadline = group.max_ending_date if deadline is not None: task_indices = [ self.task_id_to_index[t] for t in group.tasks_group ] group_end = np.max(variable.schedule[task_indices, 1]) earliness += max(0, deadline - group_end) * weight evaluation["earliness"] = earliness # --- Resource Cost --- if ObjectivesEnum.RESOURCE_COST in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ObjectivesEnum.RESOURCE_COST] cost = 0.0 for res_id, weight in obj_param.weight_per_resource_unit.items(): if weight == 0: continue # Skip if resource is explicitly excluded from objectives if ( res_id in obj_param.consider_in_objectives and not obj_param.consider_in_objectives[res_id] ): continue max_usage = np.max(resource_consumption[res_id]) cost += max_usage * weight evaluation["resource_cost"] = cost # --- WIP Cost --- # WIP (Work In Progress) = maximum number of concurrent groups in progress if ObjectivesEnum.WORK_IN_PROGRESS in self.objective_params.params_obj: obj_param = self.objective_params.params_obj[ ObjectivesEnum.WORK_IN_PROGRESS ] wip_cost = 0.0 if ( obj_param.count_nb_group_in_progress and obj_param.coefficient_on_nb_group_in_progress != 0 ): relevant_groups = [ g for g in self.tasks_group if g.type_of_group == GroupType.SUBGROUP_TASK_FOR_OBJECTIVE ] if relevant_groups: # Collect start/end events for a sweep-line algorithm events = [] for g in relevant_groups: t_indices = [self.task_id_to_index[t] for t in g.tasks_group] # Group spans from min start of its tasks to max end of its tasks g_start = np.min(variable.schedule[t_indices, 0]) g_end = np.max(variable.schedule[t_indices, 1]) # Add events: +1 at start, -1 at end events.append((g_start, 1)) events.append((g_end, -1)) # Sort events by time. # If times are equal, process -1 (end) before +1 (start) to treat intervals as [s, e) # This prevents false overlaps at boundaries. events.sort(key=lambda x: (x[0], x[1])) max_concurrent_groups = 0 current_concurrent = 0 for _, change in events: current_concurrent += change max_concurrent_groups = max( max_concurrent_groups, current_concurrent ) wip_cost = ( max_concurrent_groups * obj_param.coefficient_on_nb_group_in_progress ) evaluation["wip_cost"] = wip_cost return evaluation
[docs] def satisfy(self, variable: ScheduleSolution) -> bool: from .fsp_utils import SolutionDetails # TODO: implement proper satisfaction checking for preemptive schedules # Should check: # - Task durations (sum of intervals = required duration) # - Resource constraints (preemptive intervals respect calendars) # - Precedence constraints # - Group constraints # - Time windows (min/max start/end dates) if isinstance(variable, ScheduleSolutionPreemptive): return True l_violation = [] sd = SolutionDetails( problem=self, solution=variable, durations_data=self.durations_data, res_arrays_data=self.res_arrays_data, ) sd.compute_details() sat = sd.satisfy() if not sat: return sat # l_violation += satisfy_task_duration(variable, usage, self) # covered by the solution details l_violation += satisfy_precedence(variable, self) l_violation += satisfy_precedence_group(variable, self) l_violation += satisfy_release_and_date(variable, self) l_violation += satisfy_generic_precedence_constraint(variable, self) return len(l_violation) == 0
[docs] def get_attribute_register(self) -> EncodingRegister: return EncodingRegister({})
[docs] def get_solution_type(self) -> Type[Solution]: return ScheduleSolution
[docs] def get_objective_register(self) -> ObjectiveRegister: return ObjectiveRegister( objective_sense=ModeOptim.MINIMIZATION, objective_handling=ObjectiveHandling.AGGREGATE, dict_objective_to_doc={ "makespan": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "tardiness": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "earliness": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "resource_cost": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), "wip_cost": ObjectiveDoc( type=TypeObjective.OBJECTIVE, default_weight=1 ), }, )
def __str__(self): s = " ".join( [str(getattr(self, attr)) for attr in ["tasks", "tasks_group", "resources"]] ) return s
[docs] def satisfy_release_and_date(solution: ScheduleSolution, problem: FlexProblem): l_violation = [] for task in problem.tasks: index = problem.task_id_to_index[task.id] if task.min_starting_date is not None: if solution.schedule[index, 0] < task.min_starting_date: l_violation.append( ( "start_before_min_starting_date", task, solution.schedule[index, 0], task.min_starting_date, ) ) if task.max_starting_date is not None: if solution.schedule[index, 0] > task.max_starting_date: l_violation.append( ( "start_after_max_starting_date", task, solution.schedule[index, 0], task.max_starting_date, ) ) if task.min_ending_date is not None: if ( solution.schedule[index, 1] < task.min_ending_date and not task.soft_max_end_date ): l_violation.append( ( "end_before_min_ending_date", task, solution.schedule[index, 1], task.min_ending_date, ) ) if task.max_ending_date is not None: if ( solution.schedule[index, 1] > task.max_ending_date and not task.soft_max_end_date ): l_violation.append( ( "end_after_max_ending_date", task, solution.schedule[index, 1], task.max_ending_date, ) ) return l_violation
[docs] def satisfy_task_duration(solution, usage, problem): l_violation = [] for task in problem.tasks: index = problem.task_id_to_index[task.id] mode = solution.modes[index] if task.modes[mode].duration != usage[index].sum(): l_violation.append( ("task_duration", task, task.modes[mode].duration, usage[index].sum()) ) return l_violation
[docs] def satisfy_precedence(solution: ScheduleSolution, problem: FlexProblem): l_violation = [] if problem.constraints.successors is not None: for task_id in problem.constraints.successors: index = problem.task_id_to_index[task_id] for succ_id in problem.constraints.successors[task_id]: succ_index = problem.task_id_to_index[succ_id] if solution.schedule[succ_index, 0] < solution.schedule[index, 1]: l_violation.append( ( "successor_violated", (task_id, succ_id), ( solution.schedule[index, 1], solution.schedule[succ_index, 0], ), ) ) return l_violation
[docs] def satisfy_precedence_group(solution: ScheduleSolution, problem: FlexProblem): l_violation = [] if problem.constraints.successors_group_tasks is not None: for group_id in problem.constraints.successors_group_tasks: gr = problem.tasks_group[problem.group_id_to_index[group_id]] tasks_index_in_group = [problem.task_id_to_index[x] for x in gr.tasks_group] max_end_group = max([solution.schedule[x, 1] for x in tasks_index_in_group]) for succ_group in problem.constraints.successors_group_tasks[group_id]: gr_succ = problem.tasks_group[problem.group_id_to_index[succ_group]] tasks_index_in_group_succ = [ problem.task_id_to_index[x] for x in gr_succ.tasks_group ] min_start_group = max( [solution.schedule[x, 0] for x in tasks_index_in_group_succ] ) if min_start_group < max_end_group: l_violation.append( ( "successor_group_violated", (group_id, succ_group), (max_end_group, min_start_group), ) ) return l_violation
[docs] def satisfy_generic_precedence_constraint( solution: ScheduleSolution, problem: FlexProblem ): l_violation = [] if problem.constraints.start_at_end is not None: for t1, t2 in problem.constraints.start_at_end: i1, i2 = problem.task_id_to_index[t1], problem.task_id_to_index[t2] if solution.schedule[i1, 0] != solution.schedule[i2, 0]: l_violation.append( ( "start_at_start_violated", (t1, t2), (solution.schedule[i1, 0], solution.schedule[i2, 0]), ) ) if problem.constraints.start_at_end is not None: for t1, t2 in problem.constraints.start_at_end: i1, i2 = problem.task_id_to_index[t1], problem.task_id_to_index[t2] if solution.schedule[i1, 0] != solution.schedule[i2, 1]: l_violation.append( ( "start_at_end_violated", (t1, t2), (solution.schedule[i1, 0], solution.schedule[i2, 1]), ) ) if problem.constraints.start_after_end_plus_offset is not None: for t1, t2, delta in problem.constraints.start_after_end_plus_offset: i1, i2 = problem.task_id_to_index[t1], problem.task_id_to_index[t2] if solution.schedule[i1, 0] < solution.schedule[i2, 1] + delta: l_violation.append( ( "start_after_end_plus_offset_violated", (t1, t2, delta), (solution.schedule[i1, 0], solution.schedule[i2, 1]), ) ) if problem.constraints.start_at_end_plus_offset is not None: for t1, t2, delta in problem.constraints.start_at_end_plus_offset: i1, i2 = problem.task_id_to_index[t1], problem.task_id_to_index[t2] if solution.schedule[i1, 0] != solution.schedule[i2, 1] + delta: l_violation.append( ( "start_at_end_plus_offset_violated", (t1, t2, delta), (solution.schedule[i1, 0], solution.schedule[i2, 1]), ) ) if problem.constraints.start_after_start_plus_offset is not None: for t1, t2, delta in problem.constraints.start_after_start_plus_offset: i1, i2 = problem.task_id_to_index[t1], problem.task_id_to_index[t2] if solution.schedule[i1, 0] < solution.schedule[i2, 0] + delta: l_violation.append( ( "start_after_start_plus_offset_violated", (t1, t2, delta), (solution.schedule[i1, 0], solution.schedule[i2, 0]), ) ) if problem.constraints.start_at_start_plus_offset is not None: for t1, t2, delta in problem.constraints.start_at_start_plus_offset: i1, i2 = problem.task_id_to_index[t1], problem.task_id_to_index[t2] if solution.schedule[i1, 0] != solution.schedule[i2, 0] + delta: l_violation.append( ( "start_at_start_plus_offset_violated", (t1, t2, delta), (solution.schedule[i1, 0], solution.schedule[i2, 0]), ) ) return l_violation
[docs] def get_lb_ub_start_end_date(problem: FlexProblem): min_start_time = {i: 0 for i in range(problem.nb_tasks)} max_start_time = {i: problem.horizon for i in range(problem.nb_tasks)} min_end_time = {i: 0 for i in range(problem.nb_tasks)} max_end_time = {i: problem.horizon for i in range(problem.nb_tasks)} for i in range(problem.nb_tasks): task = problem.tasks[i] if task.min_starting_date is not None: min_start_time[i] = task.min_starting_date if task.max_starting_date is not None: max_start_time[i] = task.max_starting_date if task.min_ending_date is not None: min_end_time[i] = task.min_ending_date if task.max_ending_date is not None: max_end_time[i] = task.max_ending_date return min_start_time, max_start_time, min_end_time, max_end_time