discrete_optimization.generic_tasks_tools package

Subpackages

Submodules

discrete_optimization.generic_tasks_tools.allocation module

class discrete_optimization.generic_tasks_tools.allocation.AllocationCpSolver(problem: Problem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs: Any)[source]

Bases: TasksCpSolver[Task], Generic[Task, UnaryResource]

Base class for solver managing constraints on allocation.

abstractmethod add_constraint_on_nb_allocation_changes(ref: AllocationSolution[Task, UnaryResource], nb_changes: int) list[Any][source]

Add constraint on the maximal number of allocation changes from the given reference.

Parameters:
  • ref

  • nb_changes – maximal number of changes

Returns:

resulting constraints

abstractmethod add_constraint_on_task_unary_resource_allocation(task: Task, unary_resource: UnaryResource, used: bool) list[Any][source]

Add constraint on allocation of given unary resource for the given task

Parameters:
  • task

  • unary_resource

  • used – if True, we enforce the allocation of unary_resource to task, else we prevent it

Returns:

resulting constraints

abstractmethod add_constraint_on_total_nb_usages(sign: SignEnum, target: int) list[Any][source]
abstractmethod add_constraint_on_unary_resource_nb_usages(unary_resource: UnaryResource, sign: SignEnum, target: int) list[Any][source]
add_constraint_same_allocation_as_ref(ref: AllocationSolution[Task, UnaryResource], tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None) list[Any][source]

Add constraint to keep same allocation as the reference for the given tasks and unary resources subsets.

Parameters:
  • ref

  • tasks

  • unary_resources

Returns:

resulting constraints

get_default_tasks_n_unary_resources(tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None) tuple[Iterable[Task], Iterable[UnaryResource]][source]
abstractmethod get_nb_tasks_done_variable() Any[source]

Construct and get the variable tracking number of tasks with at least a resource allocated.

Returns:

objective variable to minimize

abstractmethod get_nb_unary_resources_used_variable() Any[source]

Construct and get the variable tracking the number of unary resources allocated to at least one task.

Returns:

objective variable to minimize

is_compatible_task_unary_resource(task: Task, unary_resource: UnaryResource) bool[source]

Should return False if the unary_resource can never be allocated to task.

This is only a hint used to reduce the number of variables or constraints generated. Defaults to problem.is_compatible_task_unary_resource().

But you can override it if you want to have more constraints in the solver than in the problem.

problem: AllocationProblem[Task, UnaryResource]
class discrete_optimization.generic_tasks_tools.allocation.AllocationProblem[source]

Bases: TasksProblem[Task], Generic[Task, UnaryResource]

Base class for allocation problems.

An allocation problem consists in allocating resources to tasks.

get_index_from_unary_resource(unary_resource: UnaryResource) int[source]
get_unary_resource_from_index(i: int) UnaryResource[source]
is_compatible_task_unary_resource(task: Task, unary_resource: UnaryResource) bool[source]

Should return False if the unary_resource can never be allocated to task.

This is only a hint used to reduce the number of variables or constraints generated.

Defaults to True; to be overridden in subclasses.

abstract property unary_resources_list: list[UnaryResource]

Available unary resources.

It can correspond to employees (rcpsp-multiskill), teams (workforce-scheduling), or a mix of several types.

class discrete_optimization.generic_tasks_tools.allocation.AllocationSolution(problem: Problem)[source]

Bases: TasksSolution[Task], Generic[Task, UnaryResource]

Class inherited by a solution for allocation problems.

check_allocation_consistency() bool[source]
check_same_allocation_as_ref(ref: AllocationSolution[Task, UnaryResource], tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None) bool[source]
compute_nb_allocation_changes(ref: AllocationSolution[Task, UnaryResource], tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None) int[source]
compute_nb_tasks_done() int[source]

Compute number of tasks with at least a resource allocated.

compute_nb_unary_resource_usages(tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None)[source]
compute_nb_unary_resources_used() int[source]

Compute number of unary resources allocated to at least one task.
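The two counters above can be illustrated with a minimal standalone sketch. It assumes the allocation is stored as a plain dict mapping each task to the set of unary resources allocated to it; this representation and the function names are hypothetical and not part of the library.

```python
# Hypothetical allocation: task -> set of unary resources allocated to it.
allocation = {
    "t1": {"alice"},
    "t2": {"alice", "bob"},
    "t3": set(),  # no resource allocated: the task is not counted as done
}


def nb_tasks_done(allocation):
    """Count tasks with at least one resource allocated."""
    return sum(1 for resources in allocation.values() if resources)


def nb_unary_resources_used(allocation):
    """Count resources allocated to at least one task."""
    used = set()
    for resources in allocation.values():
        used |= resources
    return len(used)
```

In the library itself, the same information would be derived from is_allocated(task, unary_resource) rather than from a dict.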

get_default_tasks_n_unary_resources(tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None) tuple[Iterable[Task], Iterable[UnaryResource]][source]
get_task_allocation(task: Task) set[UnaryResource][source]
abstractmethod is_allocated(task: Task, unary_resource: UnaryResource) bool[source]

Return the usage of the unary resource for the given task.

Parameters:
  • task

  • unary_resource

Returns:

problem: AllocationProblem[Task, UnaryResource]
class discrete_optimization.generic_tasks_tools.allocation.WithoutAllocationProblem[source]

Bases: AllocationProblem[Task, None], Generic[Task]

Mixin to simplify deriving from GenericSchedulingProblem when no allocation is needed.

property unary_resources_list: list[None]

Available unary resources.

It can correspond to employees (rcpsp-multiskill), teams (workforce-scheduling), or a mix of several types.

class discrete_optimization.generic_tasks_tools.allocation.WithoutAllocationSolution(problem: Problem)[source]

Bases: AllocationSolution[Task, None], Generic[Task]

Mixin to simplify deriving from GenericSchedulingSolution when no allocation is needed.

check_allocation_consistency() bool[source]
is_allocated(task: Task, unary_resource: UnaryResource) bool[source]

Return the usage of the unary resource for the given task.

Parameters:
  • task

  • unary_resource

Returns:

discrete_optimization.generic_tasks_tools.allocation.get_default_tasks_n_unary_resources(problem: AllocationProblem, tasks: Iterable[Task] | None = None, unary_resources: Iterable[UnaryResource] | None = None) tuple[Iterable[Task], Iterable[UnaryResource]][source]

discrete_optimization.generic_tasks_tools.base module

class discrete_optimization.generic_tasks_tools.base.TasksCpSolver(problem: Problem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs: Any)[source]

Bases: CpSolver, Generic[Task]

Base class for cp solver handling tasks problems.

problem: TasksProblem[Task]
class discrete_optimization.generic_tasks_tools.base.TasksProblem[source]

Bases: Problem, Generic[Task]

Base class for scheduling/allocation problems.

get_index_from_task(task: Task) int[source]
get_task_from_index(i: int) Task[source]
abstract property tasks_list: list[Task]

List of all tasks to schedule or allocate to.

update_tasks_list() None[source]

To be called when tasks_list is updated, to reset the cache.

class discrete_optimization.generic_tasks_tools.base.TasksSolution(problem: Problem)[source]

Bases: Solution, Generic[Task]

Base class for scheduling/allocation solutions.

problem: TasksProblem[Task]

discrete_optimization.generic_tasks_tools.calendar_resource module

class discrete_optimization.generic_tasks_tools.calendar_resource.CalendarResourceProblem[source]

Bases: SchedulingProblem[Task], Generic[Task, Resource]

Base class for scheduling problems dealing with renewable resources whose availability depends on a calendar.

abstract property calendar_resources_list: list[Resource]

Renewable resources with an availability calendar used by the tasks.

Notes

  • renewable = the resource replenishes as soon as a task using it ends;

  • it can be a mix of unary resources (e.g. employees) and cumulative resources (e.g. tool types).

  • calendar can be constant

get_fake_tasks(resource: Resource) list[tuple[int, int, int]][source]

Get fake tasks explaining the delta between resource current capacity and its max capacity

Parameters:

resource

Returns:

list of intervals of the form (start, end, value), for task starting at start and ending at end, using value resource.
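One way to picture fake tasks is as the gap between the max capacity and the actual availability on each interval. The sketch below is only an illustration under that assumption: it expects the availability intervals to already form a partition of [0, horizon), and the function name is hypothetical.

```python
def fake_tasks_from_availabilities(intervals, max_capacity):
    """Build (start, end, value) fake tasks consuming the unavailable capacity.

    intervals: (start, end, value) availability intervals, assumed to be a
    partition of [0, horizon) (assumption of this sketch).
    """
    return [
        (start, end, max_capacity - value)
        for start, end, value in intervals
        if value < max_capacity  # nothing to explain when capacity is full
    ]
```

With a constant max capacity plus these fake tasks, a solver can handle a varying calendar using an ordinary cumulative constraint.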

abstractmethod get_resource_availabilities(resource: Resource) list[tuple[int, int, int]][source]

Get availabilities intervals for a given resource

List of availability intervals of a resource. If the resource is not available, possibly no interval is returned.

The intervals are assumed to be disjoint, though.

Parameters:

resource

Returns:

list of intervals of the form (start, end, value), meaning that from time start to time end, value units of the resource are available. NB: the start is included, the end is excluded (start <= t < end)
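The half-open convention (start <= t < end) can be made concrete with a small lookup helper. This is a standalone sketch; `available_at` is a hypothetical name, and treating times not covered by any interval as 0 availability is an assumption consistent with the description above.

```python
def available_at(intervals, t):
    """Return the availability at time t for (start, end, value) intervals."""
    for start, end, value in intervals:
        if start <= t < end:  # start included, end excluded
            return value
    return 0  # no interval covers t: resource considered unavailable


intervals = [(0, 3, 2), (5, 8, 1)]
print([available_at(intervals, t) for t in range(9)])
```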

get_resource_calendar(resource: Resource, horizon: int | None = None) list[int][source]

Compute resource calendar.

Parameters:
  • resource

  • horizon – max value of time considered. Default to self.get_makespan_upper_bound().

Returns:

list of resource value by time step from 0 to horizon-1.

get_resource_consolidated_availabilities(resource: Resource, horizon: int | None = None) list[tuple[int, int, int]][source]

Get availabilities intervals for a given resource, consolidated as a partition of [0, horizon)

The default implementation uses get_resource_availabilities(), assuming the intervals are disjoint but potentially lacking 0-valued intervals.

Parameters:
  • resource

  • horizon – max value of time considered. Default to self.get_makespan_upper_bound().

Returns:

sorted list of intervals of the form (start, end, value), meaning that from time start to time end, value units of the resource are available. NB: the start is included, the end is excluded (start <= t < end)

get_resource_max_capacity(resource: Resource) int[source]

Get max capacity of the given resource

The default implementation takes the max over its calendar and caches it.

Parameters:

resource

Returns:

update_resource_availabilities() None[source]

Method to call when the resource availabilities have changed.

Default implementation clears the cache on get_resource_max_capacity().

class discrete_optimization.generic_tasks_tools.calendar_resource.CalendarResourceSolution(problem: Problem)[source]

Bases: SchedulingSolution[Task], Generic[Task, Resource]

check_all_calendar_resource_capacity_constraints() bool[source]

Check capacity constraint on all calendar resources.

check_calendar_resource_capacity_constraint(resource: Resource) bool[source]

Check capacity constraint on given resource.

check_calendar_resource_capacity_constraints(resources: Iterable[Resource]) bool[source]

Check capacity constraint respected on given resources.

Do it simultaneously on all resources to optimize computation time.

compute_aggregated_calendar_resources_levels(weights: dict[Resource, int] | None = None) int[source]

Compute aggregated level (i.e. min capacity needed) of each calendar resource.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

compute_calendar_resources_levels() dict[Resource, int][source]

Compute the level (i.e. min capacity needed) for each calendar resource.
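Reading "min capacity needed" as the peak simultaneous consumption, the level of one resource can be sketched with a sweep over start and end events. This is an interpretation, not the library's implementation; `resource_level` and its input format are hypothetical.

```python
def resource_level(task_usages):
    """Peak simultaneous consumption for (start, end, consumption) usages.

    Task intervals are taken as half-open: a task ending at t frees the
    resource for a task starting at t.
    """
    events = []
    for start, end, consumption in task_usages:
        events.append((start, consumption))
        events.append((end, -consumption))
    # At equal times, negative deltas sort first: releases before acquisitions.
    events.sort()
    level = peak = 0
    for _, delta in events:
        level += delta
        peak = max(peak, level)
    return peak
```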

compute_nb_calendar_resources_used(weights: dict[Resource, int] | None = None) int[source]

Compute number of calendar resources used by at least one task.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

Returns:

abstractmethod get_calendar_resource_consumption(resource: Resource, task: Task) int[source]

Get resource consumption by given task.

Parameters:
  • resource

  • task

Returns:

problem: CalendarResourceProblem[Task, Resource]
class discrete_optimization.generic_tasks_tools.calendar_resource.WithoutCalendarResourceProblem[source]

Bases: CalendarResourceProblem[Task, None], Generic[Task]

property calendar_resources_list: list[Resource]

Renewable resources with an availability calendar used by the tasks.

Notes

  • renewable = the resource replenishes as soon as a task using it ends;

  • it can be a mix of unary resources (e.g. employees) and cumulative resources (e.g. tool types).

  • calendar can be constant

get_resource_availabilities(resource: Resource) list[tuple[int, int, int]][source]

Get availabilities intervals for a given resource

List of availability intervals of a resource. If the resource is not available, possibly no interval is returned.

The intervals are assumed to be disjoint, though.

Parameters:

resource

Returns:

list of intervals of the form (start, end, value), meaning that from time start to time end, value units of the resource are available. NB: the start is included, the end is excluded (start <= t < end)

class discrete_optimization.generic_tasks_tools.calendar_resource.WithoutCalendarResourceSolution(problem: Problem)[source]

Bases: CalendarResourceSolution[Task, None], Generic[Task]

check_all_calendar_resource_capacity_constraints() bool[source]

Check capacity constraint on all calendar resources.

check_calendar_resource_capacity_constraint(resource: Resource) bool[source]

Check capacity constraint on given resource.

check_calendar_resource_capacity_constraints(resources: Iterable[Resource]) bool[source]

Check capacity constraint respected on given resources.

Do it simultaneously on all resources to optimize computation time.

compute_aggregated_calendar_resources_levels(weights: dict[Resource, int] | None = None)[source]

Compute aggregated level (i.e. min capacity needed) of each calendar resource.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

compute_calendar_resources_levels() dict[Resource, int][source]

Compute the level (i.e. min capacity needed) for each calendar resource.

compute_nb_calendar_resources_used(weights: dict[Resource, int] | None = None) int[source]

Compute number of calendar resources used by at least one task.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

Returns:

get_calendar_resource_consumption(resource: Resource, task: Task) int[source]

Get resource consumption by given task.

Parameters:
  • resource

  • task

Returns:

discrete_optimization.generic_tasks_tools.calendar_resource.consolidate_availability_intervals(intervals: list[tuple[int, int, int]], horizon: int)[source]

Ensure that the intervals are a partition of [0, horizon).

Parameters:
  • intervals – intervals (start, end, value). Assumed to be disjoint.

  • horizon – max value of time considered

Returns:

sorted list of intervals constituting a partition of [0, horizon)
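A possible way to obtain such a partition is to sort the intervals, clip them to [0, horizon), and fill the gaps with 0-valued intervals. The sketch below follows that idea under the stated disjointness assumption; it is not the library's code and `consolidate_intervals` is a hypothetical name.

```python
def consolidate_intervals(intervals, horizon):
    """Return a sorted partition of [0, horizon) from disjoint intervals."""
    result = []
    current = 0  # first time step not yet covered
    for start, end, value in sorted(intervals):
        start, end = max(start, 0), min(end, horizon)  # clip to [0, horizon)
        if start >= end:
            continue  # interval entirely outside the horizon
        if start > current:
            result.append((current, start, 0))  # fill the gap with 0
        result.append((start, end, value))
        current = end
    if current < horizon:
        result.append((current, horizon, 0))  # pad up to the horizon
    return result
```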

discrete_optimization.generic_tasks_tools.calendar_resource.consolidate_calendar(calendar: list[int], horizon: int)[source]

Consolidate the calendar to match the given horizon.

If too long, returns calendar[:horizon]. If too short, fills with 0’s.

Parameters:
  • calendar

  • horizon

Returns:

a calendar of size horizon
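The truncate-or-pad behaviour described above fits in one line. The following is a minimal sketch with a hypothetical function name, not the library's code.

```python
def fit_calendar(calendar, horizon):
    """Truncate the calendar to horizon, or pad it with zeros up to horizon."""
    return calendar[:horizon] + [0] * max(0, horizon - len(calendar))
```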

discrete_optimization.generic_tasks_tools.calendar_resource.convert_availability_intervals_to_calendar(intervals: list[tuple[int, int, int]], horizon: int) list[int][source]

Convert availability intervals into a calendar.

Parameters:
  • intervals – availability intervals, assumed to be disjoint

  • horizon – maximum time step considered

Returns:

list of available resource values for each time step
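The conversion can be sketched as filling a zero-initialised list on each interval. This standalone example assumes disjoint half-open intervals and treats uncovered time steps as 0; `intervals_to_calendar` is a hypothetical name.

```python
def intervals_to_calendar(intervals, horizon):
    """Expand (start, end, value) intervals into one value per time step."""
    calendar = [0] * horizon  # uncovered time steps stay at 0
    for start, end, value in intervals:
        # Clip to [0, horizon); the end bound is excluded.
        for t in range(max(start, 0), min(end, horizon)):
            calendar[t] = value
    return calendar
```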

discrete_optimization.generic_tasks_tools.calendar_resource.convert_calendar_to_availability_intervals(calendar: int | list[int], horizon: int) list[tuple[int, int, int]][source]

Convert a calendar into availability intervals.

Parameters:
  • calendar – if integer means a constant value, else list of values for each time step. If len(calendar)<horizon, last values are assumed to be 0. If len(calendar)>horizon, it is truncated to calendar[:horizon]

  • horizon – maximum time step considered

Returns:

list of (start,end, value), a sorted partition of [0, horizon)
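The reverse conversion can be sketched by grouping consecutive equal values. The example below handles both the constant and the list form of the calendar as described in the parameters; merging equal neighbours into one interval is a choice of this sketch, and `calendar_to_intervals` is a hypothetical name.

```python
from itertools import groupby


def calendar_to_intervals(calendar, horizon):
    """Convert a calendar into a sorted partition of [0, horizon)."""
    if isinstance(calendar, int):
        values = [calendar] * horizon  # constant availability
    else:
        # Truncate if too long, pad with zeros if too short.
        values = list(calendar[:horizon]) + [0] * max(0, horizon - len(calendar))
    intervals = []
    start = 0
    for value, group in groupby(values):
        length = len(list(group))  # run of consecutive equal values
        intervals.append((start, start + length, value))
        start += length
    return intervals
```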

discrete_optimization.generic_tasks_tools.calendar_resource.merge_resources_availability_intervals(intervals_per_resource: list[list[tuple[int, int, int]]], horizon: int) list[tuple[int, int, int]][source]

Merge several resources availability intervals, considering all resources as one meta-resource

Parameters:
  • intervals_per_resource – availability for each resource

  • horizon – maximum time step considered

Returns:

availability intervals for the meta-resource

discrete_optimization.generic_tasks_tools.calendar_resource.merge_resources_calendars(calendars: list[list[int]], horizon: int) list[int][source]

Merge several resources calendars, considering all resources as one meta-resource

Parameters:
  • calendars – calendars for each resource to merge

  • horizon – maximum time step considered

Returns:

calendar for the meta-resource
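If "one meta-resource" is read as pooling capacities, merging amounts to summing the calendars time step by time step. The sketch below relies on that assumption, which the description does not state explicitly; `merge_calendars` is a hypothetical name.

```python
def merge_calendars(calendars, horizon):
    """Sum several calendars step by step (assumed merge semantics)."""
    merged = [0] * horizon
    for calendar in calendars:
        # Calendars shorter than horizon contribute 0 on the missing steps.
        for t, value in enumerate(calendar[:horizon]):
            merged[t] += value
    return merged
```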

discrete_optimization.generic_tasks_tools.cumulative_resource module

class discrete_optimization.generic_tasks_tools.cumulative_resource.CumulativeResourceProblem[source]

Bases: CalendarResourceProblem[Task, CumulativeResource | OtherCalendarResource], MultimodeSchedulingProblem[Task], Generic[Task, CumulativeResource, OtherCalendarResource]

Scheduling problem with cumulative resources consumed by tasks.

This derives from problem with renewable calendar resources, some of them are cumulative, some are not (e.g. unary resource if it is moreover an allocation problem). The task consumption of these cumulative resources is supposed to be entirely determined by the task mode.

abstract property cumulative_resources_list: list[CumulativeResource]
abstractmethod get_cumulative_resource_consumption(resource: CumulativeResource, task: Task, mode: int) int[source]

Get cumulative resource consumption of the task in the given mode

Parameters:
  • resource – cumulative resource

  • task

  • mode – not used for single mode problems

Returns:

the consumption for cumulative resources.

is_cumulative_resource(resource: CumulativeResource | OtherCalendarResource) bool[source]

Check if given resource is a cumulative resource whose consumption depends only on task mode.

Parameters:

resource

Returns:

class discrete_optimization.generic_tasks_tools.cumulative_resource.CumulativeResourceSolution(problem: Problem)[source]

Bases: CalendarResourceSolution[Task, CumulativeResource | OtherCalendarResource], MultimodeSchedulingSolution[Task], Generic[Task, CumulativeResource, OtherCalendarResource]

Solution type associated to CumulativeResourceProblem.

get_calendar_resource_consumption(resource: CumulativeResource | OtherCalendarResource, task: Task) int[source]

Get resource consumption by given task.

Default implementation works only for cumulative resources whose consumptions depend only on task mode.

Parameters:
  • resource

  • task

Returns:

problem: CumulativeResourceProblem[Task, CumulativeResource, OtherCalendarResource]
class discrete_optimization.generic_tasks_tools.cumulative_resource.WithoutCumulativeResourceProblem[source]

Bases: CumulativeResourceProblem[Task, None, OtherCalendarResource], Generic[Task, OtherCalendarResource]

Mixin for problem without cumulative resources.

To be used as an additional mixin with GenericSchedulingProblem.

property cumulative_resources_list: list[CumulativeResource]
get_cumulative_resource_consumption(resource: CumulativeResource, task: Task, mode: int) int[source]

Get cumulative resource consumption of the task in the given mode

Parameters:
  • resource – cumulative resource

  • task

  • mode – not used for single mode problems

Returns:

the consumption for cumulative resources.

class discrete_optimization.generic_tasks_tools.cumulative_resource.WithoutCumulativeResourceSolution(problem: Problem)[source]

Bases: CumulativeResourceSolution[Task, None, OtherCalendarResource], Generic[Task, OtherCalendarResource]

Mixin for solution without cumulative resources.

To be used as an additional mixin with GenericSchedulingSolution.

discrete_optimization.generic_tasks_tools.enums module

class discrete_optimization.generic_tasks_tools.enums.MinOrMax(*values)[source]

Bases: Enum

MAX = 'max'
MIN = 'min'
class discrete_optimization.generic_tasks_tools.enums.StartOrEnd(*values)[source]

Bases: Enum

END = 'end'
START = 'start'

discrete_optimization.generic_tasks_tools.generic_scheduling module

class discrete_optimization.generic_tasks_tools.generic_scheduling.GenericSchedulingProblem[source]

Bases: SkillProblem[Task, UnaryResource, Skill, NonSkillCumulativeResource, UnaryResource], NonRenewableResourceProblem[Task, NonRenewableResource], PrecedenceSchedulingProblem[Task], TimelagProblem[Task], TimewindowProblem[Task], NoOverlapProblem[Task], Generic[Task, UnaryResource, Skill, NonSkillCumulativeResource, NonRenewableResource]

Scheduling problem with all optional features

This class derives from other mixins to provide utilities that require that mix:

  • scheduling: tasks need to be scheduled

  • calendar: the renewable resources have their own calendar that will be used for constraining allocations and schedule

  • multimode: the tasks have several modes on which the duration depends

  • cumulative: the tasks consume cumulative resources according to the chosen mode

  • allocation: the tasks can have unary resources allocated to them

  • skill: some cumulative resources are skills that are brought to tasks by allocated unary resources

  • non-renewable: the tasks consume non-renewable resources according to the chosen mode

  • precedence: precedence constraints between tasks

  • cost: the choice of a mode or of an allocation has a given cost

Although this class is generic, it also encompasses more specific cases:

  • singlemode: actually only one mode per task

  • no skills: if skills_list is empty

  • no allocation: unary_resources is empty

  • no cumulative resources: if resources_list lists only unary resources

  • no calendar: resource capacity can be given as a constant on [0, horizon)

  • no non-renewable resources: if non_renewable_resources_list is empty

  • no precedence constraints: precedence constraints empty

  • no cost: cost = 0

We suppose that all renewable resources are either cumulative resources or unary resources.

This generic class is to be used to construct generic automatic solvers (e.g. ).

property calendar_resources_list: list[Skill | NonSkillCumulativeResource | UnaryResource]

Renewable resources with an availability calendar used by the tasks.

Notes

  • renewable = the resource replenishes as soon as a task using it ends;

  • it can be a mix of unary resources (e.g. employees) and cumulative resources (e.g. tool types).

  • calendar can be constant

check_calendar_resources_list() None[source]

Check calendar resources list.

Raises:

AssertionError – if duplicates appear in the list

Returns:

compute_penalty(variable: GenericSchedulingSolution, penalty: Penalty) int[source]

Compute penalty from given solution.

compute_subobjective(variable: GenericSchedulingSolution, objective: Objective, resource_weights: dict[NonRenewableResource | Skill | NonSkillCumulativeResource | UnaryResource, int] | None = None) int[source]

Compute subobjective from given solution.

compute_tighter_task_bounds(use_cpm: bool = False, horizon: int | None = None) dict[Task, tuple[int, int, int, int]][source]

Compute tighter task bounds from problem time windows and min-max task durations.

Parameters:
  • use_cpm – whether to use CPM propagating bounds through precedence graph

  • horizon – new horizon to take into account when computing tighter bounds, default to problem horizon.

Returns:

{task: (start_lower_bound, end_lower_bound, start_upper_bound, end_upper_bound)}
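The sketch below illustrates how bounds on a task's start and end can tighten each other through its min and max durations, without any CPM propagation. It is an assumption about the kind of reasoning involved, not the library's implementation; `tighten_bounds` and its arguments are hypothetical, and the returned tuple follows the order given above.

```python
def tighten_bounds(start_lb, start_ub, end_lb, end_ub, min_duration, max_duration):
    """Tighten start/end bounds of one task using its duration range.

    Relies on start + min_duration <= end <= start + max_duration.
    """
    # The start cannot be earlier than the earliest end minus the longest duration.
    start_lb = max(start_lb, end_lb - max_duration)
    # The end cannot be earlier than the earliest start plus the shortest duration.
    end_lb = max(end_lb, start_lb + min_duration)
    # The end cannot be later than the latest start plus the longest duration.
    end_ub = min(end_ub, start_ub + max_duration)
    # The start cannot be later than the latest end minus the shortest duration.
    start_ub = min(start_ub, end_ub - min_duration)
    return start_lb, end_lb, start_ub, end_ub
```

For example, a task with duration between 3 and 5, start in [0, 20] and end in [0, 10] gets its end lower bound raised to 3 and its start upper bound lowered to 7.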

get_consolidated_precedence_constraints() dict[Task, set[Task]][source]

Consolidate precedence constraints defined by problem.

It takes into account time lag constraints:

  • end to start min constraint with non-negative offsets => precedence constraint

  • start synchronization => corresponding tasks should appear together in successors

  • end synchronization => corresponding tasks should share their successors
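The first rule (an end to start min time lag with a non-negative offset implies a precedence) can be sketched as follows. The synchronization rules are left out, and the function name and data layout are hypothetical rather than taken from the library.

```python
def add_precedence_from_time_lags(successors, end_to_start_min_time_lags):
    """Enrich a successors mapping with precedences implied by time lags.

    successors: task -> iterable of successor tasks.
    end_to_start_min_time_lags: (task1, task2, offset) triples meaning
    end(task1) + offset <= start(task2).
    """
    consolidated = {task: set(succs) for task, succs in successors.items()}
    for task1, task2, offset in end_to_start_min_time_lags:
        if offset >= 0:  # a negative offset would still allow an overlap
            consolidated.setdefault(task1, set()).add(task2)
    return consolidated
```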

get_consolidated_time_lags(task1_start_or_end: StartOrEnd, task2_start_or_end: StartOrEnd, min_or_max: MinOrMax)[source]

Get consolidated time lags.

Same normalization as in TimelagProblem parent class. Also taking into account precedence constraints to enrich end to start min time lags.

Parameters:
  • task1_start_or_end

  • task2_start_or_end

  • min_or_max

Returns:

get_makespan_lower_bound() int[source]

Get a lower bound on global makespan.

Computed tighter lower bounds on last tasks can be used to get a better makespan lower bound.

get_makespan_tighter_lower_bound(use_cpm: bool = False, horizon: int | None = None) int[source]

Get a tighter lower bound on global makespan.

Parameters:
  • use_cpm – whether to use CPM bound propagation through precedence graph to improve tightness

  • horizon – new horizon to take into account when computing tighter bounds, default to problem horizon. NB: The choice of horizon should not affect the result for the lower bound, but it could avoid CPM complete recomputation thanks to caching if it was already launched with same horizon.

get_makespan_tighter_upper_bound() int[source]

Compute a tighter upper bound on makespan.

The original makespan upper bound is used when computing tighter bounds for tasks starts and ends, via self.compute_tighter_task_bounds() or self.get_task_start_or_end_tighter_upper_bound(). From these tighter bounds, we can derive a new makespan upper bound.

get_mode_cost(task: Task, mode: int) int[source]

Get cost of choosing given mode.

Default to no cost. To be overridden in child classes with actual costs.

Parameters:
  • task

  • mode

Returns:

get_task_start_or_end_tighter_lower_bound(task: Task, start_or_end: StartOrEnd, use_cpm: bool = False, horizon: int | None = None) int[source]

Get a tighter lower bound on task start or end using possible durations.

Parameters:
  • use_cpm – whether to use CPM propagating bounds through precedence graph

  • horizon – new horizon to take into account when computing tighter bounds, default to problem horizon.

get_task_start_or_end_tighter_upper_bound(task: Task, start_or_end: StartOrEnd, use_cpm: bool = False, horizon: int | None = None) int[source]

Get a tighter upper bound on task start or end using possible durations.

Parameters:
  • use_cpm – whether to use CPM propagating bounds through precedence graph

  • horizon – new horizon to take into account when computing tighter bounds, default to problem horizon.

get_unary_resource_cost(task: Task, mode: int, unary_resource: UnaryResource) int[source]

Get cost of allocating given unary resource.

Default to no cost. To be overridden in child classes with actual costs.

Parameters:
  • task

  • mode

  • unary_resource

Returns:

is_unary_resource(resource: Skill | NonSkillCumulativeResource | UnaryResource) bool[source]

Check if given resource is a unary resource.

satisfy(variable: GenericSchedulingSolution) bool[source]

Computes if a solution satisfies or not the constraints of the problem.

Parameters:

variable – the Solution object to check satisfiability

Returns (bool): boolean true if the constraints are fulfilled, false otherwise.

satisfy_partial(variable: GenericSchedulingSolution, duration: bool = True, calendar: bool = True, non_renewable_capacity: bool = True, precedence: bool = True, skill: bool = True, allocation: bool = True, time_lags: bool = True, time_windows: bool = True, no_overlap: bool = True) bool[source]

Partial checks on solution.

One can switch off some checks by setting the corresponding parameter to False.

Parameters:
  • variable

  • duration

  • calendar

  • non_renewable_capacity

  • precedence

  • skill

  • allocation

  • time_lags

  • time_windows

  • no_overlap

Returns:

update_precedence_constraints() None[source]

Method to call when precedence constraints have been updated.

Clear cache from consolidated precedence constraints and time lags.

Returns:

update_resource_availabilities() None[source]

Method to call when the resource availabilities have changed.

Default implementation clears the cache on get_resource_max_capacity().

update_task_bounds() None[source]

Method to be called when problem time windows are updated.

It clears necessary cache on computed tighter bounds.

update_time_lags() None[source]

Method to call when time lags have been updated.

Clear cache from consolidated precedence constraints and time lags.

Returns:

class discrete_optimization.generic_tasks_tools.generic_scheduling.GenericSchedulingSolution(problem: Problem)[source]

Bases: SkillSolution[Task, UnaryResource, Skill, NonSkillCumulativeResource, UnaryResource], NonRenewableResourceSolution[Task, NonRenewableResource], PrecedenceSchedulingSolution[Task], TimelagSolution[Task], TimewindowSolution[Task], NoOverlapSolution[Task], Generic[Task, UnaryResource, Skill, NonSkillCumulativeResource, NonRenewableResource]

Solution type associated to GenericSchedulingProblem.

compute_cost() int[source]
get_calendar_resource_consumption(resource: Skill | NonSkillCumulativeResource | UnaryResource, task: Task) int[source]
problem: GenericSchedulingProblem[Task, UnaryResource, Skill, NonSkillCumulativeResource, NonRenewableResource]

discrete_optimization.generic_tasks_tools.generic_scheduling_impl module

class discrete_optimization.generic_tasks_tools.generic_scheduling_impl.GenericSchedulingImplProblem(horizon: int, durations_per_mode: dict[Hashable, dict[int, int]], resource_consumptions: dict[Hashable, dict[int, dict[str, int]]] | None = None, successors: dict[Hashable, Iterable[Hashable]] | None = None, unary_resources: set[str] | None = None, unary_resources_skills: dict[str, dict[str, int]] | None = None, unary_resources_availabilities: dict[str, list[tuple[int, int]]] | None = None, skills: set[str] | None = None, non_skill_cumulative_resources: dict[str, int | list[tuple[int, int, int]]] | None = None, non_renewable_resources: dict[str, int] | None = None, time_windows: dict[Hashable, tuple[int | None, int | None, int | None, int | None]] | None = None, start_to_start_min_time_lags: list[tuple[Hashable, Hashable, int]] | None = None, start_to_end_min_time_lags: list[tuple[Hashable, Hashable, int]] | None = None, end_to_start_min_time_lags: list[tuple[Hashable, Hashable, int]] | None = None, end_to_end_min_time_lags: list[tuple[Hashable, Hashable, int]] | None = None, no_overlap_sets: set[frozenset[Hashable]] = None, objective: Objective | Iterable[tuple[Objective, int]] = Objective.MAKESPAN, custom_evaluate_fn: Callable[[GenericSchedulingImplSolution], int] | None = None, objective_resource_weights: dict[str, int] | None = None, mode_costs: dict[Hashable, dict[int, int]] | None = None, unary_resource_costs: dict[Hashable, dict[int, dict[str, int]]] | None = None, compute_time_penalty: bool = True)[source]

Bases: GenericSchedulingProblem[Hashable, str, str, str, str]

Generic implementation of a scheduling problem.

It implements the abstract class GenericSchedulingProblem.

check_resources_lists() None[source]

Check duplicates in resources.

compute_subobjective(variable: GenericSchedulingSolution, objective: Objective, resource_weights: dict[str, int] | None = None) int[source]

Compute subobjective from given solution.

evaluate(variable: Solution) dict[str, float][source]

Evaluate a given solution object for the given problem.

This method should return a dictionary of KPIs that can then be used for mono or multiobjective optimization.

Parameters:

variable (Solution) – the Solution object to evaluate.

Returns: dictionary of float KPIs for the solution.

get_attribute_register() EncodingRegister[source]

Returns how the Solution should be encoded.

Useful to find automatically available mutations for local search. Used by genetic algorithms Ga and Nsga.

This needs only to be implemented in child classes when GA or LS solvers are to be used.

Returns (EncodingRegister): content of the encoding of the solution

get_cumulative_resource_consumption(resource: str, task: Hashable, mode: int) int[source]

Get cumulative resource consumption of the task in the given mode

Parameters:
  • resource – cumulative resource

  • task

  • mode – not used for single mode problems

Returns:

the consumption for cumulative resources.

get_dummy_solution() Solution[source]

Create a trivial solution for the problem.

Ideally it satisfies the constraints of the problem. Not available for all kinds of problems.

get_end_to_end_min_time_lags() list[tuple[Hashable, Hashable, int]][source]

Get min time lags between task ends.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of task1, task2, offset meaning end(task1) + offset <= end(task2)

get_end_to_start_min_time_lags() list[tuple[Hashable, Hashable, int]][source]

Get min time lags between first task end and second task start.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of task1, task2, offset meaning end(task1) + offset <= start(task2)

get_makespan_upper_bound() int[source]

Get an upper bound on global makespan.

get_mode_cost(task: Hashable, mode: int) int[source]

Get cost of choosing given mode.

Default to no cost. To be overridden in child classes with actual costs.

Parameters:
  • task

  • mode

Returns:

get_no_overlap() set[frozenset[Hashable]][source]

Each element of the returned set is a (frozen) set of tasks, no two of which may overlap.

get_non_renewable_resource_capacity(resource: str) int[source]

Get resource max capacity

Parameters:

resource

Returns:

get_non_renewable_resource_consumption(resource: str, task: Hashable, mode: int) int[source]

Get resource consumption of the task in the given mode

Parameters:
  • resource – non-renewable resource

  • task

  • mode – not used for single mode problems

Returns:

Raises:

ValueError – if the resource consumption depends on variables other than the mode

get_objective_register() ObjectiveRegister[source]

Returns the objective definition.

Returns (ObjectiveRegister): object defining the objective criteria.

get_precedence_constraints() dict[Hashable, Iterable[Hashable]][source]

Map each task to the tasks that need to be performed after it.

get_resource_availabilities(resource: str) list[tuple[int, int, int]][source]

Get availabilities intervals for a given resource

List of availability intervals of a resource. Periods during which the resource is unavailable may simply have no interval.

The intervals are assumed to be disjoint.

Parameters:

resource

Returns:

list of intervals of the form (start, end, value), which means from time start to time end, there are value of the resource available. NB: the start is included, the end is excluded (start <= t < end)
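As a minimal sketch of how such half-open intervals can be read, the helper below looks up the capacity available at a given time. capacity_at is a hypothetical function written for illustration, not part of the library, and it assumes that a time covered by no interval means zero availability.

```python
def capacity_at(intervals: list[tuple[int, int, int]], t: int) -> int:
    """Return the capacity available at time t (hypothetical helper).

    Each interval is (start, end, value) and covers start <= t < end.
    Assumption: a time covered by no interval has capacity 0.
    """
    for start, end, value in intervals:
        if start <= t < end:
            return value
    return 0


availabilities = [(0, 5, 2), (5, 8, 1), (10, 12, 3)]
at_4 = capacity_at(availabilities, 4)  # inside the first interval
at_5 = capacity_at(availabilities, 5)  # 5 is excluded from the first interval
at_8 = capacity_at(availabilities, 8)  # falls in the gap between intervals
```

Because the end is excluded, two consecutive intervals such as (0, 5, 2) and (5, 8, 1) never claim the same time step.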

get_solution_type() type[Solution][source]

Returns the class implementation of a Solution.

Returns (class): class object of the given Problem.

get_start_to_end_min_time_lags() list[tuple[Hashable, Hashable, int]][source]

Get min time lags between first task start and second task end.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of task1, task2, offset meaning start(task1) + offset <= end(task2)

get_start_to_start_min_time_lags() list[tuple[Hashable, Hashable, int]][source]

Get min time lags between tasks starts.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of task1, task2, offset meaning start(task1) + offset <= start(task2)
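The four min time lag getters of this class share the same (task1, task2, offset) format and differ only in which endpoint of each task is compared. The sketch below checks all four families on plain dictionaries of start and end times. min_time_lags_satisfied and its argument names are hypothetical and only illustrate the inequalities stated in the docstrings.

```python
from __future__ import annotations

from typing import Hashable

Lag = tuple[Hashable, Hashable, int]


def min_time_lags_satisfied(
    start: dict[Hashable, int],
    end: dict[Hashable, int],
    start_to_start: list[Lag] | None = None,
    start_to_end: list[Lag] | None = None,
    end_to_start: list[Lag] | None = None,
    end_to_end: list[Lag] | None = None,
) -> bool:
    """Check first(task1) + offset <= second(task2) for every given lag."""
    families = [
        (start_to_start or [], start, start),
        (start_to_end or [], start, end),
        (end_to_start or [], end, start),
        (end_to_end or [], end, end),
    ]
    return all(
        first[task1] + offset <= second[task2]
        for lags, first, second in families
        for task1, task2, offset in lags
    )


starts = {"a": 0, "b": 3}
ends = {"a": 2, "b": 6}
# end(a) + 1 <= start(b) holds: 2 + 1 <= 3
ok = min_time_lags_satisfied(starts, ends, end_to_start=[("a", "b", 1)])
# start(a) + 4 <= start(b) fails: 0 + 4 > 3
ko = min_time_lags_satisfied(starts, ends, start_to_start=[("a", "b", 4)])
```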

get_task_mode_duration(task: Hashable, mode: int) int[source]

Get task duration according to mode.

Parameters:
  • task

  • mode – not used for single-mode problems

Returns:

get_task_modes(task: Hashable) set[int][source]

Retrieve the modes available for the given task.

Parameters:

task

Returns:

get_task_start_or_end_lower_bound(task: Hashable, start_or_end: StartOrEnd) int[source]

Get a lower bound on start or end of a given task as specified by the problem.

For tighter computed bounds, see GenericSchedulingProblem.get_tight_task_start_or_end_lower_bound() and GenericSchedulingProblem.compute_task_bounds().

Default implementation: 0

Parameters:
  • task

  • start_or_end

Returns:

get_task_start_or_end_upper_bound(task: Hashable, start_or_end: StartOrEnd) int[source]

Get an upper bound on start or end of a given task as specified by the problem.

For tighter computed bounds, see GenericSchedulingProblem.get_tight_task_start_or_end_upper_bound() and GenericSchedulingProblem.compute_task_bounds().

Default implementation: makespan upper bound

Parameters:
  • task

  • start_or_end

Returns:

get_unary_resource_cost(task: Hashable, mode: int, unary_resource: str) int[source]

Get cost of allocating given unary resource.

Default to no cost. To be overridden in child classes with actual costs.

Parameters:
  • task

  • mode

  • unary_resource

Returns:

get_unary_resource_skill_value(unary_resource: str, skill: str) int[source]

Skill value of given resource for given skill.

property non_renewable_resources_list: list[str]

Non-renewable resources used by the tasks.

property non_skill_cumulative_resources_list: list[str]

List of cumulative resources that are not skills.

satisfy(variable: Solution) bool[source]

Computes if a solution satisfies or not the constraints of the problem.

Parameters:

variable – the Solution object whose satisfiability is checked

Returns (bool): True if the constraints are fulfilled, False otherwise.

satisfy_partial(variable: GenericSchedulingImplSolution, duration: bool = True, calendar: bool = True, non_renewable_capacity: bool = True, precedence: bool = True, skill: bool = True, allocation: bool = True, time_lags: bool = True, time_windows: bool = True) bool[source]

Partial checks on solution.

One can switch off some checks by setting the corresponding parameter to False.

Parameters:
  • variable

  • duration

  • calendar

  • non_renewable_capacity

  • precedence

  • skill

  • allocation

  • time_lags

  • time_windows

Returns:

set_fixed_attributes(attribute_name: str, solution: Solution) None[source]

Fix some solution attribute.

Useful when applying GA successively to different attributes of the solution, fixing the others.

Should be implemented at least for attributes described by attribute_register.

Parameters:
  • attribute_name – an attribute name

  • solution

Returns:

property skills_list: list[str]

List of skills needed by tasks and brought by unary resources.

property tasks_list: list[Hashable]

List of all tasks to schedule or allocate to.

property unary_resources_list: list[str]

Available unary resources.

It can correspond to employees (rcpsp-multiskill), teams (workforce-scheduling), or a mix of several types.

update_problem()[source]

Method to call when some attributes of the problem are modified.

update_resource_availabilities() None[source]

Method to call when the resource availabilities have changed.

Default implementation clears the cache on get_resource_max_capacity().

class discrete_optimization.generic_tasks_tools.generic_scheduling_impl.GenericSchedulingImplSolution(problem: GenericSchedulingImplProblem, raw_sol: RawSolution)[source]

Bases: GenericSchedulingSolution[Hashable, str, str, str, str]

Generic implementation of a solution to a scheduling problem.

It implements the abstract class GenericSchedulingSolution.

copy() Solution[source]

Deep copy of the solution.

The copy() function should return a new object containing the same input as the current object, with the following expected behaviour: after y = x.copy(), in-place changes made to y are not applied to x.

Returns: a new object from which you can manipulate attributes without changing the original object.

get_end_time(task: Hashable) int[source]
get_mode(task: Hashable) int[source]

Retrieve mode found for given task.

Parameters:

task

Returns:

get_start_time(task: Hashable) int[source]
get_task_allocation(task: Hashable) set[str][source]
is_allocated(task: Hashable, unary_resource: str) bool[source]

Return the usage of the unary resource for the given task.

Parameters:
  • task

  • unary_resource

Returns:

is_skill_used(task: Hashable, unary_resource: str, skill: str) bool[source]

Tell whether the given skill from given unary_resource is used in given task.

If True, self.is_allocated(task, unary_resource) must also be True. If the skill is not needed by the task or not in unary_resource skills, should return False.

lazy_copy() Solution[source]

This function should return a new object but possibly with mutable attributes from the original objects.

A typical use of lazy copy is in evolutionary or genetic algorithms, where local moves do not need a possibly costly deep copy.

Returns (Solution): copy (possibly shallow) of the Solution
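The difference between copy() and lazy_copy() can be illustrated with a toy class. ToySolution below is a hypothetical stand-in, not the library's Solution class. It only shows the contract described above: a deep copy is isolated from the original, whereas a lazy copy may share mutable attributes with it.

```python
from copy import deepcopy
from dataclasses import dataclass, field


@dataclass
class ToySolution:
    """Hypothetical stand-in for a solution holding task start times."""

    starts: dict[str, int] = field(default_factory=dict)

    def copy(self) -> "ToySolution":
        # Deep copy: the mutable attribute is duplicated.
        return ToySolution(starts=deepcopy(self.starts))

    def lazy_copy(self) -> "ToySolution":
        # Shallow copy: the new object shares the same dict.
        return ToySolution(starts=self.starts)


x = ToySolution(starts={"a": 0})

y = x.copy()
y.starts["a"] = 5  # does not affect x
after_deep = x.starts["a"]

z = x.lazy_copy()
z.starts["a"] = 7  # also visible through x
after_lazy = x.starts["a"]
```

A lazy copy is therefore only safe when the caller replaces attributes rather than mutating them in place.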

problem: GenericSchedulingImplProblem

discrete_optimization.generic_tasks_tools.generic_scheduling_utils module

discrete_optimization.generic_tasks_tools.generic_scheduling_utils.OBJECTIVE_DEFAULT_WEIGHTS: dict[Objective, int] = {Objective.COST: -1, Objective.CUSTOM: 1, Objective.MAKESPAN: -1, Objective.NB_RESOURCES_USED: -1, Objective.NB_TASKS_DONE: 1, Objective.NB_UNARY_RESOURCES_USED: -1, Objective.RESOURCES_LEVELS: -1}

Default weight applied to a given objective so that it will be maximized.
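As a sketch of why the weights of objectives to minimize are negative, the snippet below aggregates objective values into a single score to maximize. The Objective class here is a local stand-in reproducing two members of the documented enum, and the weighted sum is an assumed aggregation rule used for illustration, not a confirmed description of the library's internals.

```python
from enum import Enum


class Objective(Enum):
    # Local stand-in reproducing two members of the documented enum.
    MAKESPAN = "makespan"
    NB_TASKS_DONE = "nb_tasks_done"


# Subset of the documented default weights.
WEIGHTS = {Objective.MAKESPAN: -1, Objective.NB_TASKS_DONE: 1}


def aggregate(values: dict[Objective, int]) -> int:
    """Weighted sum to maximize (assumed aggregation rule)."""
    return sum(WEIGHTS[objective] * value for objective, value in values.items())


# Same number of tasks done, different makespans.
fast = aggregate({Objective.MAKESPAN: 10, Objective.NB_TASKS_DONE: 4})
slow = aggregate({Objective.MAKESPAN: 15, Objective.NB_TASKS_DONE: 4})
```

With a weight of -1, the schedule with the shorter makespan gets the higher score, so maximizing the aggregate minimizes the makespan.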

class discrete_optimization.generic_tasks_tools.generic_scheduling_utils.Objective(*values)[source]

Bases: Enum

Objective for a generic scheduling problem.

COST = 'cost'

Cost of the solution taking into account mode choice and resources consumptions.

CUSTOM = 'custom_objective'
MAKESPAN = 'makespan'

Global makespan of the schedule, to minimize.

NB_RESOURCES_USED = 'nb_resources_used'

Weighted sum of resources used, to minimize.

Includes non-renewable, cumulative, and unary resources. The weights are to be defined in solver.objective_resource_weights.

NB_TASKS_DONE = 'nb_tasks_done'

Number of tasks with at least one resource allocated, to maximize.

NB_UNARY_RESOURCES_USED = 'nb_unary_resources_used'

Number of allocated unary resources, to minimize.

RESOURCES_LEVELS = 'resources_levels'

Weighted sum of resources levels (i.e. needed capacities), to minimize.

Includes non-renewable, cumulative, and unary resources. The weights are to be defined in solver.objective_resource_weights.

discrete_optimization.generic_tasks_tools.generic_scheduling_utils.PENALTY_DEFAULT_WEIGHTS: dict[Penalty, int] = {Penalty.TIME: -100}

Default weight applied to a given penalty to be added to the objective so that it will be maximized.

class discrete_optimization.generic_tasks_tools.generic_scheduling_utils.Penalty(*values)[source]

Bases: Enum

Penalties for a generic scheduling problem.

TIME = 'time_penalty'
class discrete_optimization.generic_tasks_tools.generic_scheduling_utils.RawSolution(task_variables: dict[~discrete_optimization.generic_tasks_tools.base.Task, ~discrete_optimization.generic_tasks_tools.generic_scheduling_utils.TaskVariable[~discrete_optimization.generic_tasks_tools.allocation.UnaryResource, ~discrete_optimization.generic_tasks_tools.skill.Skill]], metadata: dict[str, ~typing.Any] = <factory>)[source]

Bases: Generic[Task, UnaryResource, Skill]

Raw format for a generic scheduling solution

Does not inherit from the discrete-optimization Solution class.

metadata: dict[str, Any]
task_variables: dict[Task, TaskVariable[UnaryResource, Skill]]
class discrete_optimization.generic_tasks_tools.generic_scheduling_utils.TaskVariable(start: int, end: int, mode: int, allocated: dict[~discrete_optimization.generic_tasks_tools.allocation.UnaryResource, set[~discrete_optimization.generic_tasks_tools.skill.Skill]]=<factory>, info: dict[str, ~typing.Any]=<factory>)[source]

Bases: Generic[UnaryResource, Skill]

Task characteristics found in a generic scheduling solution.

allocated: dict[UnaryResource, set[Skill]]
end: int
info: dict[str, Any]
mode: int
start: int
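To show how the two structures fit together, the sketch below builds a small raw solution. It uses local stand-in dataclasses that mirror the fields documented above, so that the example runs without the library. The task, resource, and skill names are made up.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TaskVariable:
    """Stand-in mirroring the documented fields of TaskVariable."""

    start: int
    end: int
    mode: int
    allocated: dict[str, set[str]] = field(default_factory=dict)
    info: dict[str, Any] = field(default_factory=dict)


@dataclass
class RawSolution:
    """Stand-in mirroring the documented fields of RawSolution."""

    task_variables: dict[str, TaskVariable]
    metadata: dict[str, Any] = field(default_factory=dict)


raw = RawSolution(
    task_variables={
        # "alice" is allocated to "cut" and uses her "sawing" skill.
        "cut": TaskVariable(start=0, end=3, mode=1, allocated={"alice": {"sawing"}}),
        # No unary resource allocated to "paint".
        "paint": TaskVariable(start=3, end=5, mode=1),
    }
)

makespan = max(tv.end for tv in raw.task_variables.values())
resources_on_cut = set(raw.task_variables["cut"].allocated)
```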

discrete_optimization.generic_tasks_tools.multimode module

class discrete_optimization.generic_tasks_tools.multimode.MultimodeCpSolver(problem: Problem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs: Any)[source]

Bases: TasksCpSolver[Task]

Class inherited by a solver managing constraints on tasks modes.

abstractmethod add_constraint_on_task_mode(task: Task, mode: int) list[Any][source]

Add constraint on task mode

The mode of task is fixed to mode.

Parameters:
  • task

  • mode

Returns:

resulting constraints

problem: MultimodeProblem[Task]
class discrete_optimization.generic_tasks_tools.multimode.MultimodeProblem[source]

Bases: TasksProblem[Task]

Class inherited by a problem exposing tasks modes.

abstractmethod get_task_modes(task: Task) set[int][source]

Retrieve the modes available for the given task.

Parameters:

task

Returns:

property is_multimode: bool
property max_number_of_mode: int
class discrete_optimization.generic_tasks_tools.multimode.MultimodeSolution(problem: Problem)[source]

Bases: TasksSolution[Task]

Class inherited by a solution exposing tasks modes.

abstractmethod get_mode(task: Task) int[source]

Retrieve mode found for given task.

Parameters:

task

Returns:

problem: MultimodeProblem[Task]
class discrete_optimization.generic_tasks_tools.multimode.SinglemodeProblem[source]

Bases: MultimodeProblem[Task]

property default_mode

Default single mode.

To be overridden when another default value makes more sense (e.g. in rcpsp, the default mode is 1).

get_task_modes(task: Task) set[int][source]

Retrieve the modes available for the given task.

Parameters:

task

Returns:

property is_multimode: bool
property max_number_of_mode: int
class discrete_optimization.generic_tasks_tools.multimode.SinglemodeSolution(problem: Problem)[source]

Bases: MultimodeSolution[Task]

get_mode(task: Task) int[source]

Retrieve mode found for given task.

Parameters:

task

Returns:

problem: SinglemodeProblem[Task]

discrete_optimization.generic_tasks_tools.multimode_scheduling module

class discrete_optimization.generic_tasks_tools.multimode_scheduling.MultimodeSchedulingProblem[source]

Bases: SchedulingProblem[Task], MultimodeProblem[Task], Generic[Task]

Scheduling problem whose tasks durations depend only on mode.

abstractmethod get_task_mode_duration(task: Task, mode: int) int[source]

Get task duration according to mode.

Parameters:
  • task

  • mode – not used for single-mode problems

Returns:

class discrete_optimization.generic_tasks_tools.multimode_scheduling.MultimodeSchedulingSolution(problem: Problem)[source]

Bases: SchedulingSolution[Task], MultimodeSolution[Task], Generic[Task]

Solution type associated to MultimodeSchedulingProblem.

check_duration_constraints() bool[source]
check_task_duration_constraint(task: Task) bool[source]
problem: MultimodeSchedulingProblem[Task]
class discrete_optimization.generic_tasks_tools.multimode_scheduling.SinglemodeSchedulingProblem[source]

Bases: SinglemodeProblem[Task], MultimodeSchedulingProblem[Task]

Single mode scheduling problems with fixed task durations.

Utility class simplifying MultimodeSchedulingProblem when single mode only.

abstractmethod get_task_duration(task: Task) int[source]

Get the (fixed) duration of the given task.

Parameters:

task

Returns:

get_task_mode_duration(task: Task, mode: int) int[source]

Get task duration according to mode.

Parameters:
  • task

  • mode – not used for single-mode problems

Returns:

class discrete_optimization.generic_tasks_tools.multimode_scheduling.SinglemodeSchedulingSolution(problem: Problem)[source]

Bases: SinglemodeSolution[Task], MultimodeSchedulingSolution[Task]

Solution for single mode scheduling problem with fixed task durations.

Utility class for deriving from GenericSchedulingSolution without multiple modes, so that the cpsat auto solver can be used.

discrete_optimization.generic_tasks_tools.no_overlap_scheduling module

class discrete_optimization.generic_tasks_tools.no_overlap_scheduling.NoOverlapProblem[source]

Bases: SchedulingProblem[Task]

Problem with no overlap between tasks. For example, in an open-shop problem the order of the operations is arbitrary, but they still must not overlap.

abstractmethod get_no_overlap() set[frozenset[Task]][source]

Each element of the returned set is a (frozen) set of tasks, no two of which may overlap.
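A minimal sketch of checking such no-overlap sets against a schedule is given below. no_overlap_satisfied is a hypothetical helper working on plain dictionaries of start and end times. It assumes half-open task intervals [start, end), so that a task may start exactly when another one ends.

```python
from itertools import combinations


def no_overlap_satisfied(
    no_overlap_sets: set[frozenset[str]],
    start: dict[str, int],
    end: dict[str, int],
) -> bool:
    """Check that tasks within each set are pairwise non-overlapping."""
    for tasks in no_overlap_sets:
        for t1, t2 in combinations(tasks, 2):
            # Two half-open intervals overlap iff each starts before the other ends.
            if start[t1] < end[t2] and start[t2] < end[t1]:
                return False
    return True


start = {"a": 0, "b": 2, "c": 1}
end = {"a": 2, "b": 4, "c": 3}

# "a" ends exactly when "b" starts: allowed.
ok = no_overlap_satisfied({frozenset({"a", "b"})}, start, end)
# "c" starts while "a" is still running: violation.
ko = no_overlap_satisfied({frozenset({"a", "c"})}, start, end)
```

Tasks that never appear together in a set are left unconstrained, which is why an empty set of sets always passes.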

class discrete_optimization.generic_tasks_tools.no_overlap_scheduling.NoOverlapSolution(problem: Problem)[source]

Bases: SchedulingSolution[Task]

Solution for problem with no-overlap constraints.

check_no_overlap()[source]
problem: NoOverlapProblem[Task]
class discrete_optimization.generic_tasks_tools.no_overlap_scheduling.WithoutNoOverlapProblem[source]

Bases: NoOverlapProblem[Task]

Utility mixin for problem without no-overlap constraints.

To be used as an additional mixin with generic GenericSchedulingProblem.

get_no_overlap() set[frozenset[Task]][source]

Each element of the returned set is a (frozen) set of tasks, no two of which may overlap.

class discrete_optimization.generic_tasks_tools.no_overlap_scheduling.WithoutNoOverlapSolution(problem: Problem)[source]

Bases: NoOverlapSolution[Task]

check_no_overlap() bool[source]

discrete_optimization.generic_tasks_tools.non_renewable_resource module

class discrete_optimization.generic_tasks_tools.non_renewable_resource.NonRenewableResourceProblem[source]

Bases: MultimodeProblem[Task], Generic[Task, NonRenewableResource]

Base class for problems dealing with non-renewable resources consumed by tasks.

The task consumption of these non-renewable resources is supposed to be entirely determined by the task mode.

abstractmethod get_non_renewable_resource_capacity(resource: NonRenewableResource) int[source]

Get resource max capacity

Parameters:

resource

Returns:

abstractmethod get_non_renewable_resource_consumption(resource: NonRenewableResource, task: Task, mode: int) int[source]

Get resource consumption of the task in the given mode

Parameters:
  • resource – non-renewable resource

  • task

  • mode – not used for single mode problems

Returns:

Raises:

ValueError – if the resource consumption depends on variables other than the mode

abstract property non_renewable_resources_list: list[NonRenewableResource]

Non-renewable resources used by the tasks.

class discrete_optimization.generic_tasks_tools.non_renewable_resource.NonRenewableResourceSolution(problem: Problem)[source]

Bases: MultimodeSolution[Task], Generic[Task, NonRenewableResource]

check_all_non_renewable_resource_capacity_constraints() bool[source]

Check capacity constraint on all non-renewable resources.

check_non_renewable_resource_capacity_constraint(resource: NonRenewableResource) bool[source]

Check capacity constraint on given non-renewable resource.

check_non_renewable_resource_capacity_constraints(resources: Iterable[NonRenewableResource])[source]
compute_aggregated_non_renewable_resources_consumptions(weights: dict[NonRenewableResource, int] | None = None)[source]

Compute aggregated consumption of each non-renewable resource by the solution.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

compute_nb_non_renewable_resources_used(weights: dict[NonRenewableResource, int] | None = None) int[source]

Compute number of non-renewable resources used by at least one task.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

Returns:

compute_non_renewable_resources_consumptions() dict[NonRenewableResource, int][source]

Compute total consumption of each non-renewable resource by the solution.
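The total consumption and the capacity check can be sketched on plain dictionaries as follows. The two helpers are hypothetical. The nested layout task -> mode -> resource -> quantity is an assumption chosen to match the type of the resource_consumptions argument of GenericSchedulingImplProblem.

```python
def non_renewable_consumptions(
    consumption: dict[str, dict[int, dict[str, int]]],
    modes: dict[str, int],
) -> dict[str, int]:
    """Sum, per resource, the consumption of each task in its chosen mode."""
    totals: dict[str, int] = {}
    for task, mode in modes.items():
        for resource, quantity in consumption[task][mode].items():
            totals[resource] = totals.get(resource, 0) + quantity
    return totals


def capacities_respected(totals: dict[str, int], capacities: dict[str, int]) -> bool:
    """Check that no resource is consumed beyond its capacity."""
    return all(totals.get(res, 0) <= cap for res, cap in capacities.items())


consumption = {
    "t1": {1: {"budget": 3}, 2: {"budget": 1}},
    "t2": {1: {"budget": 4}},
}
capacities = {"budget": 6}

expensive = non_renewable_consumptions(consumption, {"t1": 1, "t2": 1})
cheap = non_renewable_consumptions(consumption, {"t1": 2, "t2": 1})
```

Since the consumption depends only on the mode, switching t1 from mode 1 to mode 2 is enough to bring the total back under the capacity.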

get_non_renewable_resource_consumption(resource: NonRenewableResource, task: Task) int[source]

Get resource consumption by given task.

Parameters:
  • resource

  • task

Returns:

problem: NonRenewableResourceProblem[Task, NonRenewableResource]
class discrete_optimization.generic_tasks_tools.non_renewable_resource.WithoutNonRenewableResourceProblem[source]

Bases: NonRenewableResourceProblem[Task, None], Generic[Task]

Mixin for problem without non-renewable resources.

To be used as an additional mixin with generic GenericSchedulingProblem.

get_non_renewable_resource_capacity(resource: NonRenewableResource) int[source]

Get resource max capacity

Parameters:

resource

Returns:

get_non_renewable_resource_consumption(resource: NonRenewableResource, task: Task, mode: int) int[source]

Get resource consumption of the task in the given mode

Parameters:
  • resource – non-renewable resource

  • task

  • mode – not used for single mode problems

Returns:

Raises:

ValueError – if the resource consumption depends on variables other than the mode

property non_renewable_resources_list: list[NonRenewableResource]

Non-renewable resources used by the tasks.

class discrete_optimization.generic_tasks_tools.non_renewable_resource.WithoutNonRenewableResourceSolution(problem: Problem)[source]

Bases: NonRenewableResourceSolution[Task, None], Generic[Task]

Mixin for solution without non-renewable resources.

To be used as an additional mixin with generic GenericSchedulingSolution.

check_all_non_renewable_resource_capacity_constraints() bool[source]

Check capacity constraint on all non-renewable resources.

check_non_renewable_resource_capacity_constraint(resource: NonRenewableResource) bool[source]

Check capacity constraint on given non-renewable resource.

check_non_renewable_resource_capacity_constraints(resources: Iterable[NonRenewableResource])[source]
compute_aggregated_non_renewable_resources_consumptions(weights: dict[NonRenewableResource, int] | None = None)[source]

Compute aggregated consumption of each non-renewable resource by the solution.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

compute_nb_non_renewable_resources_used(weights: dict[NonRenewableResource, int] | None = None) int[source]

Compute number of non-renewable resources used by at least one task.

Parameters:

weights – optional weights to apply to each resource in the sum. Default to 1.

Returns:

compute_non_renewable_resources_consumptions() dict[NonRenewableResource, int][source]

Compute total consumption of each non-renewable resource by the solution.

discrete_optimization.generic_tasks_tools.precedence module

class discrete_optimization.generic_tasks_tools.precedence.PrecedenceProblem[source]

Bases: TasksProblem[Task]

Problem with precedence constraints on tasks.

get_consolidated_precedence_constraints() dict[Task, set[Task]][source]

Consolidate precedence constraints defined by problem.

In GenericSchedulingProblem it also takes time lag constraints into account. The default implementation only uses get_precedence_constraints(), removing potential duplicates.

abstractmethod get_precedence_constraints() dict[Task, Iterable[Task]][source]

Map each task to the tasks that need to be performed after it.

get_precedence_graph() Graph[source]
update_precedence_constraints() None[source]

Method to call when precedence constraints have been updated.

To be overridden in child classes.

Returns:

class discrete_optimization.generic_tasks_tools.precedence.PrecedenceSolution(problem: Problem)[source]

Bases: TasksSolution[Task]

Solution for problem with precedence constraints.

check_precedence_constraints() bool[source]

Check that all precedence constraints are satisfied.

Returns:

abstractmethod check_tasks_order(task1, task2) bool[source]

Check whether task1 is performed before task2.

Parameters:
  • task1

  • task2

Returns:

True if task1 is finished before task2 starts, False otherwise.

problem: PrecedenceProblem[Task]
class discrete_optimization.generic_tasks_tools.precedence.WithoutPrecedenceProblem[source]

Bases: PrecedenceProblem[Task]

Utility mixin for problem w/o precedence constraints.

To be used as an additional mixin with generic GenericSchedulingProblem.

get_precedence_constraints() dict[Task, Iterable[Task]][source]

Map each task to the tasks that need to be performed after it.

class discrete_optimization.generic_tasks_tools.precedence.WithoutPrecedenceSolution(problem: Problem)[source]

Bases: PrecedenceSolution[Task]

check_precedence_constraints() bool[source]

Check that all precedence constraints are satisfied.

Returns:

discrete_optimization.generic_tasks_tools.precedence_scheduling module

class discrete_optimization.generic_tasks_tools.precedence_scheduling.PrecedenceSchedulingProblem[source]

Bases: PrecedenceProblem[Task], SchedulingProblem[Task]

Scheduling problem with precedence constraints on tasks.

get_last_tasks() list[Task][source]

Get a sublist of tasks that are candidates to be the last one scheduled.

Default to all tasks.

class discrete_optimization.generic_tasks_tools.precedence_scheduling.PrecedenceSchedulingSolution(problem: Problem)[source]

Bases: PrecedenceSolution[Task], SchedulingSolution[Task]

Solution for scheduling problem with precedence constraints.

Can implement check_tasks_order by using start and end times.

check_tasks_order(task1, task2) bool[source]

Check whether task1 is performed before task2.

Parameters:
  • task1

  • task2

Returns:

True if task1 is finished before task2 starts, False otherwise.
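A sketch of this check using start and end times is given below, together with a loop over a successors mapping in the format returned by get_precedence_constraints(). Both functions are hypothetical helpers on plain dictionaries. Treating end(task1) == start(task2) as valid is an assumption.

```python
def tasks_order_ok(task1: str, task2: str, start: dict[str, int], end: dict[str, int]) -> bool:
    """True if task1 is finished when task2 starts (equality allowed by assumption)."""
    return end[task1] <= start[task2]


def precedence_satisfied(
    successors: dict[str, list[str]],
    start: dict[str, int],
    end: dict[str, int],
) -> bool:
    """Check every (task, successor) pair of the precedence mapping."""
    return all(
        tasks_order_ok(task, successor, start, end)
        for task, task_successors in successors.items()
        for successor in task_successors
    )


successors = {"a": ["b"], "b": []}
end = {"a": 2, "b": 5}

on_time = precedence_satisfied(successors, {"a": 0, "b": 2}, end)
too_early = precedence_satisfied(successors, {"a": 0, "b": 1}, end)
```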

problem: PrecedenceSchedulingProblem[Task]

discrete_optimization.generic_tasks_tools.scheduling module

class discrete_optimization.generic_tasks_tools.scheduling.SchedulingCpSolver(problem: Problem, params_objective_function: ParamsObjectiveFunction | None = None, **kwargs: Any)[source]

Bases: TasksCpSolver[Task]

Base class for cp solvers handling scheduling problems.

abstractmethod add_constraint_chaining_tasks(task1: Task, task2: Task) list[Any][source]

Add constraint chaining task1 with task2

task2 start == task1 end

Parameters:
  • task1

  • task2

Returns:

resulting constraints

abstractmethod add_constraint_on_task(task: Task, start_or_end: StartOrEnd, sign: SignEnum, time: int) list[Any][source]

Add constraint on given task start or end

task start or end must compare to time according to sign

Parameters:
  • task

  • start_or_end

  • sign

  • time

Returns:

resulting constraints

get_global_makespan_variable() Any[source]

Construct and get the variable tracking the global makespan.

Default implementation uses get_subtasks_makespan_variable on last tasks. Beware: a further call to get_subtasks_makespan_variable with another subset of tasks can change the constraints on this variable and thus make it obsolete.

Returns:

objective variable to minimize

get_makespan_lower_bound() int[source]

Get a lower bound on global makespan.

Can be overridden in solvers wanting to specify it in init_model() for instance.

get_makespan_upper_bound() int[source]

Get an upper bound on global makespan.

abstractmethod get_subtasks_makespan_variable(subtasks: Iterable[Task]) Any[source]

Construct and get the variable tracking the makespan on a subset of tasks.

Beware: a further call to get_subtasks_makespan_variable with another subset of tasks can change the constraints on this variable and thus make it obsolete.

Parameters:

subtasks

Returns:

objective variable to minimize

abstractmethod get_subtasks_sum_end_time_variable(subtasks: Iterable[Task]) Any[source]

Construct and get the variable tracking the sum of end times on a subset of tasks.

Parameters:

subtasks

Returns:

objective variable to minimize

abstractmethod get_subtasks_sum_start_time_variable(subtasks: Iterable[Task]) Any[source]

Construct and get the variable tracking the sum of start times on a subset of tasks.

Parameters:

subtasks

Returns:

objective variable to minimize

problem: SchedulingProblem[Task]
class discrete_optimization.generic_tasks_tools.scheduling.SchedulingProblem[source]

Bases: TasksProblem[Task]

Base class for scheduling problems.

A scheduling problem is about finding start and end times for tasks.

get_last_tasks() list[Task][source]

Get a sublist of tasks that are candidates to be the last one scheduled.

Default to all tasks.

get_makespan_lower_bound() int[source]

Get a lower bound on global makespan.

Defaults to 0, but can be overridden for problems with more information.

abstractmethod get_makespan_upper_bound() int[source]

Get an upper bound on global makespan.

class discrete_optimization.generic_tasks_tools.scheduling.SchedulingSolution(problem: Problem)[source]

Bases: TasksSolution[Task]

Base class for solution to scheduling problems.

constraint_chaining_tasks_satisfied(task1: Task, task2: Task) bool[source]
constraint_on_task_satisfied(task: Task, start_or_end: StartOrEnd, sign: SignEnum, time: int) bool[source]
get_duration(task: Task) int[source]
abstractmethod get_end_time(task: Task) int[source]
get_max_end_time() int[source]
get_running_tasks(time: int) list[Task][source]

Extract tasks running at given time.
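One possible reading of "running at given time" is sketched below on plain dictionaries. running_tasks is a hypothetical helper. It assumes that a task occupies the half-open interval [start, end), which may differ from the convention used by the library.

```python
def running_tasks(start: dict[str, int], end: dict[str, int], time: int) -> list[str]:
    """Tasks whose half-open interval [start, end) contains the given time."""
    return [task for task in start if start[task] <= time < end[task]]


start = {"a": 0, "b": 2, "c": 4}
end = {"a": 3, "b": 4, "c": 6}

at_2 = running_tasks(start, end, 2)  # "a" is still running, "b" has just started
at_4 = running_tasks(start, end, 4)  # "b" has just ended, "c" has just started
```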

get_start_or_end_time(task: Task, start_or_end: StartOrEnd) int[source]

Get the start or end time for a given task.

abstractmethod get_start_time(task: Task) int[source]
problem: SchedulingProblem[Task]

discrete_optimization.generic_tasks_tools.skill module

Module containing mixins for skills.

A skill is a cumulative resource which is attached to a unary resource.

class discrete_optimization.generic_tasks_tools.skill.SkillProblem[source]

Bases: CumulativeResourceProblem[Task, Skill | NonSkillCumulativeResource, OtherCalendarResource], AllocationProblem[Task, UnaryResource], Generic[Task, UnaryResource, Skill, NonSkillCumulativeResource, OtherCalendarResource]

compute_skill_availabilities(skill: Skill) list[tuple[int, int, int]][source]

Deduce skill availabilities from unary_resource availabilities and skill values.
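One plausible way to perform this deduction is sketched below: on each elementary time interval, the skill availability is taken as the sum of the skill values of the unary resources available during that interval. This summation rule, the function name, and the input layouts (resource -> list of (start, end) and resource -> skill -> value) are assumptions made for illustration.

```python
def skill_availabilities(
    skill: str,
    resource_intervals: dict[str, list[tuple[int, int]]],
    skill_values: dict[str, dict[str, int]],
) -> list[tuple[int, int, int]]:
    """Return (start, end, value) intervals, start included and end excluded."""
    # Every interval bound is a point where the availability may change.
    points = sorted(
        {t for intervals in resource_intervals.values() for s, e in intervals for t in (s, e)}
    )
    result: list[tuple[int, int, int]] = []
    for lo, hi in zip(points, points[1:]):
        value = sum(
            skill_values.get(resource, {}).get(skill, 0)
            for resource, intervals in resource_intervals.items()
            if any(s <= lo and hi <= e for s, e in intervals)
        )
        if value == 0:
            continue
        if result and result[-1][1] == lo and result[-1][2] == value:
            # Merge with the previous interval when contiguous with same value.
            result[-1] = (result[-1][0], hi, value)
        else:
            result.append((lo, hi, value))
    return result


resource_intervals = {"alice": [(0, 4)], "bob": [(2, 6)]}
skill_values = {"alice": {"weld": 1}, "bob": {"weld": 2}}
weld = skill_availabilities("weld", resource_intervals, skill_values)
```

The result has the same (start, end, value) shape as the intervals returned by get_resource_availabilities(), which lets a skill be handled like any other cumulative resource.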

property cumulative_resources_list: list[Skill | NonSkillCumulativeResource]
get_skills_of_task(task: Task) set[Skill][source]
get_skills_of_unary_resource(unary_resource: UnaryResource) set[Skill][source]
abstractmethod get_unary_resource_skill_value(unary_resource: UnaryResource, skill: Skill) int[source]

Skill value of given resource for given skill.

get_unary_resource_with_skill(skill: Skill) set[UnaryResource][source]
abstract property non_skill_cumulative_resources_list: list[Skill]

List of cumulative resources that are not skills.

only_one_skill_per_task: bool = False

Only one skill from each unary resource allocated to a given task can be used for the task.

abstract property skills_list: list[Skill]

List of skills needed by tasks and brought by unary resources.

update_skills()[source]
class discrete_optimization.generic_tasks_tools.skill.SkillSolution(problem: Problem)[source]

Bases: CumulativeResourceSolution[Task, Skill | NonSkillCumulativeResource, OtherCalendarResource], AllocationSolution[Task, UnaryResource], Generic[Task, UnaryResource, Skill, NonSkillCumulativeResource, OtherCalendarResource]

check_only_one_skill_per_task_and_unary_resource()[source]
check_skill_constraint(task: Task, skill: Skill, exact: bool = False, slack: int = 0) bool[source]
check_skill_constraints(exact: bool = False, slack: int = 0) bool[source]
check_skill_usage_and_allocation_consistency() bool[source]
get_skill_value_on_task(task: Task, skill: Skill) int[source]
abstractmethod is_skill_used(task: Task, unary_resource: UnaryResource, skill: Skill) bool[source]

Tell whether the given skill from given unary_resource is used in given task.

If True, self.is_allocated(task, unary_resource) must also be True. If the skill is not needed by the task or not in unary_resource skills, should return False.

problem: SkillProblem[Task, UnaryResource, Skill, NonSkillCumulativeResource, OtherCalendarResource]
class discrete_optimization.generic_tasks_tools.skill.WithoutSkillProblem[source]

Bases: SkillProblem[Task, UnaryResource, None, NonSkillCumulativeResource, OtherCalendarResource], Generic[Task, UnaryResource, NonSkillCumulativeResource, OtherCalendarResource]

get_unary_resource_skill_value(unary_resource: UnaryResource, skill: Skill) int[source]

Skill value of given resource for given skill.

property skills_list: list[Skill]

List of skills needed by tasks and brought by unary resources.

class discrete_optimization.generic_tasks_tools.skill.WithoutSkillSolution(problem: Problem)[source]

Bases: SkillSolution[Task, UnaryResource, None, NonSkillCumulativeResource, OtherCalendarResource], Generic[Task, UnaryResource, NonSkillCumulativeResource, OtherCalendarResource]

check_skill_constraint(task: Task, skill: None, exact: bool = False, slack: int = 0) bool[source]
check_skill_constraints(exact: bool = False, slack: int = 0) bool[source]
check_skill_usage_and_allocation_consistency() bool[source]
is_skill_used(task: Task, unary_resource: UnaryResource, skill: Skill) bool[source]

Tell whether the given skill from given unary_resource is used in given task.

If True, self.is_allocated(task, unary_resource) must also be True. If the skill is not needed by the task or not in unary_resource skills, should return False.

discrete_optimization.generic_tasks_tools.timelag module

class discrete_optimization.generic_tasks_tools.timelag.TimelagProblem[source]

Bases: SchedulingProblem[Task], Generic[Task]

Class for problems having time lags between tasks.

get_consolidated_time_lags(task1_start_or_end: StartOrEnd, task2_start_or_end: StartOrEnd, min_or_max: MinOrMax) list[tuple[Task, Task, int]][source]

Get consolidated time lags.

The goal is to normalize the time lags list to avoid having duplicates or constraints implied by others. Note that it encompasses:
  • same tuples in one of the lists
  • (t1, t2, offset) in end_to_start_min_time_lags and (t2, t1, -offset) in start_to_end_max_time_lags
  • several offsets for the same tasks (t1, t2)

The choices made here to normalize are:
  1. keep only positive offsets in max time lags (the others are converted to min time lags)
  2. keep only non-negative offsets in min time lags (the others are converted to max time lags)
  3. take max(offsets) in min time lags when several offsets are available for the same tasks (t1, t2)
  4. take min(offsets) in max time lags when several offsets are available for the same tasks (t1, t2)
  5. drop (t1, t2, offset) in max time lags (with offset>0) if (t2, t1, offset’) with offset’>=0 is already in the corresponding min time lags, as the latter trivially implies the former

Note that points 1, 4, and 5 amount to taking min(offsets) over all tuples in max time lags plus the ones converted from min time lags (t2, t1, -offset), whatever their sign, and dropping the result if this min(offsets) is non-positive (which corresponds to the existence of (t2, t1, offset’) with offset’>=0 in min time lags).

Moreover, this method handles the mapping between all the time lag methods according to start/end/min/max.
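The five normalization choices can be sketched as follows. This is a minimal illustration, not the library's implementation: it is restricted to start-to-start lags, so that converted lags stay in the same family (for mixed start/end lags the conversion switches families, as in the end_to_start_min / start_to_end_max example above). The function name `normalize_start_to_start` is hypothetical.

```python
def normalize_start_to_start(min_lags, max_lags):
    """Normalize start-to-start time lags given as (task1, task2, offset) tuples.

    min lag: start(task1) + offset <= start(task2)
    max lag: start(task1) + offset >= start(task2)
    """
    best_min = {}  # (task1, task2) -> largest non-negative offset
    best_max = {}  # (task1, task2) -> smallest positive offset

    def add_min(t1, t2, offset):
        key = (t1, t2)
        best_min[key] = max(offset, best_min.get(key, offset))  # choice 3

    def add_max(t1, t2, offset):
        key = (t1, t2)
        best_max[key] = min(offset, best_max.get(key, offset))  # choice 4

    for t1, t2, offset in min_lags:
        if offset >= 0:
            add_min(t1, t2, offset)
        else:
            add_max(t2, t1, -offset)  # choice 2: negative min lag becomes a max lag
    for t1, t2, offset in max_lags:
        if offset > 0:
            add_max(t1, t2, offset)
        else:
            add_min(t2, t1, -offset)  # choice 1: non-positive max lag becomes a min lag

    # choice 5: a max lag on (t1, t2) is implied by any kept min lag on (t2, t1)
    kept_max = {
        (t1, t2): offset
        for (t1, t2), offset in best_max.items()
        if (t2, t1) not in best_min
    }
    normalized_min = sorted((t1, t2, o) for (t1, t2), o in best_min.items())
    normalized_max = sorted((t1, t2, o) for (t1, t2), o in kept_max.items())
    return normalized_min, normalized_max


min_lags = [("a", "b", 2), ("a", "b", 5), ("b", "c", -3), ("e", "d", 1)]
max_lags = [("a", "b", 10), ("c", "b", 4), ("d", "e", 7)]
normalized_min, normalized_max = normalize_start_to_start(min_lags, max_lags)
```

In this example the min lag ("b", "c", -3) is converted to the max lag ("c", "b", 3), which is tighter than the given ("c", "b", 4), and the max lag ("d", "e", 7) is dropped because the min lag ("e", "d", 1) already implies it.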

Parameters:
  • task1_start_or_end

  • task2_start_or_end

  • min_or_max

Returns:

get_end_to_end_max_time_lags() list[tuple[Task, Task, int]][source]

Get max time lags between task ends.

Defaults to no max time lags. Should be overridden in child classes for problems with max time lags.

Returns:

list of (task1, task2, offset) tuples, meaning end(task1) + offset >= end(task2)

get_end_to_end_min_time_lags() list[tuple[Task, Task, int]][source]

Get min time lags between task ends.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of (task1, task2, offset) tuples, meaning end(task1) + offset <= end(task2)

get_end_to_start_max_time_lags() list[tuple[Task, Task, int]][source]

Get max time lags between first task end and second task start.

Defaults to no max time lags. Should be overridden in child classes for problems with max time lags.

Returns:

list of (task1, task2, offset) tuples, meaning end(task1) + offset >= start(task2)

get_end_to_start_min_time_lags() list[tuple[Task, Task, int]][source]

Get min time lags between first task end and second task start.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of (task1, task2, offset) tuples, meaning end(task1) + offset <= start(task2)

get_original_time_lags(task1_start_or_end: StartOrEnd, task2_start_or_end: StartOrEnd, min_or_max: MinOrMax) list[tuple[Task, Task, int]][source]

Map start/end/min/max choices to the corresponding list of time lags.

Parameters:
  • task1_start_or_end

  • task2_start_or_end

  • min_or_max

Returns:

get_start_to_end_max_time_lags() list[tuple[Task, Task, int]][source]

Get max time lags between first task start and second task end.

Defaults to no max time lags. Should be overridden in child classes for problems with max time lags.

Returns:

list of (task1, task2, offset) tuples, meaning start(task1) + offset >= end(task2)

get_start_to_end_min_time_lags() list[tuple[Task, Task, int]][source]

Get min time lags between first task start and second task end.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of (task1, task2, offset) tuples, meaning start(task1) + offset <= end(task2)

get_start_to_start_max_time_lags() list[tuple[Task, Task, int]][source]

Get max time lags between task starts.

Defaults to no max time lags. Should be overridden in child classes for problems with max time lags.

Returns:

list of (task1, task2, offset) tuples, meaning start(task1) + offset >= start(task2)

get_start_to_start_min_time_lags() list[tuple[Task, Task, int]][source]

Get min time lags between task starts.

Defaults to no min time lags. Should be overridden in child classes for problems with min time lags.

Returns:

list of (task1, task2, offset) tuples, meaning start(task1) + offset <= start(task2)

update_time_lags() None[source]

Method to call when time lags have been updated.

Clear cache from consolidated precedence constraints.

Returns:

class discrete_optimization.generic_tasks_tools.timelag.TimelagSolution(problem: Problem)[source]

Bases: SchedulingSolution[Task], Generic[Task]

Class for solution of problems having time lags between tasks.

check_time_lags() bool[source]

Check whether time lags are respected.
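As an illustration of what respecting time lags means, here is a minimal sketch for end-to-start lags only, using the inequalities documented for TimelagProblem. It does not use the library's classes; the helper `end_to_start_lags_respected` and the `schedule` dictionary (task -> (start, end)) are hypothetical.

```python
def end_to_start_lags_respected(schedule, min_lags, max_lags):
    """Check end-to-start time lags on a schedule mapping task -> (start, end)."""
    for task1, task2, offset in min_lags:
        # min lag: end(task1) + offset <= start(task2)
        if schedule[task1][1] + offset > schedule[task2][0]:
            return False
    for task1, task2, offset in max_lags:
        # max lag: end(task1) + offset >= start(task2)
        if schedule[task1][1] + offset < schedule[task2][0]:
            return False
    return True


schedule = {"a": (0, 3), "b": (5, 8)}
respected = end_to_start_lags_respected(
    schedule, min_lags=[("a", "b", 2)], max_lags=[("a", "b", 4)]
)
violated = end_to_start_lags_respected(
    schedule, min_lags=[], max_lags=[("a", "b", 1)]
)
```

Here task "b" starts 2 time units after "a" ends, which satisfies a min lag of 2 and a max lag of 4 but violates a max lag of 1.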

problem: TimelagProblem[Task]
discrete_optimization.generic_tasks_tools.timelag.consolidate_max_time_lags(timelags: list[tuple[Task, Task, int]]) list[tuple[Task, Task, int]][source]

Get consolidated max time lags.

It merges max time lags targeting same tasks, taking the most restrictive offset.
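For a max time lag, written as start-or-end(task1) + offset >= start-or-end(task2), the most restrictive offset is the smallest one. A minimal sketch of such a merge, assuming nothing about the library's actual implementation (the name `merge_max_time_lags` is hypothetical):

```python
def merge_max_time_lags(timelags):
    """Keep, for each (task1, task2) pair, the smallest offset."""
    best = {}
    for task1, task2, offset in timelags:
        key = (task1, task2)
        best[key] = min(offset, best.get(key, offset))
    return [(task1, task2, offset) for (task1, task2), offset in best.items()]


merged_max = merge_max_time_lags([("a", "b", 5), ("a", "b", 3), ("b", "c", 4)])
```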

Args:

timelags:

Returns:

discrete_optimization.generic_tasks_tools.timelag.consolidate_min_time_lags(timelags: list[tuple[Task, Task, int]]) list[tuple[Task, Task, int]][source]

Get consolidated min time lags.

It merges min time lags targeting same tasks, taking the most restrictive offset.
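For a min time lag, written as start-or-end(task1) + offset <= start-or-end(task2), the most restrictive offset is the largest one. A minimal sketch of such a merge, which may differ from the library's actual implementation (the name `merge_min_time_lags` is hypothetical):

```python
def merge_min_time_lags(timelags):
    """Keep, for each (task1, task2) pair, the largest offset."""
    best = {}
    for task1, task2, offset in timelags:
        key = (task1, task2)
        best[key] = max(offset, best.get(key, offset))
    return [(task1, task2, offset) for (task1, task2), offset in best.items()]


merged_min = merge_min_time_lags([("a", "b", 2), ("a", "b", 5), ("b", "c", 1)])
```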

Args:

timelags:

Returns:

discrete_optimization.generic_tasks_tools.timewindow module

class discrete_optimization.generic_tasks_tools.timewindow.TimewindowProblem[source]

Bases: SchedulingProblem[Task], Generic[Task]

Class for problems having time windows on tasks.

get_makespan_lower_bound() int[source]

Get a lower bound on global makespan.

Time windows on last tasks can be used to get a better makespan lower bound.
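The idea can be sketched as follows, assuming the makespan is the latest task end: any lower bound on a task end is then also a lower bound on the makespan, so the largest one can be used. This is an illustration only, not the library's computation; `end_lower_bounds` is hypothetical data.

```python
def makespan_lower_bound(end_lower_bounds):
    """Largest lower bound on a task end; 0 when there are no tasks."""
    return max(end_lower_bounds.values(), default=0)


# Hypothetical lower bounds on task ends coming from time windows.
bound = makespan_lower_bound({"a": 4, "b": 9, "c": 0})
```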

get_task_start_or_end_lower_bound(task: Task, start_or_end: StartOrEnd) int[source]

Get a lower bound on start or end of a given task as specified by the problem.

For tighter computed bounds, see GenericSchedulingProblem.get_tight_task_start_or_end_lower_bound() and GenericSchedulingProblem.compute_task_bounds().

Default implementation: 0

Parameters:
  • task

  • start_or_end

Returns:

get_task_start_or_end_upper_bound(task: Task, start_or_end: StartOrEnd) int[source]

Get an upper bound on start or end of a given task as specified by the problem.

For tighter computed bounds, see GenericSchedulingProblem.get_tight_task_start_or_end_upper_bound() and GenericSchedulingProblem.compute_task_bounds().

Default implementation: makespan upper bound

Parameters:
  • task

  • start_or_end

Returns:

class discrete_optimization.generic_tasks_tools.timewindow.TimewindowSolution(problem: Problem)[source]

Bases: SchedulingSolution[Task], Generic[Task]

Class for solution of problems having time windows on tasks.

check_time_windows() bool[source]

Check whether time windows are respected.
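A minimal sketch of such a check on plain dictionaries, assuming inclusive bounds on both start and end of each task. The helper `windows_respected` and the layout of `schedule` and `windows` are hypothetical and do not reflect the library's data structures.

```python
def windows_respected(schedule, windows):
    """Check that each task starts and ends within its time windows.

    schedule: task -> (start, end)
    windows: task -> (start_lb, start_ub, end_lb, end_ub), bounds inclusive
    """
    for task, (start, end) in schedule.items():
        start_lb, start_ub, end_lb, end_ub = windows[task]
        if not (start_lb <= start <= start_ub and end_lb <= end <= end_ub):
            return False
    return True


schedule = {"a": (0, 3), "b": (5, 8)}
windows = {"a": (0, 2, 0, 10), "b": (4, 6, 0, 10)}
in_windows = windows_respected(schedule, windows)
out_of_windows = windows_respected(schedule, {**windows, "b": (6, 9, 0, 10)})
```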

problem: TimewindowProblem[Task]

Module contents