# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import Any, Dict, List
import numpy as np
import ortools
from ortools.sat.python.cp_model import (
CpModel,
CpSolverSolutionCallback,
Domain,
LinearExpr,
LinearExprT,
)
from discrete_optimization.flex_scheduling.fsp_utils import (
create_resource_consumption_from_calendar,
)
from discrete_optimization.flex_scheduling.problem import (
FlexProblem,
GroupType,
ObjectivesEnum,
ResourceData,
ScheduleSolution,
ScheduleSolutionPreemptive,
TasksGroups,
)
from discrete_optimization.flex_scheduling.solvers.cpsat import CpSatFlexSolver
from discrete_optimization.generic_tasks_tools.allocation import UnaryResource
from discrete_optimization.generic_tasks_tools.base import Task
from discrete_optimization.generic_tasks_tools.skill import Skill
logger = logging.getLogger(__name__)
[docs]
def resource_consumption(flex_problem: FlexProblem, solution: ScheduleSolution):
    """Build the per-task resource usage matrix implied by the selected modes.

    Args:
        flex_problem: problem exposing ``nb_tasks``, ``nb_resources``, ``tasks``
            (each with a ``modes`` mapping whose values carry a
            ``resource_consumption`` dict) and ``resources`` (each with an ``id``).
        solution: solution whose ``modes[i]`` is the mode selected for task ``i``.

    Returns:
        Integer array of shape ``(nb_tasks, nb_resources)``; entry ``[i, r]`` is
        the consumption of the ``r``-th resource by task ``i`` in its selected
        mode, or 0 when that mode does not mention the resource.
    """
    # Resource ids are the same for every task: read them once.
    resource_ids = [resource.id for resource in flex_problem.resources]
    consumptions = np.zeros(
        (flex_problem.nb_tasks, flex_problem.nb_resources), dtype=int
    )
    for i in range(flex_problem.nb_tasks):
        selected_mode = flex_problem.tasks[i].modes[solution.modes[i]]
        consumption_dict = selected_mode.resource_consumption
        for r_idx, r_id in enumerate(resource_ids):
            consumptions[i, r_idx] = consumption_dict.get(r_id, 0)
    return consumptions
[docs]
class CPSatFlexSPPreempt(CpSatFlexSolver):
[docs]
def get_skill_variable(
    self, task: Task, unary_resource: UnaryResource, skill: Skill
) -> LinearExprT:
    """Return the skill expression of ``unary_resource`` on ``task`` for ``skill``.

    This solver returns the constant ``0`` whatever the arguments are.
    """
    no_skill_contribution = 0
    return no_skill_contribution
[docs]
def retrieve_solution(
    self, cpsolvercb: CpSolverSolutionCallback
) -> ScheduleSolution:
    """Build a preemptive schedule solution from the callback's current values.

    Logs the objective value and its bound and, when the matching entries exist
    in ``self.variables``, the sub-objectives, the resource capacity values and
    the tardiness/earliness of each group.

    Args:
        cpsolvercb: solution callback giving access to the current assignment.

    Returns:
        A ``ScheduleSolutionPreemptive`` whose schedule holds, per task, the
        ``(start, end)`` pair of every present piece, and whose ``modes`` array
        holds the mode whose variable evaluates to true. The sub-objective values
        are attached to the solution as ``_intern_obj``.
    """
    value = cpsolvercb.Value
    variables = self.variables
    logger.info(
        f"cur obj value : {cpsolvercb.ObjectiveValue()}, bound={cpsolvercb.BestObjectiveBound()}"
    )
    logger.info("Sub-objectives :")
    details_subobj = {}
    if "obj_data" in variables:
        obj_exprs = variables["obj_data"][0]
        obj_names = variables["obj_data"][2]
        for k, obj_expr in enumerate(obj_exprs):
            # Query the solver once and reuse the value for the log and the dict.
            obj_value = value(obj_expr)
            logger.info(f"{obj_names[k]} : {obj_value}")
            details_subobj[obj_names[k]] = obj_value
    if "resource_capacity_variables" in variables:
        capacity_vars = variables["resource_capacity_variables"]
        logger.info("Resource capacity data : ")
        logger.info(f"{[value(capacity_vars[x]) for x in capacity_vars]}")
    if "tardiness" in variables:
        logger.info("Tardiness data")
        group_vars = variables["tardiness"]["groups"]
        for group in group_vars:
            logger.info(f"Group {group}")
            logger.info(f"tardiness : {value(group_vars[group]['tardiness'])}")
            logger.info(f"earliness : {value(group_vars[group]['earliness'])}")

    nb_tasks = self.problem.nb_tasks
    # np.zeros defaults to a float dtype, so the stored modes are floats.
    allocation = np.zeros(nb_tasks)
    pieces_per_task = []
    for i in range(nb_tasks):
        starts = variables["starts_preempt"][i]
        ends = variables["ends_preempt"][i]
        presences = variables["is_present_preempt"][i]
        # Only pieces flagged as present are part of the schedule.
        pieces_per_task.append(
            [
                (value(starts[j]), value(ends[j]))
                for j in range(len(starts))
                if value(presences[j])
            ]
        )
        for mode, mode_literal in variables["modes_variable"][i].items():
            if value(mode_literal):
                allocation[i] = mode
    sol = ScheduleSolutionPreemptive(
        problem=self.problem,
        schedule=pieces_per_task,
        modes=allocation,
    )
    sol._intern_obj = details_subobj
    return sol
