# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import logging
from abc import abstractmethod
from collections.abc import Hashable, Iterable
from functools import cache
from typing import Generic, Optional, TypeVar, Union
import numpy as np
from discrete_optimization.generic_tasks_tools.base import Task
from discrete_optimization.generic_tasks_tools.scheduling import (
SchedulingProblem,
SchedulingSolution,
)
logger = logging.getLogger(__name__)
Resource = TypeVar("Resource", bound=Hashable)
[docs]
class RenewableResourceProblem(SchedulingProblem[Task], Generic[Task, Resource]):
"""Base class for scheduling problems dealing with renewable resources whose availability depend on a calendar."""
@property
@abstractmethod
def renewable_resources_list(self) -> list[Resource]:
"""Renewable resources used by the tasks.
Notes:
- renewable = the resource replenishes as soon as a task using it ends;
- it can be a mix of unary resources (e.g. employees) and cumulative resources (e.g. tool types).
"""
...
[docs]
@abstractmethod
def get_resource_availabilities(
self, resource: Resource
) -> list[tuple[int, int, int]]:
"""Get availabilities intervals for a given resource
List of availability intervals of a resource.
If the resource is not available, potentially no interval returned.
It is assumed that the intervals are disjunct though.
Args:
resource:
Returns:
list of intervals of the form (start, end, value), which means from time `start` to time `end`,
there are `value` of the resource available.
NB: the start is included, the end is excluded (start <= t < end)
"""
...
[docs]
@cache
def get_resource_consolidated_availabilities(
self, resource: Resource, horizon: Optional[int] = None
) -> list[tuple[int, int, int]]:
"""Get availabilities intervals for a given resource, consolidated as a partition of [0, horizon)
Default implementation use the `get_calendar_resource_availabilities`, by assuming the intervals are disjunct
but potentially lacking 0-valued intervals.
Args:
resource:
horizon: max value of time considered. Default to `self.get_makespan_upper_bound()`.
Returns:
sorted list of intervals of the form (start, end, value), which means from time `start` to time `end`,
there are `value` of the resource available
NB: the start is included, the end is excluded (start <= t < end)
"""
if horizon is None:
horizon = self.get_makespan_upper_bound()
return consolidate_availability_intervals(
intervals=self.get_resource_availabilities(resource=resource),
horizon=horizon,
)
[docs]
@cache
def get_resource_calendar(
self, resource: Resource, horizon: Optional[int] = None
) -> list[int]:
"""Compute resource calendar.
Args:
resource:
horizon: max value of time considered. Default to `self.get_makespan_upper_bound()`.
Returns:
list of resource value by time step from 0 to horizon-1.
"""
if horizon is None:
horizon = self.get_makespan_upper_bound()
return convert_availability_intervals_to_calendar(
intervals=self.get_resource_availabilities(resource=resource),
horizon=horizon,
)
[docs]
@cache
def get_resource_max_capacity(self, resource: Resource) -> int:
"""Get max capacity of the given resource
Default implementation take the max over its calendar and cache it.
Args:
resource:
Returns:
"""
return max(
value
for start, end, value in self.get_resource_availabilities(resource=resource)
)
[docs]
def update_resource_availabilities(self) -> None:
"""Method to call when the resource availabilities have changed.
Default implementation clears the cache on `get_resource_max_capacity()`.
"""
self.get_resource_max_capacity.cache_clear()
self.get_resource_calendar.cache_clear()
self.get_resource_consolidated_availabilities.cache_clear()
[docs]
def get_fake_tasks(self, resource: Resource) -> list[tuple[int, int, int]]:
"""Get fake tasks explaining the delta between resource current capacity and its max capacity
Args:
resource:
Returns:
list of intervals of the form (start, end, value),
for task starting at `start` and ending at `end`, using `value` resource.
"""
max_capacity = self.get_resource_max_capacity(resource=resource)
return [
(start, end, consumption)
for start, end, value in self.get_resource_consolidated_availabilities(
resource=resource
)
if (consumption := max_capacity - value) > 0
]
[docs]
class RenewableResourceSolution(SchedulingSolution[Task], Generic[Task, Resource]):
problem: RenewableResourceProblem[Task, Resource]
[docs]
@abstractmethod
def get_resource_consumption(self, resource: Resource, task: Task) -> int:
"""Get resource consumption by given task.
Args:
resource:
task:
Returns:
"""
...
[docs]
def check_resource_capacity_constraint(self, resource: Resource) -> bool:
"""Check capacity constraint on given renewable resource."""
return self._check_resources_capacity_constraint_np(resources=(resource,))
def _check_resource_capacity_constraint_naive(self, resource: Resource) -> bool:
makespan = self.get_max_end_time()
return all(
sum(
self.get_resource_consumption(resource=resource, task=task)
for task in self.get_running_tasks(time=t)
)
<= value
for t, value in enumerate(
self.problem.get_resource_calendar(resource=resource)
)
if t < makespan
)
def _check_resources_capacity_constraint_np(
self, resources: Iterable[Resource]
) -> bool:
makespan = self.get_max_end_time()
resources_consumption = {
resource: np.zeros(makespan, dtype=int) for resource in resources
}
for task in self.problem.tasks_list:
start = self.get_start_time(task)
end = self.get_end_time(task)
for resource in resources:
resources_consumption[resource][start:end] += (
self.get_resource_consumption(resource=resource, task=task)
)
resources_capa_violation = {
resource: conso
> np.array(
self.problem.get_resource_calendar(resource=resource, horizon=makespan)
)
for resource, conso in resources_consumption.items()
}
if any(
resource_cap_violation.any()
for resource_cap_violation in resources_capa_violation.values()
):
logger.debug("Violations on renewable resource capacities:")
violations = {
resource: violation_timesteps
for resource, resource_cap_violation in resources_capa_violation.items()
if len((violation_timesteps := resource_cap_violation.nonzero()[0])) > 0
}
for resource, violation_timesteps in violations.items():
logger.debug(
f"resource '{resource}': at time {', '.join(str(t) for t in violation_timesteps)}"
)
return False
else:
return True
[docs]
def check_resources_capacity_constraints(
self, resources: Iterable[Resource]
) -> bool:
"""Check capacity constraint respected on given resources.
Do it simultaneously on all resources to optimize computation time.
"""
return self._check_resources_capacity_constraint_np(resources=resources)
[docs]
def check_all_resource_capacity_constraints(self) -> bool:
"""Check capacity constraint on all renewable resources."""
return self._check_resources_capacity_constraint_np(
resources=self.problem.renewable_resources_list
)
[docs]
def convert_calendar_to_availability_intervals(
calendar: Union[int, list[int]], horizon: int
) -> list[tuple[int, int, int]]:
"""Convert a calendar into availability intervals.
Args:
calendar: if integer means a constant value, else list of values for each time step.
If len(calendar)<horizon, last values are assumed to be 0.
If len(calendar)>horizon, it is truncated to calendar[:horizon]
horizon: maximum time step considered
Returns:
list of (start,end, value), a sorted partition of [0, horizon)
"""
if np.isscalar(calendar): # constant resource
return [(0, horizon, int(calendar))]
else: # varying resource
intervals = []
t = 0
start = t
value = calendar[t]
end_calendar = min(len(calendar), horizon)
for t in range(1, end_calendar):
if calendar[t] != value:
# ends current interval, starts the next one
intervals.append((start, t, value))
start = t
value = calendar[t]
intervals.append((start, end_calendar, value))
return intervals
[docs]
def consolidate_availability_intervals(
intervals: list[tuple[int, int, int]], horizon: int
):
"""Ensure that the intervals are a partition of [0, horizon).
Args:
intervals: intervals (start, end, value). Supposed to be disjunct.
horizon: max value of time considered
Returns:
sorted list of intervals constituting a partition of [0, horizon)
"""
# truncate to horizon
intervals = [
(start, new_end, value)
for start, end, value in intervals
if start < (new_end := min(end, horizon)) # remove empty intervals
]
# sort intervals
intervals = sorted(
intervals,
key=lambda interval: interval[0],
)
# look for gaps between intervals => 0 resource
ends = [0] + [end for start, end, value in intervals]
next_starts = [start for start, end, value in intervals] + [horizon]
missing_intervals = []
for end, next_start in zip(ends, next_starts):
if end < next_start:
missing_intervals.append((end, next_start, 0))
elif end > next_start:
raise ValueError("Availability intervals intersecting.")
if len(missing_intervals) > 0:
intervals = sorted(
intervals + missing_intervals, key=lambda interval: interval[0]
)
return intervals
[docs]
def convert_availability_intervals_to_calendar(
intervals: list[tuple[int, int, int]], horizon: int
) -> list[int]:
"""Convert availability intervals into a calendar.
Args:
intervals: availability intervals, assumed to be disjunct
horizon: maximum time step considered
Returns:
list of available resource values for each time step
"""
return [
value
for start, end, value in consolidate_availability_intervals(intervals, horizon)
for _ in range(end - start)
]
[docs]
def merge_resources_availability_intervals(
intervals_per_resource: list[list[tuple[int, int, int]]], horizon: int
) -> list[tuple[int, int, int]]:
"""Merge several resources availability intervals, considering all resources as one meta-resource
Args:
intervals_per_resource: availability for each resource
horizon: maximum time step considered
Returns:
availability intervals for the meta-resource
"""
return convert_calendar_to_availability_intervals(
merge_resources_calendars(
[
convert_availability_intervals_to_calendar(intervals, horizon=horizon)
for intervals in intervals_per_resource
],
horizon=horizon,
),
horizon=horizon,
)
[docs]
def merge_resources_calendars(calendars: list[list[int]], horizon: int) -> list[int]:
"""Merge several resources calendars, considering all resources as one meta-resource
Args:
calendars: calendars for each resource to merge
horizon: maximum time step considered
Returns:
calendar for the meta-resource
"""
if len(calendars) == 0:
return [0] * horizon
else:
return [
sum(values)
for values in zip(
*(
consolidate_calendar(calendar, horizon=horizon)
for calendar in calendars
)
)
]
[docs]
def consolidate_calendar(calendar: list[int], horizon: int):
"""Consoidate the calendar to correspond to the given horizon
If too long, retursn calendar[:horizon].
If too short, fill with 0's.
Args:
calendar:
horizon:
Returns:
a calendar of size `horizon`
"""
return calendar[:horizon] + [0] * (horizon - len(calendar))