discrete_optimization.flex_scheduling package

Subpackages

Submodules

discrete_optimization.flex_scheduling.fsp_utils module

class discrete_optimization.flex_scheduling.fsp_utils.SolutionDetails(problem: FlexProblem, solution: ScheduleSolution, durations_data: dict = None, res_arrays_data: dict = None)[source]

Bases: object

build_scheduling_preemptive() ScheduleSolutionPreemptive[source]
compute_details()[source]
group_consumption()[source]
resource_blocked_1()[source]
resource_blocked_gen()[source]
satisfy()[source]
discrete_optimization.flex_scheduling.fsp_utils.build_pattern_resource(resource_data: ResourceData)[source]
discrete_optimization.flex_scheduling.fsp_utils.compute_duration_function_time_cluster(orig_duration: int, resource_calendar: ndarray, cumulative_resource_calendar: ndarray)[source]
discrete_optimization.flex_scheduling.fsp_utils.compute_duration_tasks_function_time(problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.fsp_utils.create_resource_consumption_from_calendar(calendar_availability: ndarray) List[Dict[str, int]][source]
discrete_optimization.flex_scheduling.fsp_utils.from_fsp_to_rcpsp(problem: FlexProblem) RcpspProblem[source]
discrete_optimization.flex_scheduling.fsp_utils.from_rcpsp_to_fsp(problem: RcpspProblem) FlexProblem[source]
discrete_optimization.flex_scheduling.fsp_utils.get_lb_ub_start_end_date(problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.fsp_utils.get_lb_ub_start_end_date_group_of_task(problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.fsp_utils.resource_consumption_modes(flex_problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.fsp_utils.transform_problem_into_multimode(fsp: FlexProblem) FlexProblem[source]

discrete_optimization.flex_scheduling.generator module

class discrete_optimization.flex_scheduling.generator.FlexProblemGenerator(nb_msn: int = 36, horizon: int | None = None, seed: int = 42, tardiness_weight: int = 20, earliness_weight: int = 1, tightness_factor: float = 1.3, nb_tools: int = 12, nb_stations: int = 32)[source]

Bases: object

generate() FlexProblem[source]

discrete_optimization.flex_scheduling.problem module

class discrete_optimization.flex_scheduling.problem.ConstraintsTask(successors: Dict[Hashable, Set[Hashable]], successor_with_res_release_at_start_of_successor: list[Tuple[Hashable, Hashable, dict[Hashable, int]]] = None, successor_with_res_release_at_start_of_successor_mode: list[Tuple[Tuple[Hashable, int], Hashable, dict[Hashable, int]]] = None, successor_generic_with_res_release_at_start_of_successor_generic: list[Tuple[discrete_optimization.flex_scheduling.problem.TaskGroupAbstraction, discrete_optimization.flex_scheduling.problem.TaskGroupAbstraction, dict[Hashable, int]]] = None, successors_group_tasks: Dict[Hashable, Set[Hashable]] | None = None, start_at_start: List[Tuple[Hashable, Hashable]] | None = None, start_at_end: List[Tuple[Hashable, Hashable]] | None = None, start_after_end_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None, start_at_end_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None, start_after_start_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None, start_at_start_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None)[source]

Bases: object

start_after_end_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None
start_after_start_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None
start_at_end: List[Tuple[Hashable, Hashable]] | None = None
start_at_end_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None
start_at_start: List[Tuple[Hashable, Hashable]] | None = None
start_at_start_plus_offset: List[Tuple[Hashable, Hashable, int]] | None = None
successor_generic_with_res_release_at_start_of_successor_generic: list[Tuple[TaskGroupAbstraction, TaskGroupAbstraction, dict[Hashable, int]]] = None
successor_with_res_release_at_start_of_successor: list[Tuple[Hashable, Hashable, dict[Hashable, int]]] = None
successor_with_res_release_at_start_of_successor_mode: list[Tuple[Tuple[Hashable, int], Hashable, dict[Hashable, int]]] = None
successors: Dict[Hashable, Set[Hashable]]
successors_group_tasks: Dict[Hashable, Set[Hashable]] | None = None
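The offset-based fields above are lists of (task, task, offset) tuples. The page does not spell out their semantics, so the following is only a sketch under an assumed reading: for a tuple (t1, t2, offset), the "after" variants are inequalities and the "at" variants are equalities on the start of t2. The helper names and the schedule layout (task mapped to a (start, end) pair) are hypothetical and not part of the library.

```python
from typing import Dict, Hashable, List, Tuple

Schedule = Dict[Hashable, Tuple[int, int]]  # task -> (start, end)
OffsetConstraint = Tuple[Hashable, Hashable, int]  # (t1, t2, offset)


def start_after_end_plus_offset_ok(
    constraints: List[OffsetConstraint], schedule: Schedule
) -> bool:
    # ASSUMED reading: start(t2) >= end(t1) + offset.
    return all(
        schedule[t2][0] >= schedule[t1][1] + off for t1, t2, off in constraints
    )


def start_at_start_plus_offset_ok(
    constraints: List[OffsetConstraint], schedule: Schedule
) -> bool:
    # ASSUMED reading: start(t2) == start(t1) + offset.
    return all(
        schedule[t2][0] == schedule[t1][0] + off for t1, t2, off in constraints
    )


schedule = {"a": (0, 4), "b": (6, 9), "c": (2, 5)}
after_ok = start_after_end_plus_offset_ok([("a", "b", 2)], schedule)
at_ok = start_at_start_plus_offset_ok([("a", "c", 2)], schedule)
at_bad = start_at_start_plus_offset_ok([("a", "b", 2)], schedule)
```

If the library orders the tuple differently, only the index roles of t1 and t2 in these checks would need to be swapped.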
class discrete_optimization.flex_scheduling.problem.FlexProblem(resources: List[ResourceData], tasks: List[TaskObject], tasks_group: List[TasksGroups], constraints: ConstraintsTask, objective_params: ObjectiveParams, horizon: int)[source]

Bases: GenericSchedulingProblem[Hashable, None, None, NonSkillCumulativeResource, Hashable], WithoutNoOverlapProblem[Hashable], WithoutSkillProblem[Hashable, None, NonSkillCumulativeResource, None], WithoutAllocationProblem[Hashable]

evaluate(variable: ScheduleSolution) Dict[str, Any][source]

Evaluate the solution to compute KPIs matching the CP solver objectives.

Returns a dictionary containing:
  • makespan

  • resource_consumption

  • tardiness (if applicable)

  • earliness (if applicable)

  • resource_cost (if applicable)

  • wip_cost (if applicable)
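As an illustration only, a few of these KPIs could be computed for a toy schedule as follows. The function name `toy_kpis`, the due-date dictionary, and the formulas (tardiness and earliness as the weighted lateness or earliness of a task's end relative to its due date) are assumptions, not the library's implementation.

```python
from typing import Dict, Hashable, Tuple


def toy_kpis(
    schedule: Dict[Hashable, Tuple[int, int]],
    due_dates: Dict[Hashable, int],
    weights: Dict[Hashable, float],
) -> Dict[str, float]:
    """Compute a few schedule KPIs (hypothetical helper, not the library API)."""
    # Makespan: latest end time over all tasks.
    makespan = max(end for _, end in schedule.values())
    tardiness = 0.0
    earliness = 0.0
    for task, due in due_dates.items():
        end = schedule[task][1]
        weight = weights.get(task, 1.0)  # assumed default weight of 1
        tardiness += weight * max(0, end - due)
        earliness += weight * max(0, due - end)
    return {"makespan": makespan, "tardiness": tardiness, "earliness": earliness}


schedule = {"a": (0, 3), "b": (3, 8), "c": (2, 5)}
kpis = toy_kpis(schedule, due_dates={"b": 6, "c": 7}, weights={"b": 2.0})
```

Here task "b" ends two time units late with weight 2.0, and task "c" ends two units early with the default weight.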

get_attribute_register() EncodingRegister[source]

Returns how the Solution should be encoded.

Useful to automatically find the mutations available for local search. Used by the genetic algorithms Ga and Nsga.

This needs only to be implemented in child classes when GA or LS solvers are to be used.

Returns (EncodingRegister): content of the encoding of the solution

get_cumulative_resource_consumption(resource: Hashable, task: Hashable, mode: int) int[source]

Get cumulative resource consumption of the task in the given mode

Parameters:
  • resource – cumulative resource

  • task

  • mode – not used for single mode problems

Returns:

the consumption for cumulative resources.

get_makespan_upper_bound() int[source]

Get an upper bound on global makespan.

get_non_renewable_resource_capacity(resource: Hashable) int[source]

Get resource max capacity

Parameters:

resource

Returns:

get_non_renewable_resource_consumption(resource: Hashable, task: Hashable, mode: int) int[source]

Get resource consumption of the task in the given mode

Parameters:
  • resource – non-renewable resource

  • task

  • mode – not used for single mode problems

Returns:

Raises:

ValueError – if the resource consumption depends on variables other than the mode

get_objective_register() ObjectiveRegister[source]

Returns the objective definition.

Returns (ObjectiveRegister): object defining the objective criteria.

get_precedence_constraints() dict[Hashable, Iterable[Hashable]][source]

Map each task to the tasks that need to be performed after it.
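A minimal sketch of how such a mapping might be checked against a schedule, assuming that "performed after" means a successor cannot start before its predecessor ends. The helper `precedence_respected` and the schedule layout (task mapped to a (start, end) pair) are hypothetical.

```python
from typing import Dict, Hashable, Iterable, Tuple


def precedence_respected(
    successors: Dict[Hashable, Iterable[Hashable]],
    schedule: Dict[Hashable, Tuple[int, int]],
) -> bool:
    """Return True if every successor starts at or after its predecessor's end."""
    for task, later_tasks in successors.items():
        end_of_task = schedule[task][1]
        for succ in later_tasks:
            if schedule[succ][0] < end_of_task:
                return False
    return True


successors = {"a": {"b", "c"}, "b": {"c"}, "c": set()}
ok = precedence_respected(successors, {"a": (0, 2), "b": (2, 4), "c": (4, 5)})
bad = precedence_respected(successors, {"a": (0, 2), "b": (1, 4), "c": (4, 5)})
```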

get_resource_availabilities(resource: Hashable) list[tuple[int, int, int]][source]

Get availabilities intervals for a given resource

Returns the list of availability intervals of a resource. If the resource is not available, there may be no interval returned.

The intervals are assumed to be disjoint.

Parameters:

resource

Returns:

list of intervals of the form (start, end, value), meaning that from time start to time end, value units of the resource are available. NB: the start is included, the end is excluded (start <= t < end)
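To make the (start, end, value) convention concrete, here is a sketch that derives such intervals from a per-time-step availability calendar. The function `calendar_to_intervals` is hypothetical, a plain list stands in for the calendar array, and dropping zero-availability periods is an assumption based on the note that unavailable resources may yield no interval.

```python
from typing import List, Tuple


def calendar_to_intervals(calendar: List[int]) -> List[Tuple[int, int, int]]:
    """Group consecutive time steps with equal positive capacity into
    (start, end, value) intervals, start included and end excluded."""
    intervals: List[Tuple[int, int, int]] = []
    start = 0
    for t in range(1, len(calendar) + 1):
        # Close the current run at the end of the calendar or when the value changes.
        if t == len(calendar) or calendar[t] != calendar[start]:
            if calendar[start] > 0:  # assumed: skip periods with no availability
                intervals.append((start, t, calendar[start]))
            start = t
    return intervals


intervals = calendar_to_intervals([2, 2, 0, 0, 3, 3, 3, 1])
```

The resulting intervals are disjoint by construction, and each time step t with positive availability falls in exactly one interval with start <= t < end.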

get_solution_type() Type[Solution][source]

Returns the class implementation of a Solution.

Returns (class): class object of the given Problem.

get_task_mode_duration(task: Hashable, mode: int) int[source]

Get task duration according to mode.

Parameters:
  • task

  • mode – not used for single-mode problems

Returns:

get_task_modes(task: Hashable) set[int][source]

Retrieve the modes available for the given task.

Parameters:

task

Returns:

get_task_start_or_end_lower_bound(task: Hashable, start_or_end: StartOrEnd) int[source]

Get a lower bound on start or end of a given task as specified by the problem.

For tighter computed bounds, see GenericSchedulingProblem.get_tight_task_start_or_end_lower_bound() and GenericSchedulingProblem.compute_task_bounds().

Default implementation: 0

Parameters:
  • task

  • start_or_end

Returns:

get_task_start_or_end_upper_bound(task: Hashable, start_or_end: StartOrEnd) int[source]

Get an upper bound on start or end of a given task as specified by the problem.

For tighter computed bounds, see GenericSchedulingProblem.get_tight_task_start_or_end_upper_bound() and GenericSchedulingProblem.compute_task_bounds().

Default implementation: makespan upper bound

Parameters:
  • task

  • start_or_end

Returns:

property non_renewable_resources_list: list[Hashable]

Non-renewable resources used by the tasks.

property non_skill_cumulative_resources_list: list[Skill]

List of cumulative resources that are not skills.

resource_dict: Dict[Hashable, ResourceData]
satisfy(variable: ScheduleSolution) bool[source]

Computes if a solution satisfies or not the constraints of the problem.

Parameters:

variable – the Solution object whose satisfiability is checked

Returns (bool): True if the constraints are fulfilled, False otherwise.

task_id_dict: dict[Hashable, TaskObject]
property tasks_list: list[Hashable]

List of all tasks to schedule or allocate to.

update_data_placeholders()[source]

If the data has been changed in place, the other utility attributes derived from it should be updated accordingly.

class discrete_optimization.flex_scheduling.problem.GroupType(*values)[source]

Bases: Enum

GROUP_GENERIC_RELEASE = 3
GROUP_TASK_NON_RELEASED_RESOURCE = 1
SUBGROUP_TASK_FOR_OBJECTIVE = 0
TASK_RELEASE_MODE = 2
class discrete_optimization.flex_scheduling.problem.ObjectiveParamEarliness(weight_per_task: Dict[Hashable, float], weight_per_groups: Dict[Hashable, float])[source]

Bases: object

Specify, for each meaningful task or group of tasks, a weight to put on the earliness cost.

weight_per_groups: Dict[Hashable, float]
weight_per_task: Dict[Hashable, float]
class discrete_optimization.flex_scheduling.problem.ObjectiveParamResource(weight_per_resource_unit: Dict[Hashable, float], consider_in_objectives: Dict[Hashable, bool], weight: int)[source]

Bases: object

consider_in_objectives: Dict[Hashable, bool]
weight: int
weight_per_resource_unit: Dict[Hashable, float]
class discrete_optimization.flex_scheduling.problem.ObjectiveParamTardiness(weight_per_task: Dict[Hashable, float], weight_per_groups: Dict[Hashable, float])[source]

Bases: object

Specify, for each meaningful task or group of tasks, a weight to put on the tardiness cost.

weight_per_groups: Dict[Hashable, float]
weight_per_task: Dict[Hashable, float]
class discrete_optimization.flex_scheduling.problem.ObjectiveParamWIP(weight: int, weight_per_task: Dict[Hashable, float], weights_per_group_task: Dict[Hashable, float], count_nb_group_in_progress: bool = True, coefficient_on_nb_group_in_progress: float = 0)[source]

Bases: object

coefficient_on_nb_group_in_progress: float = 0
count_nb_group_in_progress: bool = True
weight: int
weight_per_task: Dict[Hashable, float]
weights_per_group_task: Dict[Hashable, float]
class discrete_optimization.flex_scheduling.problem.ObjectiveParams(params_obj: Dict[discrete_optimization.flex_scheduling.problem.ObjectivesEnum, discrete_optimization.flex_scheduling.problem.ObjectiveParamWIP | discrete_optimization.flex_scheduling.problem.ObjectiveParamResource | discrete_optimization.flex_scheduling.problem.ObjectiveParamEarliness | discrete_optimization.flex_scheduling.problem.ObjectiveParamTardiness | float])[source]

Bases: object

params_obj: Dict[ObjectivesEnum, ObjectiveParamWIP | ObjectiveParamResource | ObjectiveParamEarliness | ObjectiveParamTardiness | float]
class discrete_optimization.flex_scheduling.problem.ObjectivesEnum(*values)[source]

Bases: Enum

EARLINESS = 4
MAKESPAN = 0
NON_RELEASE_DURATION = 5
RESOURCE_COST = 1
TARDINESS = 3
WORK_IN_PROGRESS = 2
class discrete_optimization.flex_scheduling.problem.ResourceData(id: Hashable, name: str | None, calendar_availability: numpy.ndarray, renewable: bool, max_capacity: int, is_disjunctive: bool, is_station: bool, is_operator: bool, child_resource: Set[Hashable] = None)[source]

Bases: object

calendar_availability: ndarray
child_resource: Set[Hashable] = None
id: Hashable
is_disjunctive: bool
is_operator: bool
is_station: bool
max_capacity: int
name: str | None
renewable: bool
class discrete_optimization.flex_scheduling.problem.ScheduleSolution(problem: FlexProblem, schedule: ndarray, modes: ndarray)[source]

Bases: GenericSchedulingSolution[Hashable, None, None, NonSkillCumulativeResource, Hashable], WithoutSkillSolution[Hashable, None, NonSkillCumulativeResource, None], WithoutAllocationSolution[Hashable]

copy() Solution[source]

Deep copy of the solution.

The copy() function should return a new object containing the same input as the current object, with the following expected behaviour: after y = x.copy(), in-place changes made to y do not affect x.

Returns: a new object from which you can manipulate attributes without changing the original object.
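The expected copy behaviour can be sketched with a small stand-in class. `ToySolution` is hypothetical and uses plain lists instead of arrays; it only illustrates the contract, not how ScheduleSolution implements it.

```python
from copy import deepcopy
from dataclasses import dataclass
from typing import List


@dataclass
class ToySolution:
    """Hypothetical stand-in for a solution holding mutable containers."""

    schedule: List[List[int]]
    modes: List[int]

    def copy(self) -> "ToySolution":
        # Deep-copy the mutable containers so the two objects share no state.
        return ToySolution(schedule=deepcopy(self.schedule), modes=list(self.modes))


x = ToySolution(schedule=[[0, 3], [3, 5]], modes=[1, 1])
y = x.copy()
y.schedule[0][1] = 4  # in-place change on the copy
y.modes[1] = 2
```

After these changes, x still holds its original schedule and modes, which is the property the contract asks for.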

get_end_time(task: Hashable) int[source]
get_mode(task: Hashable) int[source]

Retrieve mode found for given task.

Parameters:

task

Returns:

get_start_time(task: Hashable) int[source]
problem: FlexProblem
class discrete_optimization.flex_scheduling.problem.ScheduleSolutionPreemptive(problem: FlexProblem, schedule: list[list[tuple[int, int]]], modes: ndarray)[source]

Bases: SchedulingSolution[Hashable], MultimodeSolution[Hashable]

copy() Solution[source]

Deep copy of the solution.

The copy() function should return a new object containing the same input as the current object, with the following expected behaviour: after y = x.copy(), in-place changes made to y do not affect x.

Returns: a new object from which you can manipulate attributes without changing the original object.

get_end_time(task: Hashable) int[source]
get_mode(task: Hashable) int[source]

Retrieve mode found for given task.

Parameters:

task

Returns:

get_start_time(task: Hashable) int[source]
problem: FlexProblem
class discrete_optimization.flex_scheduling.problem.TaskData(duration: int, resource_consumption: Dict[Hashable, int], preemptive_on_resource_break: bool)[source]

Bases: object

duration: int
get_res_consumption(res: Hashable)[source]
preemptive_on_resource_break: bool
resource_consumption: Dict[Hashable, int]
class discrete_optimization.flex_scheduling.problem.TaskGroupAbstraction(is_a_task: bool, task_id: Hashable, group_id: Hashable)[source]

Bases: object

group_id: Hashable
is_a_task: bool
task_id: Hashable
class discrete_optimization.flex_scheduling.problem.TaskObject(id: Hashable, name: str, modes: Dict[int, discrete_optimization.flex_scheduling.problem.TaskData], min_starting_date: int | None = None, max_starting_date: int | None = None, max_ending_date: int | None = None, min_ending_date: int | None = None, soft_max_end_date: bool = False, price: int = 1)[source]

Bases: object

id: Hashable
max_ending_date: int | None = None
max_starting_date: int | None = None
min_ending_date: int | None = None
min_starting_date: int | None = None
modes: Dict[int, TaskData]
modes_id_to_index: dict[int, int] = None
name: str
price: int = 1
soft_max_end_date: bool = False
class discrete_optimization.flex_scheduling.problem.TasksGroups(id: Hashable, name: str, tasks_group: Set[Hashable], first_task_if_any: Optional[Hashable] = None, last_task_if_any: Optional[Hashable] = None, type_of_group: discrete_optimization.flex_scheduling.problem.GroupType = <GroupType.SUBGROUP_TASK_FOR_OBJECTIVE: 0>, res_not_released: Optional[Dict[Hashable, int]] = None, no_overlap: Optional[bool] = False, min_starting_date: Optional[int] = None, max_starting_date: Optional[int] = None, max_ending_date: Optional[int] = None, min_ending_date: Optional[int] = None, soft_max_end_date: bool = False)[source]

Bases: object

first_task_if_any: Hashable | None = None
id: Hashable
last_task_if_any: Hashable | None = None
max_ending_date: int | None = None
max_starting_date: int | None = None
min_ending_date: int | None = None
min_starting_date: int | None = None
name: str
no_overlap: bool | None = False
res_not_released: Dict[Hashable, int] | None = None
soft_max_end_date: bool = False
tasks_group: Set[Hashable]
type_of_group: GroupType = 0
discrete_optimization.flex_scheduling.problem.get_lb_ub_start_end_date(problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.problem.satisfy_generic_precedence_constraint(solution: ScheduleSolution, problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.problem.satisfy_precedence(solution: ScheduleSolution, problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.problem.satisfy_precedence_group(solution: ScheduleSolution, problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.problem.satisfy_release_and_date(solution: ScheduleSolution, problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.problem.satisfy_task_duration(solution, usage, problem)[source]

discrete_optimization.flex_scheduling.simulator module

class discrete_optimization.flex_scheduling.simulator.PostprocessTool(flex_problem: FlexProblem, solution: ScheduleSolution)[source]

Bases: object

post_process_left(flex_problem: FlexProblem, solution: ScheduleSolution, keep_min_time: bool = False, keep_strict_order_task: bool = False)[source]
class discrete_optimization.flex_scheduling.simulator.StateOfEvent(event: tuple[discrete_optimization.flex_scheduling.problem.TaskGroupAbstraction, discrete_optimization.flex_scheduling.problem.TaskGroupAbstraction, dict[Hashable, int]], source: set[Hashable], sink: set[Hashable], active: bool = False, start_event_triggered: int = None, end_event_triggered: int = None)[source]

Bases: object

active: bool = False
end_event_triggered: int = None
event: tuple[TaskGroupAbstraction, TaskGroupAbstraction, dict[Hashable, int]]
sink: set[Hashable]
source: set[Hashable]
start_event_triggered: int = None
update_state_of_event(full_schedule: dict[Hashable, list[tuple[int, int]]])[source]
class discrete_optimization.flex_scheduling.simulator.StateOfGroup(event: discrete_optimization.flex_scheduling.problem.TasksGroups, active: bool = False, start_event_triggered: int = None, end_event_triggered: int = None)[source]

Bases: object

active: bool = False
end_event_triggered: int = None
event: TasksGroups
start_event_triggered: int = None
update_state_of_event(full_schedule: dict[Hashable, list[tuple[int, int]]])[source]
discrete_optimization.flex_scheduling.simulator.build_graph_dependencies(flex_problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.simulator.build_task_to_set_of_groups(flex_problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.simulator.check_solution(solution: ScheduleSolutionPreemptive, problem: FlexProblem)[source]

To be reworked. Performs some basic checks (it was used to debug the post-processing methods).

discrete_optimization.flex_scheduling.simulator.compute_equivalent_preemptive_solution(flex_problem: FlexProblem, solution: ScheduleSolution)[source]

Does not change the actual schedule, but makes the solution preemptive.
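One way to picture this conversion, as a sketch only: each task's single (start, end) pair is wrapped into a one-element list of intervals, which is the shape a preemptive schedule uses. The helper `to_preemptive` and the dictionary layout are assumptions; the library function may split tasks further, for example around resource breaks.

```python
from typing import Dict, Hashable, List, Tuple


def to_preemptive(
    schedule: Dict[Hashable, Tuple[int, int]],
) -> Dict[Hashable, List[Tuple[int, int]]]:
    """Wrap each (start, end) pair into a one-element list of intervals."""
    return {task: [(start, end)] for task, (start, end) in schedule.items()}


preemptive = to_preemptive({"a": (0, 3), "b": (3, 8)})
# Start and end times are unchanged: first interval start, last interval end.
starts = {task: parts[0][0] for task, parts in preemptive.items()}
ends = {task: parts[-1][1] for task, parts in preemptive.items()}
```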

discrete_optimization.flex_scheduling.simulator.get_tasks_in_events(problem: FlexProblem, event: tuple[TaskGroupAbstraction, TaskGroupAbstraction, dict[Hashable, int]])[source]
discrete_optimization.flex_scheduling.simulator.init_capacity_resource(flex_problem: FlexProblem)[source]
discrete_optimization.flex_scheduling.simulator.post_process_schedule(flex_problem: FlexProblem, solution: ScheduleSolution, keep_min_time: bool = False, keep_strict_order_task: bool = False)[source]
discrete_optimization.flex_scheduling.simulator.resource_consumption(flex_problem: FlexProblem, solution: ScheduleSolution)[source]
discrete_optimization.flex_scheduling.simulator.resource_consumption_modes(flex_problem: FlexProblem)[source]

Module contents