[docs]
def from_solution_to_hint(
    self, solution: ScheduleSolutionPreemptive
) -> list[tuple[ortools.sat.python.cp_model.VariableT, int]]:
    """Translate a preemptive solution into ``(variable, value)`` hint pairs.

    Args:
        solution: solution whose ``schedule[i]`` is the ordered list of
            ``(start, end)`` pieces of task ``i`` and whose ``modes[i]`` is the
            selected mode of task ``i``.

    Returns:
        Hint pairs, in this order: preemption pieces, task spans, modes, group
        spans, non-released-resource durations, resource capacities, group
        execution capacity, makespan and tardiness/earliness. Each family is
        only emitted when its variables exist in ``self.variables``.
    """
    variables = self.variables
    problem = self.problem
    schedule = solution.schedule
    list_variables_value = []

    # Preemption pieces: slots beyond the pieces actually used are collapsed
    # onto the end of the last piece, with zero duration and absent.
    for i in range(problem.nb_tasks):
        pieces = schedule[i]
        for j in range(len(variables["starts_preempt"][i])):
            if j < len(pieces):
                start, end = pieces[j][0], pieces[j][1]
                duration, present = end - start, 1
            else:
                start = end = pieces[-1][1]
                duration, present = 0, 0
            list_variables_value.append((variables["starts_preempt"][i][j], start))
            list_variables_value.append((variables["ends_preempt"][i][j], end))
            list_variables_value.append((variables["durs_preempt"][i][j], duration))
            list_variables_value.append(
                (variables["is_present_preempt"][i][j], present)
            )

    # Main variables: span from the first piece start to the last piece end.
    if "starts" in variables:
        for i in range(problem.nb_tasks):
            first_start = schedule[i][0][0]
            last_end = schedule[i][-1][1]
            list_variables_value.append((variables["starts"][i], first_start))
            list_variables_value.append((variables["ends"][i], last_end))
            list_variables_value.append(
                (variables["durations"][i], last_end - first_start)
            )

    # Modes variables: mode literals only exist for tasks with several modes.
    for i in range(problem.nb_tasks):
        task_modes = problem.tasks[i].modes
        several_modes = len(task_modes) > 1
        mode_chosen = solution.modes[i]
        for mode in task_modes:
            if mode == mode_chosen:
                if several_modes:
                    list_variables_value.append(
                        (variables["modes_variable"][i][mode], 1)
                    )
                list_variables_value.append(
                    (variables["durations_executed"][i], task_modes[mode].duration)
                )
            elif several_modes:
                list_variables_value.append((variables["modes_variable"][i][mode], 0))

    # Group variables.
    # NOTE(review): this section is gated on "resource_capacity_variables" although
    # it reads the *_span_variables entries - confirm the intended key.
    if "resource_capacity_variables" in variables:
        index_of = problem.task_id_to_index
        for group in problem.tasks_group:
            group_id = group.id
            ft = group.first_task_if_any
            lt = group.last_task_if_any
            if ft is not None:
                st_grp = schedule[index_of[ft]][0][0]
            else:
                st_grp = min(
                    [schedule[index_of[id_task]][0][0] for id_task in group.tasks_group]
                )
            if lt is not None:
                # The schedule is a list of piece lists, so take the end of the
                # last piece (indexing it with a tuple raises TypeError).
                end_grp = schedule[index_of[lt]][-1][1]
            else:
                end_grp = max(
                    [
                        schedule[index_of[id_task]][-1][1]
                        for id_task in group.tasks_group
                    ]
                )
            list_variables_value.append(
                (variables["start_span_variables"][group_id], st_grp)
            )
            list_variables_value.append(
                (variables["end_span_variables"][group_id], end_grp)
            )
            list_variables_value.append(
                (variables["duration_span_variables"][group_id], end_grp - st_grp)
            )

    list_variables_value += self.from_solution_to_hint_non_released_delta(
        solution=solution
    )

    # Resource capacities are hinted at their maximum value.
    if "resource_capacity_variables" in variables:
        capacity_vars = variables["resource_capacity_variables"]
        for resource in problem.resources:
            if resource.id in capacity_vars:
                list_variables_value.append(
                    (capacity_vars[resource.id], int(resource.max_capacity))
                )

    if "capacity_group_execution" in variables:
        # The objective groups are only needed for this hint.
        nb_groups = len(
            [
                g
                for g in problem.tasks_group
                if g.type_of_group == GroupType.SUBGROUP_TASK_FOR_OBJECTIVE
            ]
        )
        list_variables_value.append(
            (variables["capacity_group_execution"], nb_groups)
        )  # could be reduced

    if "makespan" in variables:
        list_variables_value.append(
            (
                variables["makespan"],
                max([schedule[i][-1][1] for i in range(problem.nb_tasks)]),
            )
        )
    if "tardiness" in variables:
        list_variables_value += self.from_solution_to_hint_earliness(solution)
    return list_variables_value
