# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import dataclasses
import logging
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import numpy as np
import ortools.sat.python.cp_model
from ortools.sat.python.cp_model import (
CpSolverSolutionCallback,
Domain,
IntervalVar,
LinearExpr,
LinearExprT,
)
from discrete_optimization.flex_scheduling.fsp_utils import (
create_resource_consumption_from_calendar,
get_lb_ub_start_end_date,
get_lb_ub_start_end_date_group_of_task,
)
from discrete_optimization.flex_scheduling.problem import (
RESOURCE_KEY,
CumulativeResource,
FlexProblem,
GroupType,
NonRenewableResource,
NoUnaryResource,
ObjectiveParamEarliness,
ObjectiveParamResource,
ObjectiveParamTardiness,
ObjectivesEnum,
ResourceData,
ScheduleSolution,
Task,
TaskData,
TasksGroups,
)
from discrete_optimization.generic_tasks_tools.allocation import UnaryResource
from discrete_optimization.generic_tasks_tools.enums import StartOrEnd
from discrete_optimization.generic_tasks_tools.solvers.cpsat.cumulative_resource import (
CumulativeResourceSchedulingCpSatSolver,
)
from discrete_optimization.generic_tasks_tools.solvers.cpsat.non_renewable_resource import (
NonRenewableCpSatSolver,
)
from discrete_optimization.generic_tasks_tools.solvers.cpsat.precedence_scheduling import (
PrecedenceSchedulingCpSatSolver,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
EnumHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
logger = logging.getLogger(__name__)
[docs]
@dataclasses.dataclass
class ConstraintIncluding:
include_non_released_resource: bool = dataclasses.field(default=True)
include_group_variables: bool = dataclasses.field(default=True)
include_constraint_precedence_on_groups: bool = dataclasses.field(default=True)
include_constraints_on_groups: bool = dataclasses.field(default=True)
include_generalized_time_constraints: bool = dataclasses.field(default=True)
include_variable_resource: bool = dataclasses.field(default=True)
[docs]
class DurationEncodingEnum(Enum):
INDICATOR = 0
ELEMENT = 1
[docs]
class CpSatFlexSolver(
PrecedenceSchedulingCpSatSolver[Task],
CumulativeResourceSchedulingCpSatSolver[Task, CumulativeResource, NoUnaryResource],
NonRenewableCpSatSolver[Task, NonRenewableResource],
WarmstartMixin,
):
hyperparameters = [
EnumHyperparameter(
name="duration_encoding",
enum=DurationEncodingEnum,
default=DurationEncodingEnum.INDICATOR,
)
]
problem: FlexProblem
def __init__(self, problem: FlexProblem, **kwargs: Any):
super().__init__(problem, **kwargs)
self.variables = {}
(
self.min_start_time,
self.max_start_time,
self.min_end_time,
self.max_end_time,
) = get_lb_ub_start_end_date(problem=self.problem)
self.durs: Dict[
Tuple[int, int], Tuple[List[int], Dict[int, List[Tuple[int, int]]]]
] = self.problem.durations_data
self.duration_encoding: DurationEncodingEnum = None
[docs]
def set_warm_start(self, solution: Solution) -> None:
if self.solver is not None:
self.cp_model.ClearHints()
response = self.solver.ResponseProto() # Get the raw response
for i in range(len(response.solution)):
var = self.cp_model.GetIntVarFromProtoIndex(i)
# print(f"Variable {var} = {response.solution[i]}")
self.cp_model.AddHint(var, response.solution[i])
else:
self.set_warm_start_from_sol(solution)
[docs]
def set_warm_start_from_sol(self, solution: Solution) -> None:
l = self.from_solution_to_hint(solution)
self.cp_model.ClearHints()
done = set()
for var, val in l:
if str(var) not in done and isinstance(
var, ortools.sat.python.cp_model.IntVar
):
self.cp_model.AddHint(var, int(val))
done.add(str(var))
else:
pass
# print(var, " already")
[docs]
def get_task_start_or_end_variable(
self, task: Task, start_or_end: StartOrEnd
) -> LinearExprT:
index = self.problem.task_id_to_index[task]
if start_or_end == StartOrEnd.START:
return self.variables["starts"][index]
elif start_or_end == StartOrEnd.END:
return self.variables["ends"][index]
return None
[docs]
def get_task_mode_is_present_variable(self, task: Task, mode: int) -> LinearExprT:
index = self.problem.task_id_to_index[task]
return self.variables["is_present"][index][mode]
[docs]
def from_solution_to_hint(
self, solution: ScheduleSolution
) -> list[tuple[ortools.sat.python.cp_model.VariableT, int]]:
list_variables_value = []
# Main variables
for i in range(self.problem.nb_tasks):
list_variables_value.append(
(self.variables["starts"][i], solution.schedule[i, 0])
)
list_variables_value.append(
(self.variables["ends"][i], solution.schedule[i, 1])
)
list_variables_value.append(
(
self.variables["durations"][i],
solution.schedule[i, 1] - solution.schedule[i, 0],
)
)
# Modes variables
for i in range(self.problem.nb_tasks):
mode_chosen = solution.modes[i]
for mode in self.problem.tasks[i].modes:
if mode == mode_chosen:
if len(self.problem.tasks[i].modes) > 1:
list_variables_value.append(
(self.variables["is_present"][i][mode], 1)
)
list_variables_value.append(
(
self.variables["opt_durations"][i][mode],
solution.schedule[i, 1] - solution.schedule[i, 0],
)
)
else:
if len(self.problem.tasks[i].modes) > 1:
list_variables_value.append(
(self.variables["is_present"][i][mode], 0)
)
list_variables_value.append(
(self.variables["opt_durations"][i][mode], 0)
)
# Group variables
for group in self.problem.tasks_group:
group_id = group.id
ft = group.first_task_if_any
lt = group.last_task_if_any
if ft is not None:
i = self.problem.task_id_to_index[ft]
st_grp = solution.schedule[i, 0]
else:
st_grp = min(
[
solution.schedule[self.problem.task_id_to_index[id_task], 0]
for id_task in group.tasks_group
]
)
if lt is not None:
i = self.problem.task_id_to_index[lt]
end_grp = solution.schedule[i, 1]
else:
end_grp = max(
[
solution.schedule[self.problem.task_id_to_index[id_task], 1]
for id_task in group.tasks_group
]
)
list_variables_value.append(
(self.variables["start_span_variables"][group_id], st_grp)
)
list_variables_value.append(
(self.variables["end_span_variables"][group_id], end_grp)
)
list_variables_value.append(
(self.variables["duration_span_variables"][group_id], end_grp - st_grp)
)
list_variables_value += self.from_solution_to_hint_non_released_delta(
solution=solution
)
# Resource
for resource in self.problem.resources:
if "resource_capacity_variables" in self.variables:
if resource.id in self.variables["resource_capacity_variables"]:
list_variables_value.append(
(
self.variables["resource_capacity_variables"][resource.id],
int(resource.max_capacity),
)
)
# Indicator for duration..
if "dictionary_indicators" in self.variables:
for (index, mode), dur in self.variables["dictionary_indicators"]:
if (
solution.schedule[index, 1] - solution.schedule[index, 0] == dur
and solution.modes[index] == mode
):
list_variables_value.append(
(
self.variables["dictionary_indicators"][
((index, mode), dur)
],
1,
)
)
else:
list_variables_value.append(
(
self.variables["dictionary_indicators"][
((index, mode), dur)
],
0,
)
)
groups = [
g
for g in self.problem.tasks_group
if g.type_of_group == GroupType.SUBGROUP_TASK_FOR_OBJECTIVE
]
nb_groups = len(groups)
if "capacity_group_execution" in self.variables:
list_variables_value.append(
(self.variables["capacity_group_execution"], nb_groups)
) # could be reduced
if "earliness_task" in self.variables:
for t in self.variables["earliness_task"]:
it = self.problem.task_id_to_index[t]
end = solution.schedule[it, 1]
list_variables_value.append(
(
self.variables["earliness_task"][t],
max([0, self.problem.tasks[it].max_ending_date - end]),
)
)
list_variables_value.append(
(self.variables["makespan"], max(solution.schedule[:, 1]))
)
list_variables_value += self.from_solution_to_hint_earliness(solution)
return list_variables_value
[docs]
def from_solution_to_hint_earliness(self, solution: ScheduleSolution):
list_variables_value = []
for id_task in self.variables["tardiness"]["tasks"]:
index = self.problem.task_id_to_index[id_task]
deadline = self.problem.task_id_dict[id_task].max_ending_date
end = solution.schedule[index, 1]
if end > deadline:
list_variables_value.append(
(
self.variables["tardiness"]["tasks"][id_task]["tardiness"],
end - deadline,
)
)
if "earliness" in self.variables["tardiness"]["tasks"][id_task]:
list_variables_value.append(
(self.variables["tardiness"]["tasks"][id_task]["earliness"], 0)
)
else:
list_variables_value.append(
(self.variables["tardiness"]["tasks"][id_task]["tardiness"], 0)
)
if "earliness" in self.variables["tardiness"]["tasks"][id_task]:
list_variables_value.append(
(
self.variables["tardiness"]["tasks"][id_task]["earliness"],
deadline - end,
)
)
for id_group in self.variables["tardiness"]["groups"]:
group = self.problem.tasks_group[self.problem.group_id_to_index[id_group]]
deadline = group.max_ending_date
end = max(
[
solution.schedule[self.problem.task_id_to_index[id_task], 1]
for id_task in group.tasks_group
]
)
if end > deadline:
list_variables_value.append(
(
self.variables["tardiness"]["groups"][id_group]["tardiness"],
end - deadline,
)
)
if "earliness" in self.variables["tardiness"]["groups"][id_group]:
list_variables_value.append(
(
self.variables["tardiness"]["groups"][id_group][
"earliness"
],
0,
)
)
else:
list_variables_value.append(
(self.variables["tardiness"]["groups"][id_group]["tardiness"], 0)
)
if "earliness" in self.variables["tardiness"]["groups"][id_group]:
list_variables_value.append(
(
self.variables["tardiness"]["groups"][id_group][
"earliness"
],
deadline - end,
)
)
return list_variables_value
[docs]
def from_solution_to_hint_non_released_delta(self, solution: ScheduleSolution):
list_variables_value = []
if (
self.problem.constraints.successor_with_res_release_at_start_of_successor
is not None
):
data = self.problem.constraints.successor_with_res_release_at_start_of_successor
for t1, t2, d_res in data:
# This is the simple case where, the first element of the tuple is the task id, without any mode.
if t1 in self.problem.task_id_to_index:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
list_variables_value.append(
(
self.variables["durations_non_release_1"][(i1, i2)],
solution.schedule[i2, 0] - solution.schedule[i1, 1],
)
)
if (
self.problem.constraints.successor_with_res_release_at_start_of_successor_mode
is not None
):
durations_non_release = {}
data = self.problem.constraints.successor_with_res_release_at_start_of_successor_mode
for (t1, mode), t2, d_res in data:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
assert mode in self.variables["is_present"][i1]
if (i1, mode, i2) not in durations_non_release:
if solution.modes[i1] == mode:
list_variables_value.append(
(
self.variables["durations_non_release_2"][
(i1, mode, i2)
],
solution.schedule[i2, 0] - solution.schedule[i1, 1],
)
)
else:
list_variables_value.append(
(
self.variables["durations_non_release_2"][
(i1, mode, i2)
],
0,
)
)
if (
self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
is not None
):
data = self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
for t1, t2, d_res in data:
tag = []
st_ = None
end_ = None
if t1.is_a_task:
i1 = self.problem.task_id_to_index[t1.task_id]
st_ = solution.schedule[i1, 1]
tag.append(("task", t1.task_id))
else:
group = t1.group_id
gr: TasksGroups = [
g for g in self.problem.tasks_group if g.id == group
][0]
st_ = max(
[
solution.schedule[self.problem.task_id_to_index[id_task], 1]
for id_task in gr.tasks_group
]
)
tag.append(("group", group))
if t2.is_a_task:
i2 = self.problem.task_id_to_index[t2.task_id]
end_ = solution.schedule[i2, 0]
tag.append(("task", t2.task_id))
else:
group = t2.group_id
gr: TasksGroups = [
g for g in self.problem.tasks_group if g.id == group
][0]
end_ = min(
[
solution.schedule[self.problem.task_id_to_index[id_task], 0]
for id_task in gr.tasks_group
]
)
tag.append(("group", group))
tag = tuple(tag)
if self.variables["durations_non_release_3"][tag] not in [
x[0] for x in list_variables_value
]:
list_variables_value.append(
(self.variables["durations_non_release_3"][tag], end_ - st_)
)
return list_variables_value
[docs]
def init_main_variables(self):
starts_variable = {}
ends_variable = {}
durations_variable = {}
intervals_variable = {}
for i in range(self.problem.nb_tasks):
domain_intervals = []
max_duration = 0
for key in self.durs:
if key[0] == i:
for val in self.durs[key][1]:
if val >= 0:
domain_intervals.extend(self.durs[key][1][val])
max_duration = max(max_duration, int(val))
domain = Domain.FromIntervals(domain_intervals)
starts_variable[i] = self.cp_model.NewIntVarFromDomain(
domain=domain, name=f"start_{i}"
)
# lb=self.min_start_time[i], ub=self.max_start_time[i], name=f"start_{i}"
# )
self.cp_model.add(starts_variable[i] >= int(self.min_start_time[i]))
self.cp_model.add(starts_variable[i] <= int(self.max_start_time[i]))
ends_variable[i] = self.cp_model.NewIntVar(
lb=int(self.min_end_time[i]),
ub=int(self.max_end_time[i])
if not self.problem.tasks[i].soft_max_end_date
else int(self.problem.horizon),
# deadline is actually soft
name=f"end_{i}",
)
min_duration = min(
[
self.problem.tasks[i].modes[m].duration
for m in self.problem.tasks[i].modes
]
)
durations_variable[i] = self.cp_model.NewIntVar(
lb=min_duration, ub=max_duration, name=f"duration_{i}"
)
intervals_variable[i] = self.cp_model.NewIntervalVar(
start=starts_variable[i],
size=durations_variable[i],
end=ends_variable[i],
name=f"interval_{i}",
)
allowed = []
for val in self.durs[(i, 1)][1]:
if val >= 0:
for lb, ub in self.durs[(i, 1)][1][val]:
for j in range(lb, ub + 1):
allowed.append((j, int(val)))
# self.cp_model.add_allowed_assignments([starts_variable[i], durations_variable[i]],
# allowed)
self.variables["starts"] = starts_variable
self.variables["ends"] = ends_variable
self.variables["durations"] = durations_variable
self.variables["intervals"] = intervals_variable
[docs]
def init_optional_interval_variables(self):
# This is for multimode tasks !
is_present_variable = {}
# opt_starts_variable = {} # might be useful one day.
# opt_end_variable = {}
opt_durations_variable = {}
opt_intervals_variable = {}
for i in range(self.problem.nb_tasks):
nb_modes = len(self.problem.tasks[i].modes)
is_present_variable[i] = {}
opt_durations_variable[i] = {}
opt_intervals_variable[i] = {}
for mode in self.problem.tasks[i].modes:
if nb_modes > 1:
is_present_variable[i][mode] = self.cp_model.NewBoolVar(
name=f"task_{i}_m_{mode}"
)
else:
is_present_variable[i][mode] = (
True # Maybe reconsider this when the task is totally.
)
dur = self.problem.tasks[i].modes[mode].duration
lb = dur
ub = (
dur
if not self.problem.tasks[i]
.modes[mode]
.preemptive_on_resource_break
else 100 * dur
)
opt_durations_variable[i][mode] = self.cp_model.NewIntVar(
lb=lb, ub=ub, name=f"dur_task_{i}_m_{mode}"
)
if nb_modes == 1:
self.cp_model.Add(
opt_durations_variable[i][mode]
== self.variables["durations"][i]
)
opt_intervals_variable[i][mode] = self.cp_model.NewOptionalIntervalVar(
start=self.variables["starts"][i],
size=opt_durations_variable[i][mode],
end=self.variables["ends"][i],
is_present=is_present_variable[i][mode],
name=f"interval_task_{i}_m_{mode}",
)
self.cp_model.AddExactlyOne(
[is_present_variable[i][mode] for mode in is_present_variable[i]]
) # I choose 1 and only 1 mode.
self.variables["opt_intervals"] = opt_intervals_variable
self.variables["opt_durations"] = opt_durations_variable
self.variables["is_present"] = is_present_variable
[docs]
def init_group_variables(self):
if len(self.problem.tasks_group) == 0:
return
(
gmin_start_time,
gmax_start_time,
gmin_end_time,
gmax_end_time,
) = get_lb_ub_start_end_date_group_of_task(self.problem)
span_interval_variables = {}
start_span_variables = {}
duration_span_variables = {}
end_span_variables = {}
for group in self.problem.tasks_group:
group_id = group.id
ft = group.first_task_if_any
lt = group.last_task_if_any
index_group = {self.problem.task_id_to_index[t] for t in group.tasks_group}
if ft is not None:
start_span_variables[group_id] = self.variables["starts"][
self.problem.task_id_to_index[ft]
]
else:
start_span_variables[group_id] = self.cp_model.NewIntVar(
lb=int(gmin_start_time[group_id]),
ub=int(gmax_start_time[group_id]),
name=f"start_span_gr_{group_id}",
)
self.cp_model.AddMinEquality(
start_span_variables[group_id],
[self.variables["starts"][i_x] for i_x in index_group],
)
if lt is not None:
end_span_variables[group_id] = self.variables["ends"][
self.problem.task_id_to_index[lt]
]
else:
end_span_variables[group_id] = self.cp_model.NewIntVar(
lb=int(gmin_end_time[group_id]),
ub=int(gmax_end_time[group_id])
if not group.soft_max_end_date
else int(self.problem.horizon),
# deadline is actually soft,
name=f"end_span_gr_{group_id}",
)
self.cp_model.AddMaxEquality(
end_span_variables[group_id],
[self.variables["ends"][i_x] for i_x in index_group],
)
duration_span_variables[group_id] = self.cp_model.NewIntVar(
lb=0, ub=int(self.problem.horizon), name=f"duration_span_gr_{group_id}"
)
# self.cp_model.Add(
# end_span_variables[group_id]
# == start_span_variables[group_id] + duration_span_variables[group_id]
# )
span_interval_variables[group_id] = self.cp_model.NewIntervalVar(
start=start_span_variables[group_id],
size=duration_span_variables[group_id],
end=end_span_variables[group_id],
name=f"interval_span_gr_{group_id}",
)
self.variables["start_span_variables"] = start_span_variables
self.variables["duration_span_variables"] = duration_span_variables
self.variables["end_span_variables"] = end_span_variables
self.variables["span_interval_variables"] = span_interval_variables
[docs]
def constraint_reservoir(self, constraint_including: ConstraintIncluding = None):
for res in self.problem.resources:
if res.max_capacity == 1:
task_mode_consume = [
(
index_task,
mode,
self.problem.tasks[index_task]
.modes[mode]
.get_res_consumption(res.id),
)
for index_task in range(self.problem.nb_tasks)
for mode in self.problem.tasks[index_task].modes
if self.problem.tasks[index_task]
.modes[mode]
.get_res_consumption(res.id)
> 0
]
active = [
self.variables["is_present"][x[0]][x[1]] for x in task_mode_consume
]
consos = [x[2] for x in task_mode_consume]
times = [0]
vals = [1]
actives = [1]
for i in range(len(consos)):
times.append(self.variables["starts"][task_mode_consume[i][0]])
vals.append(-consos[i])
actives.append(active[i])
times.append(self.variables["ends"][task_mode_consume[i][0]])
vals.append(consos[i])
actives.append(active[i])
if (
self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
is not None
):
data = self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
for t1, t2, d_res in data:
if not t1.is_a_task or not t2.is_a_task:
continue
i1 = self.problem.task_id_to_index[t1.task_id]
i2 = self.problem.task_id_to_index[t2.task_id]
if res.id in d_res:
times.append(self.variables["ends"][i1])
vals.append(-d_res[res.id])
actives.append(1)
times.append(self.variables["starts"][i2])
vals.append(d_res[res.id])
actives.append(1)
self.cp_model.AddReservoirConstraint(times, vals, 0, 1)
[docs]
def init_intervals_of_non_released_resource(
self, constraint_including: ConstraintIncluding = None
):
if constraint_including is None:
constraint_including = ConstraintIncluding()
intervals_non_release = {}
if (
self.problem.constraints.successor_with_res_release_at_start_of_successor
is not None
):
durations_non_release = {}
data = self.problem.constraints.successor_with_res_release_at_start_of_successor
for t1, t2, d_res in data:
# This is the simple case where, the first element of the tuple is the task id, without any mode.
if t1 in self.problem.task_id_to_index:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
if (i1, i2) not in durations_non_release:
delta = self.cp_model.NewIntVar(
lb=0,
ub=self.max_start_time[i2] - self.min_end_time[i1],
name=f"delta_end_{i1}_start_{i2}",
)
durations_non_release[(i1, i2)] = delta
itv = self.cp_model.NewIntervalVar(
start=self.variables["ends"][i1],
size=durations_non_release[(i1, i2)],
end=self.variables["starts"][i2],
name=f"interval_end_{i1}_start_{i2}",
)
for res in d_res:
if res not in intervals_non_release:
intervals_non_release[res] = []
intervals_non_release[res].append((itv, d_res[res]))
self.variables["durations_non_release_1"] = durations_non_release
if (
self.problem.constraints.successor_with_res_release_at_start_of_successor_mode
is not None
):
durations_non_release = {}
data = self.problem.constraints.successor_with_res_release_at_start_of_successor_mode
for (t1, mode), t2, d_res in data:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
assert mode in self.variables["is_present"][i1]
if (i1, mode, i2) not in durations_non_release:
delta = self.cp_model.NewIntVar(
lb=0,
ub=self.max_start_time[i2] - self.min_end_time[i1],
name=f"delta_end_{i1}_start_{i2}",
)
durations_non_release[(i1, mode, i2)] = delta
# This
itv = self.cp_model.NewOptionalIntervalVar(
start=self.variables["ends"][i1],
size=durations_non_release[(i1, mode, i2)],
end=self.variables["starts"][i2],
is_present=self.variables["is_present"][i1][mode],
name=f"interval_end_{i1}_start_{i2}",
)
for res in d_res:
if res not in intervals_non_release:
intervals_non_release[res] = []
intervals_non_release[res].append((itv, d_res[res]))
self.variables["durations_non_release_2"] = durations_non_release
if (
self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
is not None
):
durations_non_release = {}
data = self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
for t1, t2, d_res in data:
tag = []
if (
not t1.is_a_task or not t2.is_a_task
) and not constraint_including.include_group_variables:
continue
if t1.is_a_task:
i1 = self.problem.task_id_to_index[t1.task_id]
min_end_time = self.min_end_time[i1]
end_t1 = self.variables["ends"][i1]
tag.append(("task", t1.task_id))
else:
group = t1.group_id
gr: TasksGroups = [
g for g in self.problem.tasks_group if g.id == group
][0]
min_end_time = max(
[
self.min_end_time[self.problem.task_id_to_index[i]]
for i in gr.tasks_group
]
)
end_t1 = self.variables["end_span_variables"][group]
tag.append(("group", group))
if t2.is_a_task:
i2 = self.problem.task_id_to_index[t2.task_id]
max_start_time = self.max_start_time[i2]
start_t2 = self.variables["starts"][i2]
tag.append(("task", t2.task_id))
else:
group = t2.group_id
gr: TasksGroups = [
g for g in self.problem.tasks_group if g.id == group
][0]
max_start_time = min(
[
self.max_start_time[self.problem.task_id_to_index[i]]
for i in gr.tasks_group
]
)
start_t2 = self.variables["start_span_variables"][group]
tag.append(("group", group))
tag = tuple(tag)
if tag not in durations_non_release:
delta = self.cp_model.NewIntVar(
lb=0,
ub=int(max_start_time - min_end_time),
name=f"delta_end_{tag[0]}_start_{tag[1]}",
)
durations_non_release[tag] = delta
self.cp_model.add(start_t2 >= end_t1)
itv = self.cp_model.NewIntervalVar(
start=end_t1,
size=durations_non_release[tag],
end=start_t2,
name=f"interval_end_{tag[0]}_start_{tag[1]}",
)
for res in d_res:
if res not in intervals_non_release:
intervals_non_release[res] = []
intervals_non_release[res].append((itv, d_res[res]))
self.variables["durations_non_release_3"] = durations_non_release
self.variables["intervals_non_release"]: Dict[
RESOURCE_KEY, list[tuple[IntervalVar, int]]
] = intervals_non_release
[docs]
def init_resource_variables(self):
resource_capacity_var = {}
obj_param = self.problem.objective_params.params_obj.get(
ObjectivesEnum.RESOURCE_COST, None
)
if obj_param is not None:
for resource in self.problem.resources:
if (
obj_param.weight_per_resource_unit.get(resource.id, 0) > 0
or obj_param.consider_in_objectives[resource.id]
):
resource_capacity_var[resource.id] = self.cp_model.NewIntVar(
lb=0,
ub=int(resource.max_capacity),
name=f"res_capacity_{resource.id}",
)
self.variables["resource_capacity_variables"] = resource_capacity_var
[docs]
def init_and_constraint_wip_variables(self):
# This is a variable to store some "capacity" variable on the number of work in progress.
if self.problem.tasks_group is None or len(self.problem.tasks_group) == 0:
return
groups = [
g
for g in self.problem.tasks_group
if g.type_of_group == GroupType.SUBGROUP_TASK_FOR_OBJECTIVE
]
nb_groups = len(groups)
if nb_groups == 0:
return
capacity_group_execution = self.cp_model.NewIntVar(
lb=1, ub=nb_groups, name="capacity_nb_group_in_progress"
)
self.cp_model.AddCumulative(
[
self.variables["span_interval_variables"][
g.id
] # this spans over the entire group of task.
for g in groups
],
[1] * nb_groups,
capacity_group_execution,
)
self.variables["capacity_group_execution"] = capacity_group_execution
[docs]
def constraint_precedence(self):
"""
Basic precedence constraint
"""
for t_id in self.problem.constraints.successors:
index = self.problem.task_id_to_index[t_id]
for succ in self.problem.constraints.successors[t_id]:
succ_index = self.problem.task_id_to_index[succ]
self.cp_model.Add(
self.variables["starts"][succ_index]
>= self.variables["ends"][index]
)
[docs]
def constraint_precedence_on_groups(self):
"""
Basic precedence constraint between group of tasks
"""
if self.problem.constraints.successors_group_tasks is None:
return
for g_id in self.problem.constraints.successors_group_tasks:
for g_succ_id in self.problem.constraints.successors_group_tasks[g_id]:
self.cp_model.Add(
self.variables["start_span_variables"][g_succ_id]
>= self.variables["end_span_variables"][g_id]
)
[docs]
def constraint_duration_of_tasks(self):
"""
Tricky constraint : should take into account the partial preemption possibility,
which makes duration variable based on calendars
"""
durs = self.durs
dictionary_indicators = {}
if self.duration_encoding == DurationEncodingEnum.INDICATOR:
for task_index, mode in durs:
d = self.constraint_duration_of_task(
task_index=task_index,
mode=mode,
duration_per_interval=durs[(task_index, mode)][1],
)
dictionary_indicators.update(d)
self.variables["dictionary_indicators"] = dictionary_indicators
for index in self.variables["is_present"]:
all_key = [
x
for x in self.variables["dictionary_indicators"]
if x[0][0] == index
]
self.cp_model.AddExactlyOne(
[self.variables["dictionary_indicators"][x] for x in all_key]
)
elif self.duration_encoding == DurationEncodingEnum.ELEMENT:
for task_index, mode in durs:
self.constraint_duration_of_task_element(task_index, mode)
[docs]
def constraint_duration_of_task_element(
self,
task_index: int,
mode: int,
):
positive_durations = [d for d in self.durs[task_index, mode][1] if d >= 0]
duration_per_interval = self.durs[task_index, mode][1]
duration_list = self.durs[task_index, mode][0]
if len(positive_durations) == 1:
dur = int(positive_durations[0])
interval = Domain.FromIntervals(duration_per_interval[dur])
self.cp_model.AddLinearExpressionInDomain(
self.variables["starts"][task_index], interval
)
self.cp_model.Add(self.variables["opt_durations"][task_index][mode] == dur)
else:
duration_list = [int(x) for x in duration_list]
self.cp_model.add_element(
self.variables["starts"][task_index],
expressions=duration_list,
target=self.variables["opt_durations"][task_index][mode],
)
return None
[docs]
def constraint_duration_of_task(
self,
task_index: int,
mode: int,
duration_per_interval: Dict[int, List[Tuple[int, int]]],
):
dictionary_indicators = {}
positive_durations = [d for d in duration_per_interval if d >= 0]
if len(positive_durations) == 1:
dur = int(positive_durations[0])
interval = Domain.FromIntervals(duration_per_interval[dur])
self.cp_model.AddLinearExpressionInDomain(
self.variables["starts"][task_index], interval
)
self.cp_model.Add(self.variables["opt_durations"][task_index][mode] == dur)
dictionary_indicators[((task_index, mode), dur)] = self.variables[
"is_present"
][task_index][mode]
else:
for possible_duration in duration_per_interval:
if possible_duration < 0:
continue
interval = Domain.FromIntervals(
duration_per_interval[possible_duration]
)
dictionary_indicators[((task_index, mode), possible_duration)] = (
self.cp_model.NewBoolVar(
f"d_{(task_index, mode), possible_duration}"
)
)
self.cp_model.AddLinearExpressionInDomain(
self.variables["starts"][task_index], interval
).OnlyEnforceIf(
dictionary_indicators[((task_index, mode), possible_duration)]
)
self.cp_model.Add(
self.variables["opt_durations"][task_index][mode]
== int(possible_duration)
).OnlyEnforceIf(
dictionary_indicators[((task_index, mode), possible_duration)]
)
# corrected version (to be confirmed)
self.cp_model.Add(
sum([dictionary_indicators[k] for k in dictionary_indicators])
== self.variables["is_present"][task_index][mode]
)
# Seems incorrect with non duplicated starts variables, even though it works most of the time
# self.cp_model.Add(
# sum([dictionary_indicators[k] for k in dictionary_indicators]) == 1
# ) # Only one indicator activated.
# self.cp_model.AddExactlyOne([dictionary_indicators[k] for k in dictionary_indicators])
return dictionary_indicators
[docs]
def constraint_cumulative(self, constraint_including: ConstraintIncluding):
for r in self.problem.resources:
if r.renewable:
if constraint_including.include_variable_resource:
if r.id in self.variables["resource_capacity_variables"]:
self.constraint_cumulative_resource(
resource=r,
variable_max_capacity=True,
constraint_including=constraint_including,
)
self.constraint_cumulative_resource(
resource=r,
variable_max_capacity=False,
constraint_including=constraint_including,
)
else:
self.constraint_non_renewable_resource(
resource=r, variable_max_capacity=True
)
self.constraint_non_renewable_resource(
resource=r, variable_max_capacity=False
)
[docs]
def constraint_non_renewable_resource(
self, resource: ResourceData, variable_max_capacity: bool = False
):
capa = resource.max_capacity
id_resource = resource.id
task_mode_consume = [
(
self.variables["is_present"][i][mode],
int(self.problem.tasks[i].modes[mode].get_res_consumption(id_resource)),
)
for i in self.variables["opt_intervals"]
for mode in self.variables["opt_intervals"][i]
if self.problem.tasks[i].modes[mode].get_res_consumption(id_resource) > 0
]
if not variable_max_capacity:
self.cp_model.Add(sum([x[0] * x[1] for x in task_mode_consume]) <= capa)
else:
self.cp_model.Add(
sum([x[0] * x[1] for x in task_mode_consume])
== self.variables["resource_capacity_variables"][resource.id]
)
[docs]
def constraint_cumulative_resource(
self,
resource: ResourceData,
variable_max_capacity: bool = False,
constraint_including: ConstraintIncluding = None,
):
post_cumulative_constraints(
problem=self.problem,
resource=resource,
solver=self,
variable_max_capacity=variable_max_capacity,
include_intervals_non_release=constraint_including.include_non_released_resource,
)
[docs]
def constraint_cumulative_resource_depr(
self, resource: ResourceData, variable_max_capacity: bool = False
):
"""
Simple and tested version
"""
res_comp: List[Dict[str, int]] = create_resource_consumption_from_calendar(
calendar_availability=resource.calendar_availability
)
id_resource = resource.id
task_mode_consume = [
(
self.variables["opt_intervals"][i][mode],
int(self.problem.tasks[i].modes[mode].get_res_consumption(id_resource)),
)
for i in self.variables["opt_intervals"]
for mode in self.variables["opt_intervals"][i]
if self.problem.tasks[i].modes[mode].get_res_consumption(id_resource) > 0
]
if "intervals_non_release" in self.variables:
if resource.id in self.variables["intervals_non_release"]:
task_mode_consume += self.variables["intervals_non_release"][
resource.id
]
fake_task_res = [
(
self.cp_model.NewFixedSizeIntervalVar(
start=f["start"], size=f["duration"], name=f"res_"
),
f["value"],
)
for f in res_comp
if f["value"] > 0 and f["value"] != resource.max_capacity
]
if not variable_max_capacity:
self.cp_model.AddCumulative(
[x[0] for x in task_mode_consume] + [x[0] for x in fake_task_res],
[x[1] for x in task_mode_consume] + [x[1] for x in fake_task_res],
resource.max_capacity,
)
else:
self.cp_model.AddCumulative(
[x[0] for x in task_mode_consume] + [x[0] for x in fake_task_res],
[x[1] for x in task_mode_consume] + [x[1] for x in fake_task_res],
self.variables["resource_capacity_variables"][resource.id],
)
[docs]
def constraint_on_groups_of_task(self):
# self.constraint_group_non_release_resource_reservoir()
self.constraint_group_non_release_resource()
self.constraint_non_overlap_group()
[docs]
def constraint_sequencing_on_disjunctive_resource(self):
# Sequence var like idea.
for res in self.problem.resources:
if res.max_capacity == 1:
task_mode_consume = [
(
index_task,
mode,
self.problem.tasks[index_task]
.modes[mode]
.get_res_consumption(res.id),
)
for index_task in range(self.problem.nb_tasks)
for mode in self.problem.tasks[index_task].modes
if self.problem.tasks[index_task]
.modes[mode]
.get_res_consumption(res.id)
> 0
]
tasks = []
for i in range(len(task_mode_consume)):
tasks.append(task_mode_consume[i][0])
list_next = []
if (
self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
is not None
):
data = self.problem.constraints.successor_generic_with_res_release_at_start_of_successor_generic
for t1, t2, d_res in data:
if not t1.is_a_task or not t2.is_a_task:
continue
i1 = self.problem.task_id_to_index[t1.task_id]
i2 = self.problem.task_id_to_index[t2.task_id]
list_next += [(i1, i2)]
if i1 not in tasks:
tasks.append(i1)
if i2 not in tasks:
tasks.append(i2)
if len(list_next) > 0:
tasks.append(-1)
vars = {
(i, j): self.cp_model.NewBoolVar(name=f"{j}_after_{i}")
for i in tasks
for j in tasks
if i != j
}
for i in tasks:
if i != -1:
self.cp_model.add(
sum(vars[i, j] for j in tasks if i != j) == 1
)
else:
self.cp_model.add(
sum(vars[j, i] for j in tasks if i != j) == 1
)
for i, j in vars:
if i != -1 and j != -1:
self.cp_model.add(
self.variables["starts"][j] >= self.variables["ends"][i]
).only_enforce_if(vars[i, j])
if (i, j) in list_next:
self.cp_model.add(vars[i, j] == 1)
[docs]
def constraint_group_non_release_resource(self):
groups_non_release_resource = [
g
for g in self.problem.tasks_group
if g.type_of_group == GroupType.GROUP_TASK_NON_RELEASED_RESOURCE
]
all_resource_concerned = set()
for g in groups_non_release_resource:
all_resource_concerned.update(set([r for r in g.res_not_released]))
for resource in all_resource_concerned:
intervals_and_consumption = []
tasks_covered_in_group = set()
for g in groups_non_release_resource:
if "span_interval_variables" in self.variables:
if (
resource in g.res_not_released
and g.res_not_released[resource] > 0
):
intervals_and_consumption.append(
(
self.variables["span_interval_variables"][g.id],
g.res_not_released[resource],
)
)
tasks_covered_in_group.update(g.tasks_group)
task_mode_consume = [
(
self.variables["opt_intervals"][i][mode],
int(
self.problem.tasks[i].modes[mode].get_res_consumption(resource)
),
)
for i in self.variables["opt_intervals"]
for mode in self.variables["opt_intervals"][i]
if self.problem.tasks[i].modes[mode].get_res_consumption(resource) > 0
and self.problem.index_to_task_id[i] not in tasks_covered_in_group
]
task_non_release = self.variables["intervals_non_release"].get(resource, [])
self.cp_model.AddCumulative(
[
x[0]
for x in task_mode_consume
+ intervals_and_consumption
+ task_non_release
],
[
x[1]
for x in task_mode_consume
+ intervals_and_consumption
+ task_non_release
],
self.problem.resource_dict[resource].max_capacity,
)
[docs]
def constraint_group_non_release_resource_reservoir(self):
groups_non_release_resource = [
g
for g in self.problem.tasks_group
if g.type_of_group == GroupType.GROUP_TASK_NON_RELEASED_RESOURCE
]
all_resource_concerned = set()
for g in groups_non_release_resource:
all_resource_concerned.update(set([r for r in g.res_not_released]))
for resource in all_resource_concerned:
intervals_and_consumption = []
tasks_covered_in_group = set()
for g in groups_non_release_resource:
if resource in g.res_not_released and g.res_not_released[resource] > 0:
intervals_and_consumption.append(
(
self.variables["start_span_variables"][g.id],
self.variables["end_span_variables"][g.id],
g.res_not_released[resource],
)
)
tasks_covered_in_group.update(g.tasks_group)
task_mode_consume = [
(
(i, mode),
int(
self.problem.tasks[i].modes[mode].get_res_consumption(resource)
),
)
for i in self.variables["opt_intervals"]
for mode in self.variables["opt_intervals"][i]
if self.problem.tasks[i].modes[mode].get_res_consumption(resource) > 0
and self.problem.index_to_task_id[i] not in tasks_covered_in_group
]
times = [0]
active = [1]
vals = [self.problem.resource_dict[resource].max_capacity]
for i in range(len(task_mode_consume)):
(index, mode), val = task_mode_consume[i]
times.append(self.variables["starts"][index])
vals.append(-val)
active.append(self.variables["is_present"][index][mode])
times.append(self.variables["ends"][index])
vals.append(val)
active.append(self.variables["is_present"][index][mode])
for i in range(len(intervals_and_consumption)):
times.append(intervals_and_consumption[i][0])
vals.append(-intervals_and_consumption[i][2])
active.append(1)
times.append(intervals_and_consumption[i][1])
vals.append(intervals_and_consumption[i][2])
active.append(1)
self.cp_model.AddReservoirConstraint(
times=times,
level_changes=vals,
min_level=0,
max_level=self.problem.resource_dict[resource].max_capacity,
)
# self.cp_model.AddReservoirConstraintWithActive(times=times,
# level_changes=vals,
# actives=active,
# min_level=0,
# max_level=self.problem.resource_dict[resource].max_capacity)
[docs]
def constraint_non_overlap_group(self):
for group in self.problem.tasks_group:
if group.no_overlap:
intervals = [
self.variables["intervals"][self.problem.task_id_to_index[i_t]]
for i_t in group.tasks_group
]
self.cp_model.AddNoOverlap(intervals)
# TODO Be able to remove dummy tasks from the group,
# self.cp_model.AddCumulative(intervals, [1] * len(intervals), 1)
[docs]
def constraint_generalized_time_constraint(self):
if self.problem.constraints.start_at_start is not None:
for t1, t2 in self.problem.constraints.start_at_start:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
self.cp_model.Add(
self.variables["starts"][i1] == self.variables["starts"][i2]
)
if self.problem.constraints.start_at_end is not None:
for t1, t2 in self.problem.constraints.start_at_end:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
self.cp_model.Add(
self.variables["starts"][i1] == self.variables["ends"][i2]
)
if self.problem.constraints.start_at_end_plus_offset is not None:
for t1, t2, offset in self.problem.constraints.start_at_end_plus_offset:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
self.cp_model.Add(
self.variables["starts"][i1] == self.variables["ends"][i2] + offset
)
if self.problem.constraints.start_after_end_plus_offset is not None:
for t1, t2, offset in self.problem.constraints.start_after_end_plus_offset:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
self.cp_model.Add(
self.variables["starts"][i1] >= self.variables["ends"][i2] + offset
)
if self.problem.constraints.start_at_start_plus_offset is not None:
for t1, t2, offset in self.problem.constraints.start_at_start_plus_offset:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
self.cp_model.Add(
self.variables["starts"][i1]
== self.variables["starts"][i2] + offset
)
if self.problem.constraints.start_after_start_plus_offset is not None:
for (
t1,
t2,
offset,
) in self.problem.constraints.start_after_start_plus_offset:
i1 = self.problem.task_id_to_index[t1]
i2 = self.problem.task_id_to_index[t2]
self.cp_model.Add(
self.variables["starts"][i1]
>= self.variables["starts"][i2] + offset
)
[docs]
def create_objectives(self, constraint_including: ConstraintIncluding = None):
if constraint_including is None:
constraint_including = ConstraintIncluding()
objs = []
weights = []
names = []
for obj_enum in self.problem.objective_params.params_obj:
if obj_enum == ObjectivesEnum.MAKESPAN:
var, name = self.create_makespan()
objs.append(var)
weights.append(self.problem.objective_params.params_obj[obj_enum])
names.append(name)
if (
obj_enum == ObjectivesEnum.RESOURCE_COST
and constraint_including.include_variable_resource
):
var, name = self.create_resource_objective(
obj_params_resource=self.problem.objective_params.params_obj[
obj_enum
]
)
objs.append(var)
weights.append(
self.problem.objective_params.params_obj[obj_enum].weight
)
names.append(name)
if obj_enum == ObjectivesEnum.WORK_IN_PROGRESS:
obj_params_wip = self.problem.objective_params.params_obj[obj_enum]
if obj_params_wip.count_nb_group_in_progress:
self.init_and_constraint_wip_variables()
objs.append(self.variables["capacity_group_execution"])
weights.append(obj_params_wip.coefficient_on_nb_group_in_progress)
names.append("wip_cost")
if obj_enum == ObjectivesEnum.TARDINESS:
var, name = self.create_tardiness_objective(
obj_tardiness=self.problem.objective_params.params_obj[obj_enum]
)
objs.append(var)
weights.append(1)
names.append(name)
if obj_enum == ObjectivesEnum.EARLINESS:
var, name = self.create_earliness_objective(
obj_earliness=self.problem.objective_params.params_obj[obj_enum]
)
objs.append(var)
weights.append(1)
names.append(name)
if obj_enum == ObjectivesEnum.NON_RELEASE_DURATION:
expr = sum(
[
self.variables["durations_non_release_1"][t]
for t in self.variables.get("durations_non_release_1", [])
]
+ [
self.variables["durations_non_release_2"][t]
for t in self.variables.get("durations_non_release_2", [])
]
+ [
self.variables["durations_non_release_3"][t]
for t in self.variables.get("durations_non_release_3", [])
]
)
objs.append(expr)
weights.append(1)
names.append("non_release_duration")
self.variables["obj_data"] = (objs, weights, names)
self.cp_model.Minimize(sum([objs[i] * weights[i] for i in range(len(objs))]))
[docs]
def create_makespan(self):
makespan = self.cp_model.NewIntVar(
lb=0, ub=int(self.problem.horizon), name="makespan"
)
self.cp_model.AddMaxEquality(
makespan, [self.variables["ends"][x] for x in self.variables["ends"]]
)
self.variables["makespan"] = makespan
return makespan, "makespan"
[docs]
def create_resource_objective(self, obj_params_resource: ObjectiveParamResource):
resource_cost = [
self.variables["resource_capacity_variables"][r]
* obj_params_resource.weight_per_resource_unit[r]
for r in obj_params_resource.weight_per_resource_unit
if obj_params_resource.weight_per_resource_unit[r] != 0
and (
(r not in obj_params_resource.consider_in_objectives)
or (obj_params_resource.consider_in_objectives[r])
)
]
return sum(resource_cost), "resource_cost"
[docs]
def create_earliness_objective(self, obj_earliness: ObjectiveParamEarliness):
self.variables["earliness"] = {"tasks": {}, "groups": {}}
cost_list: List[Tuple[LinearExprT, float]] = []
for id_task in obj_earliness.weight_per_task:
if obj_earliness.weight_per_task[id_task] > 0:
index = self.problem.task_id_to_index[id_task]
deadline = self.problem.task_id_dict[id_task].max_ending_date
if deadline is not None:
end = self.variables["ends"][index]
# Create lateness/earliness variables
earliness = self.cp_model.NewIntVar(
lb=0, ub=self.problem.horizon, name=f"earliness_task_{id_task}"
)
self.cp_model.Add(earliness >= deadline - end)
# cost_expr = penalty * lateness + earliness
cost_expr = earliness
cost_list.append(
(cost_expr, obj_earliness.weight_per_task[id_task])
)
self.variables["earliness"]["tasks"][id_task] = {
"earliness": earliness,
}
for id_group in obj_earliness.weight_per_groups:
if obj_earliness.weight_per_groups[id_group] > 0:
index = self.problem.group_id_to_index[id_group]
deadline = self.problem.tasks_group[index].max_ending_date
soft = self.problem.tasks_group[index].soft_max_end_date
if deadline is not None:
end = self.variables["end_span_variables"][id_group]
# Create lateness/earliness variables
earliness = self.cp_model.NewIntVar(
lb=0,
ub=self.problem.horizon,
name=f"earliness_group_{id_group}",
)
self.cp_model.Add(earliness >= deadline - end)
# cost_expr = penalty * lateness + earliness
cost_expr = earliness
cost_list.append(
(cost_expr, obj_earliness.weight_per_groups[id_group])
)
self.variables["earliness"]["groups"][id_group] = {
"earliness": earliness,
}
return (
LinearExpr.weighted_sum(
[x[0] for x in cost_list], [x[1] for x in cost_list]
),
"earliness",
)
[docs]
def create_tardiness_objective(self, obj_tardiness: ObjectiveParamTardiness):
self.variables["tardiness"] = {"tasks": {}, "groups": {}}
cost_list: List[Tuple[LinearExprT, float]] = []
for id_task in obj_tardiness.weight_per_task:
if obj_tardiness.weight_per_task[id_task] > 0:
index = self.problem.task_id_to_index[id_task]
deadline = self.problem.task_id_dict[id_task].max_ending_date
if deadline is not None:
end = self.variables["ends"][index]
# Create lateness/earliness variables
lateness = self.cp_model.NewIntVar(
lb=0,
ub=int(self.problem.horizon),
name=f"lateness_task_{id_task}",
)
self.cp_model.Add(lateness >= end - int(deadline))
cost_list.append((lateness, obj_tardiness.weight_per_task[id_task]))
self.variables["tardiness"]["tasks"][id_task] = {
"tardiness": lateness,
}
for id_group in obj_tardiness.weight_per_groups:
if obj_tardiness.weight_per_groups[id_group] > 0:
index = self.problem.group_id_to_index[id_group]
deadline = self.problem.tasks_group[index].max_ending_date
soft = self.problem.tasks_group[index].soft_max_end_date
if deadline is not None:
end = self.variables["end_span_variables"][id_group]
# Create lateness/earliness variables
lateness = self.cp_model.NewIntVar(
lb=0, ub=self.problem.horizon, name=f"lateness_group_{id_group}"
)
self.cp_model.Add(lateness >= end - int(deadline))
cost_list.append(
(lateness, obj_tardiness.weight_per_groups[id_group])
)
self.variables["tardiness"]["groups"][id_group] = {
"tardiness": lateness,
}
return (
LinearExpr.weighted_sum(
[x[0] for x in cost_list], [x[1] for x in cost_list]
),
"tardiness",
)
[docs]
def init_model(
self, constraint_including: Optional[ConstraintIncluding] = None, **args: Any
) -> None:
if constraint_including is None:
constraint_including = ConstraintIncluding()
args = self.complete_with_default_hyperparameters(args)
super().init_model(**args)
self.duration_encoding = args["duration_encoding"]
self.init_main_variables() # main interval variables
self.init_optional_interval_variables() # for multimode
if constraint_including.include_group_variables:
self.init_group_variables() # create span interval variable for relevant group of tasks.
# for cases where some resource are not released at the end of a task, but rather on the starting of one another
if constraint_including.include_non_released_resource:
self.init_intervals_of_non_released_resource(
constraint_including=constraint_including
)
# self.constraint_reservoir(constraint_including=constraint_including)
# self.constraint_sequencing_on_disjunctive_resource()
if constraint_including.include_variable_resource:
self.init_resource_variables() #
self.constraint_precedence()
if constraint_including.include_constraint_precedence_on_groups:
self.constraint_precedence_on_groups()
self.constraint_duration_of_tasks()
self.constraint_cumulative(constraint_including=constraint_including)
if constraint_including.include_constraints_on_groups:
self.constraint_on_groups_of_task()
if constraint_including.include_generalized_time_constraints:
self.constraint_generalized_time_constraint()
self.create_objectives(constraint_including)
[docs]
def implements_lexico_api(self) -> bool:
return True
[docs]
def get_lexico_objectives_available(self) -> list[str]:
return self.variables["obj_data"][2]
[docs]
def get_lexico_objective_value(self, obj: str, res: ResultStorage) -> float:
if isinstance(obj, tuple):
return sum(
[
obj[2 * i + 1] * res[-1][0]._intern_obj[obj[2 * i]]
for i in range(len(obj) // 2)
]
)
sol = res[-1][0]
return sol._intern_obj[obj]
[docs]
def get_objr_expr(self, obj: Union[str, tuple]):
if isinstance(obj, tuple):
nb_objective = len(obj) // 2
objs = [obj[2 * i] for i in range(nb_objective)]
weights = [obj[2 * i + 1] for i in range(nb_objective)]
objs_expr = [self.get_objr_expr(ob) for ob in objs]
return LinearExpr.weighted_sum(objs_expr, weights)
ind_obj = next(
(
i
for i in range(len(self.variables["obj_data"][2]))
if self.variables["obj_data"][2][i] == obj
),
None,
)
return self.variables["obj_data"][0][ind_obj]
[docs]
def set_lexico_objective(self, obj: str) -> None:
expr = self.get_objr_expr(obj)
if expr is not None:
self.cp_model.Minimize(expr)
else:
logger.warning(f"{obj} objective is absent it seems")
[docs]
def add_lexico_constraint(self, obj: str, value: float) -> Iterable[Any]:
expr = self.get_objr_expr(obj)
if expr is not None:
self.cp_model.Add(expr <= value)
else:
logger.warning(f"{obj} objective is absent it seems")
[docs]
def retrieve_solution(
self, cpsolvercb: CpSolverSolutionCallback
) -> ScheduleSolution:
logger.info(
f"cur obj value : {cpsolvercb.ObjectiveValue()}, bound={cpsolvercb.BestObjectiveBound()}"
)
logger.info("Sub-objectives :")
details_subobj = {}
for i in range(len(self.variables["obj_data"][0])):
try:
logger.info(
f"{self.variables['obj_data'][2][i]} : {cpsolvercb.Value(self.variables['obj_data'][0][i])}"
)
details_subobj[self.variables["obj_data"][2][i]] = cpsolvercb.Value(
self.variables["obj_data"][0][i]
)
except Exception as e:
print(e)
if "resource_capacity_variables" in self.variables:
logger.info("Resource capacity data : ")
logger.info(
f"{[cpsolvercb.Value(self.variables['resource_capacity_variables'][x]) for x in self.variables['resource_capacity_variables']]}"
)
if "tardiness" in self.variables:
logger.info("Tardiness data")
for group in self.variables["tardiness"]["groups"]:
logger.info(f"Group {group}")
logger.info(
f"tardiness : {cpsolvercb.Value(self.variables['tardiness']['groups'][group]['tardiness'])}"
)
logger.info(
f"earliness : {cpsolvercb.Value(self.variables['tardiness']['groups'][group]['earliness'])}"
)
schedule = np.zeros((self.problem.nb_tasks, 2))
allocation = np.zeros(self.problem.nb_tasks)
for i in range(self.problem.nb_tasks):
schedule[i, 0] = cpsolvercb.Value(self.variables["starts"][i])
schedule[i, 1] = cpsolvercb.Value(self.variables["ends"][i])
for j in self.variables["is_present"][i]:
if cpsolvercb.Value(self.variables["is_present"][i][j]):
allocation[i] = j
sol = ScheduleSolution(
problem=self.problem, schedule=schedule, modes=allocation
)
sol._intern_obj = details_subobj
return sol
[docs]
def get_task_mode_interval(self, task: Task, mode: int) -> IntervalVar:
index = self.problem.task_id_to_index[task]
if mode in self.variables["opt_intervals"][index]:
return self.variables["opt_intervals"][index][mode]
else:
return None
[docs]
def get_task_unary_resource_is_present_variable(
self, task: Task, unary_resource: UnaryResource
) -> LinearExprT:
# No Unary Resource modeled
return None
[docs]
def get_task_unary_resource_interval(
self, task: Task, unary_resource: UnaryResource
) -> IntervalVar:
# No Unary Resource modeled
return None
[docs]
def compute_duration_function_time_cluster(
orig_duration: int,
resource_calendar: np.ndarray,
cumulative_resource_calendar: np.ndarray,
):
duration = -np.ones((cumulative_resource_calendar.shape[0]))
dict_of_interval_per_duration = {}
current_interval = [0, 0]
cur_duration = -1
for i in range(cumulative_resource_calendar.shape[0]):
if resource_calendar[i] == 0:
if duration[i] == duration[i - 1]:
current_interval[1] = i
else:
prev_d = duration[i - 1]
if prev_d not in dict_of_interval_per_duration:
dict_of_interval_per_duration[prev_d] = []
dict_of_interval_per_duration[prev_d] += [
[current_interval[0], current_interval[1]]
]
current_interval = [i, i]
continue
x = cumulative_resource_calendar[i]
if x == 0:
continue
index = next(
(
j
for j in range(i, cumulative_resource_calendar.shape[0])
if cumulative_resource_calendar[j] == x + orig_duration - 1
),
None,
)
if index is not None:
duration[i] = index - i + 1
cur_duration = duration[i]
if i >= 1:
if duration[i] == duration[i - 1]:
current_interval[1] = i
else:
prev_d = duration[i - 1]
if prev_d not in dict_of_interval_per_duration:
dict_of_interval_per_duration[prev_d] = []
dict_of_interval_per_duration[prev_d] += [
[current_interval[0], current_interval[1]]
]
current_interval = [i, i]
else:
break
if current_interval[0] != current_interval[1]:
d = cur_duration
if d not in dict_of_interval_per_duration:
dict_of_interval_per_duration[d] = []
dict_of_interval_per_duration[d] += [[current_interval[0], current_interval[1]]]
if len(dict_of_interval_per_duration) == 0:
dict_of_interval_per_duration[orig_duration] = current_interval
# print(dict_of_interval_per_duration)
return duration, dict_of_interval_per_duration
[docs]
def compute_duration_tasks_function_time(problem: FlexProblem, method=None):
if method is None:
method = compute_duration_function_time_cluster
resource_calendar_dict = {
problem.resources[i].id: problem.resources[i].calendar_availability > 0
for i in range(len(problem.resources))
}
cumulative_calendar_dict = {
r: np.cumsum(resource_calendar_dict[r]) for r in resource_calendar_dict
}
durations = {
(i, m): None for i in range(problem.nb_tasks) for m in problem.tasks[i].modes
}
for i in range(problem.nb_tasks):
for m in problem.tasks[i].modes:
task_data: TaskData = problem.tasks[i].modes[m]
resource_non_zeros = [
r
for r in task_data.resource_consumption
if task_data.resource_consumption[r] > 0
]
if len(resource_non_zeros) == 0:
durations[i, m] = ([], {task_data.duration: [[0, problem.horizon]]})
elif len(resource_non_zeros) == 1:
orig_duration = task_data.duration
res_consumption = task_data.resource_consumption[resource_non_zeros[0]]
c = (
problem.resources[
problem.resource_id_to_index[resource_non_zeros[0]]
].calendar_availability
>= res_consumption
)
durations[i, m] = method(
orig_duration=orig_duration,
resource_calendar=c, # resource_calendar_dict[resource_non_zeros[0]],
cumulative_resource_calendar=np.cumsum(c),
# cumulative_calendar_dict[
# resource_non_zeros[0]
# ],
)
else:
orig_duration = task_data.duration
tuple_res = tuple(
[(r, task_data.resource_consumption[r]) for r in resource_non_zeros]
)
if tuple_res not in resource_calendar_dict:
# For the first resource in the tuple, b "availability >= consumption"
first_res_id, first_consumption = tuple_res[0]
b = (
problem.resources[
problem.resource_id_to_index[first_res_id]
].calendar_availability
>= first_consumption
)
for res_id, cons in tuple_res[1:]:
b &= (
problem.resources[
problem.resource_id_to_index[res_id]
].calendar_availability
>= cons
)
resource_calendar_dict[tuple_res] = b
cumulative_calendar_dict[tuple_res] = np.cumsum(
resource_calendar_dict[tuple_res]
)
durations[i, m] = method(
orig_duration=orig_duration,
resource_calendar=resource_calendar_dict[tuple_res],
cumulative_resource_calendar=cumulative_calendar_dict[tuple_res],
)
return durations
[docs]
def post_cumulative_constraints(
problem: FlexProblem,
resource: ResourceData,
solver: CpSatFlexSolver,
variable_max_capacity: bool,
include_intervals_non_release: bool = True,
):
inputs_constraint = build_multiple_cumulative_constraints_inputs(
problem=problem, resource=resource
)
task_non_release = []
if (
"intervals_non_release" in solver.variables
and include_intervals_non_release
and not variable_max_capacity
):
if resource.id in solver.variables["intervals_non_release"]:
task_non_release = solver.variables["intervals_non_release"][resource.id]
for input_data in inputs_constraint:
val = input_data["val"]
set_task_mode_conso = list(input_data["set_task_mode_conso"])
intervals_ = [
solver.variables["opt_intervals"][x[0]][x[1]] for x in set_task_mode_conso
]
consos = [x[2] for x in set_task_mode_conso]
other_intervals_c = [
x for x in task_non_release if x[1] + val <= resource.max_capacity
]
calendar_intervals = [
(
solver.cp_model.NewFixedSizeIntervalVar(
start=f["start"], size=f["duration"], name=f"res_"
),
f["value"],
)
for f in input_data["calendar_tasks"]
]
if len(intervals_) + len(other_intervals_c) == 0:
# Useless
continue
if not variable_max_capacity:
solver.cp_model.AddCumulative(
intervals_
+ [x[0] for x in other_intervals_c]
+ [x[0] for x in calendar_intervals],
consos
+ [x[1] for x in other_intervals_c]
+ [x[1] for x in calendar_intervals],
resource.max_capacity,
)
else:
if "resource_capacity_variables" in solver.variables:
solver.cp_model.AddCumulative(
intervals_
+ [x[0] for x in other_intervals_c]
+ [x[0] for x in calendar_intervals],
consos
+ [x[1] for x in other_intervals_c]
+ [x[1] for x in calendar_intervals],
solver.variables["resource_capacity_variables"][resource.id],
)