[docs]
def from_solution_to_hint_non_released_delta(
    self, solution: ScheduleSolutionPreemptive
):
    """Build hints for the "resource kept until the successor starts" durations.

    Three constraint families of ``self.problem.constraints`` are handled, each
    only when its attribute is not ``None``:

    * task -> task: hint is ``start(successor) - end(predecessor)``;
    * (task, mode) -> task: same gap when the solution uses that mode, else 0;
    * generic task/group -> task/group: gap between the latest end of the first
      element and the earliest start of the second one, emitted once per tag
      and only when ``"durations_non_release_3"`` exists in ``self.variables``.

    Args:
        solution: preemptive solution; ``schedule[i]`` is the ordered list of
            ``(start, end)`` pieces of task ``i``.

    Returns:
        List of ``(variable, value)`` hint pairs.
    """
    problem = self.problem
    constraints = problem.constraints
    index_of = problem.task_id_to_index
    schedule = solution.schedule
    list_variables_value = []

    def first_start(task_index):
        return schedule[task_index][0][0]

    def last_end(task_index):
        return schedule[task_index][-1][1]

    def group_task_indexes(group_id):
        group = [g for g in problem.tasks_group if g.id == group_id][0]
        return [index_of[id_task] for id_task in group.tasks_group]

    data = constraints.successor_with_res_release_at_start_of_successor
    if data is not None:
        for t1, t2, _ in data:
            # Simple case: the first element is a plain task id, without mode.
            if t1 in index_of:
                i1 = index_of[t1]
                i2 = index_of[t2]
                list_variables_value.append(
                    (
                        self.variables["durations_non_release_1"][(i1, i2)],
                        first_start(i2) - last_end(i1),
                    )
                )

    data = constraints.successor_with_res_release_at_start_of_successor_mode
    if data is not None:
        for (t1, mode), t2, _ in data:
            i1 = index_of[t1]
            i2 = index_of[t2]
            assert mode in self.variables["is_present"][i1]
            # The resource is only held when the predecessor runs in this mode.
            if solution.modes[i1] == mode:
                gap = first_start(i2) - last_end(i1)
            else:
                gap = 0
            list_variables_value.append(
                (self.variables["durations_non_release_2"][(i1, mode, i2)], gap)
            )

    data = (
        constraints.successor_generic_with_res_release_at_start_of_successor_generic
    )
    if data is not None:
        has_generic_vars = "durations_non_release_3" in self.variables
        # Tags already hinted: constant-time duplicate check instead of
        # rescanning every hint produced so far.
        seen_tags = set()
        for t1, t2, _ in data:
            if t1.is_a_task:
                st_ = last_end(index_of[t1.task_id])
                first_tag = ("task", t1.task_id)
            else:
                st_ = max([last_end(k) for k in group_task_indexes(t1.group_id)])
                first_tag = ("group", t1.group_id)
            if t2.is_a_task:
                end_ = first_start(index_of[t2.task_id])
                second_tag = ("task", t2.task_id)
            else:
                end_ = min([first_start(k) for k in group_task_indexes(t2.group_id)])
                second_tag = ("group", t2.group_id)
            tag = (first_tag, second_tag)
            if not has_generic_vars or tag in seen_tags:
                continue
            seen_tags.add(tag)
            list_variables_value.append(
                (self.variables["durations_non_release_3"][tag], end_ - st_)
            )
    return list_variables_value
[docs]
def from_solution_to_hint_earliness(self, solution: ScheduleSolutionPreemptive):
    """Build hints for the tardiness and earliness variables of tasks and groups.

    The end of a task is the end of its last piece; the end of a group is the
    latest end among its tasks. When the end is strictly after the deadline
    (``max_ending_date``), tardiness is ``end - deadline`` and earliness is 0;
    otherwise tardiness is 0 and earliness is ``deadline - end``. Task earliness
    is only hinted when the task has an ``"earliness"`` variable; group
    earliness is always hinted.

    Args:
        solution: preemptive solution providing ``schedule``.

    Returns:
        List of ``(variable, value)`` hint pairs, tasks first, then groups.
    """
    hints = []
    index_of = self.problem.task_id_to_index

    task_entries = self.variables["tardiness"]["tasks"]
    for id_task in task_entries:
        entry = task_entries[id_task]
        deadline = self.problem.task_id_dict[id_task].max_ending_date
        end = solution.schedule[index_of[id_task]][-1][1]
        is_late = end > deadline
        hints.append((entry["tardiness"], end - deadline if is_late else 0))
        if "earliness" in entry:
            hints.append((entry["earliness"], 0 if is_late else deadline - end))

    group_entries = self.variables["tardiness"]["groups"]
    for id_group in group_entries:
        entry = group_entries[id_group]
        group = self.problem.tasks_group[self.problem.group_id_to_index[id_group]]
        deadline = group.max_ending_date
        end = max(
            [solution.schedule[index_of[id_task]][-1][1] for id_task in group.tasks_group]
        )
        is_late = end > deadline
        hints.append((entry["tardiness"], end - deadline if is_late else 0))
        hints.append((entry["earliness"], 0 if is_late else deadline - end))
    return hints
[docs]
def init_model(self, **args: Any) -> None:
    """Create the CP-SAT model and populate it with variables and constraints.

    Args:
        **args: optional settings. ``nb_max_preemption`` (default 45) is the
            upper bound on the number of pieces created per task; it is passed
            to ``init_main_variables_preempt``, where ``None`` means no bound.
    """
    nb_max_preemption = args.get("nb_max_preemption", 45)
    self.cp_model = CpModel()
    # Main interval variables (one optional interval per possible piece).
    self.init_main_variables_preempt(nb_max_preemption=nb_max_preemption)
    self.init_span_task_variables()
    self.init_modes_variables()
    # Ghost intervals: one copy of every piece per mode.
    self.init_variable_preempt_mode()
    # Constraints on the preemption variables.
    self.constraint_convention_preemption()
    self.constraint_sum_duration()
    # Span interval variables for the relevant groups of tasks.
    self.init_group_variables()
    # For resources released at the start of another task rather than at the
    # end of the task that consumed them.
    self.init_intervals_of_non_released_resource()
    self.init_resource_variables()
    self.constraint_precedence()
    self.constraint_precedence_on_groups()
    self.constraint_cumulative()
    self.constraint_on_groups_of_task()
    self.constraint_generalized_time_constraint()
    self.create_objectives()
[docs]
def create_objectives(self):
    """Extend the parent objectives with a penalty on the number of pieces.

    After the parent class has filled ``self.variables["obj_data"]``, the sum of
    all piece-presence variables is appended as a sub-objective named
    ``"nb_preemption"`` with weight 10000, and the model minimizes the weighted
    sum of all sub-objectives.
    """
    super().create_objectives()
    inherited = self.variables["obj_data"]
    presence_by_task = self.variables["is_present_preempt"]
    preemption_count = sum(
        presence_by_task[i][j]
        for i in presence_by_task
        for j in range(len(presence_by_task[i]))
    )
    objs = inherited[0] + [preemption_count]
    weights = inherited[1] + [10000]
    names = inherited[2] + ["nb_preemption"]
    self.variables["obj_data"] = (objs, weights, names)
    weighted_terms = [obj * weights[k] for k, obj in enumerate(objs)]
    self.cp_model.Minimize(sum(weighted_terms))
[docs]
def compute_possible_start_end_duration_values(self, index_task: int):
    """Look up mode ``1`` of the given task and return ``None``.

    NOTE(review): the looked-up mode is never used, so this looks like an
    unfinished placeholder - confirm before relying on it.
    """
    task = self.problem.tasks[index_task]
    _unused_mode = task.modes[1]
    return None
[docs]
def init_main_variables_preempt(self, nb_max_preemption: int = None):
    """Create the start/end/duration/presence variables of every task piece.

    Each task gets ``max(1, longest mode duration)`` candidate pieces, capped by
    ``nb_max_preemption`` when it is given. Every piece has a start and an end
    in ``[0, horizon]``, a duration in ``[0, longest mode duration]``, a boolean
    presence variable and an optional interval tying them together.

    The results are stored in ``self.variables`` under ``"starts_preempt"``,
    ``"ends_preempt"``, ``"durs_preempt"``, ``"is_present_preempt"`` and
    ``"interval_preempt"``, each a dict mapping task index to a list of pieces.

    Args:
        nb_max_preemption: maximum number of pieces per task; ``None`` means no
            cap.
    """
    model = self.cp_model
    horizon = self.problem.horizon
    starts_variable = {}
    ends_variable = {}
    durations_variable = {}
    is_present_variable = {}
    intervals_variable = {}
    for i in range(self.problem.nb_tasks):
        task_modes = self.problem.tasks[i].modes
        max_duration = max([task_modes[m].duration for m in task_modes])
        nb_preemption = max(1, max_duration)
        if nb_max_preemption is not None:
            nb_preemption = min(nb_preemption, nb_max_preemption)
        piece_range = range(nb_preemption)
        # Variables are created family by family so that their creation order in
        # the model stays: all starts, all ends, all durations, all presences.
        starts_variable[i] = [
            model.NewIntVar(lb=0, ub=horizon, name=f"start_{i}_{j}")
            for j in piece_range
        ]
        ends_variable[i] = [
            model.NewIntVar(lb=0, ub=horizon, name=f"end_{i}_{j}")
            for j in piece_range
        ]
        durations_variable[i] = [
            model.NewIntVar(lb=0, ub=max_duration, name=f"duration_{i}_{j}")
            for j in piece_range
        ]
        is_present_variable[i] = [
            model.NewBoolVar(name=f"is_present_{i}_{j}") for j in piece_range
        ]
        intervals_variable[i] = [
            model.NewOptionalIntervalVar(
                start=starts_variable[i][j],
                end=ends_variable[i][j],
                size=durations_variable[i][j],
                is_present=is_present_variable[i][j],
                name=f"interval_{i}_{j}",
            )
            for j in piece_range
        ]
    self.variables["starts_preempt"] = starts_variable
    self.variables["ends_preempt"] = ends_variable
    self.variables["durs_preempt"] = durations_variable
    self.variables["is_present_preempt"] = is_present_variable
    self.variables["interval_preempt"] = intervals_variable
[docs]
def init_variable_preempt_mode(self):
    """Create, for every task and mode, one optional interval per piece.

    Each interval reuses the start, end and duration variables of the piece and
    uses the mode variable of the task as its presence literal. The result is
    stored in ``self.variables["interval_preempt_mode"]`` as
    ``{task index: {mode: [interval, ...]}}``.
    """
    model = self.cp_model
    ghost_intervals = {}
    for i in range(self.problem.nb_tasks):
        per_mode = {}
        for mode, mode_literal in self.variables["modes_variable"][i].items():
            piece_starts = self.variables["starts_preempt"][i]
            copies = []
            for j, piece_start in enumerate(piece_starts):
                copies.append(
                    model.NewOptionalIntervalVar(
                        start=piece_start,
                        end=self.variables["ends_preempt"][i][j],
                        size=self.variables["durs_preempt"][i][j],
                        is_present=mode_literal,
                        name=f"int_{i}_{j}_{mode}",
                    )
                )
            per_mode[mode] = copies
        ghost_intervals[i] = per_mode
    self.variables["interval_preempt_mode"] = ghost_intervals
[docs]
def init_span_task_variables(self):
    """Create the span variables covering all the pieces of each task.

    The span of a task starts at the start of its first piece and ends at the
    end of its last piece. A new duration variable in ``[0, horizon]`` is tied
    to them by ``end == start + duration`` and a span interval is created.
    Results are stored under ``"starts"``, ``"ends"``, ``"durations"`` and
    ``"intervals"`` in ``self.variables``, keyed by task index.
    """
    model = self.cp_model
    spans = {"starts": {}, "ends": {}, "durations": {}, "intervals": {}}
    for i in range(self.problem.nb_tasks):
        span_start = self.variables["starts_preempt"][i][0]
        span_end = self.variables["ends_preempt"][i][-1]
        span_duration = model.NewIntVar(
            lb=0, ub=self.problem.horizon, name=f"duration_span_{i}"
        )
        model.Add(span_end == span_start + span_duration)
        spans["starts"][i] = span_start
        spans["ends"][i] = span_end
        spans["durations"][i] = span_duration
        spans["intervals"][i] = model.NewIntervalVar(
            start=span_start,
            size=span_duration,
            end=span_end,
            name=f"span_{i}",
        )
    for key in ("starts", "ends", "durations", "intervals"):
        self.variables[key] = spans[key]
[docs]
def constraint_convention_preemption(self):
    """Post the ordering and presence conventions on the pieces of each task.

    For every task the pieces are forced to appear in index order, absent
    pieces are collapsed onto the end of the previous piece, the first piece is
    always present, and the pieces of a task may not overlap.
    """
    for i in self.variables["starts_preempt"]:
        # Shortest duration among the modes of the task.
        min_duration = min(
            [
                self.problem.tasks[i].modes[m].duration
                for m in self.problem.tasks[i].modes
            ]
        )
        starts = self.variables["starts_preempt"][i]
        ends = self.variables["ends_preempt"][i]
        durs = self.variables["durs_preempt"][i]
        presents = self.variables["is_present_preempt"][i]
        # Once a piece (from index 1 on) is absent, the next one is absent too.
        for j in range(1, len(starts) - 1):
            self.cp_model.AddImplication(presents[j].Not(), presents[j + 1].Not())
        for j in range(1, len(starts)):
            # Pieces are chained in time order.
            self.cp_model.Add(starts[j] >= ends[j - 1])
            # A present piece starts strictly after the previous one ends.
            self.cp_model.Add(starts[j] > ends[j - 1]).OnlyEnforceIf(
                presents[j]
            )  # disjoint..
            # self.cp_model.Add(starts[j] < ends[j-1]+5).OnlyEnforceIf(presents[j]) # disjoint..
            # if min_duration >= 2:
            #    self.cp_model.Add(durs[j] >= 2).OnlyEnforceIf(presents[j]) # disjoint..
            # A piece cannot be present when the previous piece is absent.
            self.cp_model.Add(presents[j] == 0).OnlyEnforceIf(presents[j - 1].Not())
            # An absent piece is pinned to the end of the previous piece.
            self.cp_model.Add(starts[j] == ends[j - 1]).OnlyEnforceIf(
                presents[j].Not()
            )
            self.cp_model.Add(ends[j] == ends[j - 1]).OnlyEnforceIf(
                presents[j].Not()
            )
        # The first piece is always present.
        self.cp_model.Add(presents[0] == 1)
        if min_duration > 0:
            # First subtask is not dummy
            self.cp_model.Add(durs[0] > 0)
            self.cp_model.Add(presents[0] == 1)
        # Presence and strictly positive duration go together.
        # NOTE(review): the nesting of this loop was reconstructed; applied to
        # every task it forces durs[0] > 0 even when min_duration == 0 - confirm
        # that zero-duration tasks are not expected here.
        for j in range(len(starts)):
            self.cp_model.Add(durs[j] > 0).OnlyEnforceIf(presents[j])
            self.cp_model.Add(durs[j] == 0).OnlyEnforceIf(presents[j].Not())
        # The pieces of one task never overlap each other.
        self.cp_model.AddNoOverlap(self.variables["interval_preempt"][i])
[docs]
def constraint_sum_duration(self):
    """Force the piece durations of each task to add up to its executed duration.

    For every task, the sum of ``self.variables["durs_preempt"][i]`` must equal
    ``self.variables["durations_executed"][i]``.
    """
    for i in range(self.problem.nb_tasks):
        total_piece_duration = LinearExpr.sum(self.variables["durs_preempt"][i])
        executed_duration = self.variables["durations_executed"][i]
        self.cp_model.Add(total_piece_duration == executed_duration)
[docs]
def init_modes_variables(self):
    """Create the mode selection variables and the executed duration per task.

    For a task with several modes, one boolean per mode is created, each one
    enforcing the duration of its mode, and exactly one of them must hold. A
    task with a single mode gets the constant ``True`` instead of a boolean.
    The executed duration is a plain value when all modes share one duration,
    and otherwise an integer variable restricted to the mode durations.

    Results are stored in ``self.variables["modes_variable"]``
    (``{task index: {mode: literal}}``) and
    ``self.variables["durations_executed"]`` (``{task index: duration}``).
    """
    model = self.cp_model
    mode_literals = {}
    executed_durations = {}
    for i in range(self.problem.nb_tasks):
        task_modes = self.problem.tasks[i].modes
        several_modes = len(task_modes) > 1
        distinct_durations = list({task_modes[mode].duration for mode in task_modes})
        if len(distinct_durations) == 1:
            executed_durations[i] = distinct_durations[0]
        else:
            executed_durations[i] = model.NewIntVarFromDomain(
                Domain.from_values(distinct_durations), name=f"duration_execution_{i}"
            )
        literals = {}
        mode_literals[i] = literals
        for mode in task_modes:
            if not several_modes:
                literals[mode] = True
                continue
            literal = model.NewBoolVar(name=f"task_{i}_m_{mode}")
            literals[mode] = literal
            # Selecting the mode fixes the executed duration to the mode's one.
            model.Add(
                executed_durations[i] == task_modes[mode].duration
            ).OnlyEnforceIf(literal)
        if several_modes:
            model.AddExactlyOne([literals[mode] for mode in literals])
    self.variables["modes_variable"] = mode_literals
    self.variables["durations_executed"] = executed_durations
[docs]
def init_main_variables(self):
    """Create one start, end, duration and interval variable per task.

    Starts are bounded by ``self.min_start_time`` / ``self.max_start_time``.
    Ends are bounded by ``self.min_end_time`` and by ``self.max_end_time``,
    unless the task has ``soft_max_end_date`` set, in which case the upper
    bound is the problem horizon. Durations range from the shortest mode
    duration to 100 times the longest one. Results are stored under
    ``"starts"``, ``"ends"``, ``"durations"`` and ``"intervals"``.
    """
    model = self.cp_model
    task_starts, task_ends, task_durations, task_intervals = {}, {}, {}, {}
    for i in range(self.problem.nb_tasks):
        task = self.problem.tasks[i]
        task_starts[i] = model.NewIntVar(
            lb=self.min_start_time[i], ub=self.max_start_time[i], name=f"start_{i}"
        )
        if task.soft_max_end_date:
            # The deadline is soft: only the horizon bounds the end.
            latest_end = self.problem.horizon
        else:
            latest_end = self.max_end_time[i]
        task_ends[i] = model.NewIntVar(
            lb=self.min_end_time[i], ub=latest_end, name=f"end_{i}"
        )
        mode_durations = [task.modes[m].duration for m in task.modes]
        longest = max(mode_durations)
        shortest = min(mode_durations)
        task_durations[i] = model.NewIntVar(
            lb=shortest, ub=100 * longest, name=f"duration_{i}"
        )
        task_intervals[i] = model.NewIntervalVar(
            start=task_starts[i],
            size=task_durations[i],
            end=task_ends[i],
            name=f"interval_{i}",
        )
    self.variables["starts"] = task_starts
    self.variables["ends"] = task_ends
    self.variables["durations"] = task_durations
    self.variables["intervals"] = task_intervals
[docs]
def constraint_cumulative(self):
    """Post the cumulative constraints of every renewable resource.

    Each renewable resource gets a constraint with its fixed maximum capacity.
    When the objective parameters contain ``ObjectivesEnum.RESOURCE_COST`` and
    the resource id appears in its ``weight_per_resource_unit``, a second
    constraint using the variable capacity is posted as well.
    """
    for resource in self.problem.resources:
        if not resource.renewable:
            continue
        self.constraint_cumulative_resource(
            resource=resource, variable_max_capacity=False
        )
        params_obj = self.problem.objective_params.params_obj
        if ObjectivesEnum.RESOURCE_COST not in params_obj:
            continue
        priced_resources = params_obj[
            ObjectivesEnum.RESOURCE_COST
        ].weight_per_resource_unit
        if resource.id in priced_resources:
            self.constraint_cumulative_resource(
                resource=resource, variable_max_capacity=True
            )
[docs]
def constraint_cumulative_resource(
    self, resource: ResourceData, variable_max_capacity: bool = False
):
    """Post the cumulative constraint(s) of one resource.

    Consumers are the per-mode piece intervals with a strictly positive
    consumption of the resource. Fixed-size intervals built from
    ``create_resource_consumption_from_calendar`` are added as extra consumers
    in the fixed-capacity case. When non-release intervals exist for the
    resource in ``self.variables["intervals_non_release"]``, an additional
    constraint including them is posted first.

    Args:
        resource: resource to constrain.
        variable_max_capacity: when ``False`` the capacity is
            ``resource.max_capacity``; when ``True`` it is the entry of
            ``self.variables["resource_capacity_variables"]`` for the resource.
    """
    res_comp: List[Dict[str, int]] = create_resource_consumption_from_calendar(
        calendar_availability=resource.calendar_availability
    )
    id_resource = resource.id

    # (interval, demand) for every piece of every mode consuming the resource.
    task_mode_consume = []
    for i, intervals_per_mode in self.variables["interval_preempt_mode"].items():
        for mode, piece_intervals in intervals_per_mode.items():
            demand = self.problem.tasks[i].modes[mode].get_res_consumption(id_resource)
            if demand > 0:
                task_mode_consume.extend(
                    (piece_interval, int(demand)) for piece_interval in piece_intervals
                )

    task_mode_consume_with_non_release = None
    if "intervals_non_release" in self.variables:
        if resource.id in self.variables["intervals_non_release"]:
            task_mode_consume_with_non_release = (
                task_mode_consume
                + self.variables["intervals_non_release"][resource.id]
            )

    # Fixed intervals built from the calendar entries with a positive value.
    fake_task_res = [
        (
            self.cp_model.NewFixedSizeIntervalVar(
                start=f["start"], size=f["duration"], name="res_"
            ),
            f["value"],
        )
        for f in res_comp
        if f["value"] > 0
    ]
    # Same, leaving out the entries whose value is the full capacity.
    fake_task_res_bis = [
        (
            self.cp_model.NewFixedSizeIntervalVar(
                start=f["start"], size=f["duration"], name="res_"
            ),
            f["value"],
        )
        for f in res_comp
        if f["value"] > 0 and f["value"] != resource.max_capacity
    ]

    def add_cumulative(consumers, capacity):
        # Intervals and demands must be parallel lists: x[0] is the interval and
        # x[1] its demand.
        self.cp_model.AddCumulative(
            [x[0] for x in consumers], [x[1] for x in consumers], capacity
        )

    if not variable_max_capacity:
        if task_mode_consume_with_non_release:
            add_cumulative(
                task_mode_consume_with_non_release + fake_task_res_bis,
                resource.max_capacity,
            )
        add_cumulative(task_mode_consume + fake_task_res, resource.max_capacity)
    else:
        if task_mode_consume_with_non_release:
            add_cumulative(
                task_mode_consume_with_non_release + fake_task_res_bis,
                self.variables["resource_capacity_variables"][resource.id],
            )
        # The calendar intervals are deliberately not part of this constraint.
        add_cumulative(
            task_mode_consume,
            self.variables["resource_capacity_variables"][resource.id],
        )
[docs]
def constraint_group_non_release_resource(self):
    """Post cumulative constraints for resources held by whole groups of tasks.

    Only groups of type ``GroupType.GROUP_TASK_NON_RELEASED_RESOURCE`` are
    considered. For each resource named in their ``res_not_released``, a group
    with a strictly positive amount contributes its span interval with that
    amount. Tasks of those groups are left out of the per-task consumers, and
    every other task contributes its per-mode piece intervals. The capacity is
    the ``max_capacity`` of the resource.
    """
    groups_non_release_resource = [
        g
        for g in self.problem.tasks_group
        if g.type_of_group == GroupType.GROUP_TASK_NON_RELEASED_RESOURCE
    ]
    all_resource_concerned = set()
    for g in groups_non_release_resource:
        all_resource_concerned.update({r for r in g.res_not_released})
    for resource in all_resource_concerned:
        intervals_and_consumption = []
        tasks_covered_in_group = set()
        for g in groups_non_release_resource:
            if resource in g.res_not_released and g.res_not_released[resource] > 0:
                intervals_and_consumption.append(
                    (
                        self.variables["span_interval_variables"][g.id],
                        g.res_not_released[resource],
                    )
                )
                tasks_covered_in_group.update(g.tasks_group)
        task_mode_consume = []
        for i, intervals_per_mode in self.variables["interval_preempt_mode"].items():
            # Tasks covered by a holding group are represented by the group span.
            if self.problem.index_to_task_id[i] in tasks_covered_in_group:
                continue
            for mode, piece_intervals in intervals_per_mode.items():
                demand = self.problem.tasks[i].modes[mode].get_res_consumption(resource)
                if demand > 0:
                    task_mode_consume.extend(
                        (piece_interval, int(demand))
                        for piece_interval in piece_intervals
                    )
        consumers = task_mode_consume + intervals_and_consumption
        self.cp_model.AddCumulative(
            [x[0] for x in consumers],
            [x[1] for x in consumers],
            self.problem.resource_dict[resource].max_capacity,
        )