Source code for discrete_optimization.workforce.scheduling.solvers.cpsat

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import time
from collections.abc import Iterable
from functools import reduce
from typing import Any, Optional, Union

import numpy as np
from ortools.sat.python import cp_model
from ortools.sat.python.cp_model import CpSolver, CpSolverSolutionCallback, IntVar

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
    EnumHyperparameter,
    SubBrick,
    SubBrickHyperparameter,
)
from discrete_optimization.generic_tools.ortools_cpsat_tools import (
    OrtoolsCpSatCallback,
    OrtoolsCpSatSolver,
    ParametersCp,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.workforce.commons.fairness_modeling import (
    ModelisationDispersion,
)
from discrete_optimization.workforce.commons.fairness_modeling_ortools import (
    cumulate_value_per_teams_version_2,
    model_fairness,
)
from discrete_optimization.workforce.scheduling.problem import (
    AllocSchedulingProblem,
    AllocSchedulingSolution,
)
from discrete_optimization.workforce.scheduling.solvers import (
    ObjectivesEnum,
    SolverAllocScheduling,
)
from discrete_optimization.workforce.scheduling.solvers.alloc_scheduling_lb import (
    ApproximateBoundAllocScheduling,
    BaseAllocSchedulingLowerBoundProvider,
    BoundResourceViaRelaxedProblem,
    LBoundAllocScheduling,
)
from discrete_optimization.workforce.scheduling.utils import (
    compute_equivalent_teams_scheduling_problem,
)

logger = logging.getLogger(__name__)


[docs] class AdditionalCPConstraints: # some object to store additional constraint that could arrive on the way. def __init__( self, nb_teams_bounds: Optional[tuple[Optional[int], Optional[int]]] = None, team_used_constraint: Optional[dict[int, bool]] = None, set_tasks_ignore_reallocation: Optional[set[int]] = None, forced_allocation: Optional[dict[int, int]] = None, adding_margin_on_sequence: Optional[tuple[bool, int]] = (False, 0), ): """ adding_margin_on_sequence: if the boolean flag is active, we want to add margin between end of a task and start of the next one, with value to the integer value of the tuple. This trick allow to produce schedule that respects the time transition constraint of the more complex problem "AllocSchedRoutingProblem" if the margin is an upper bound of possible transition time. """ self.nb_teams_bounds = nb_teams_bounds self.team_used_constraint = team_used_constraint self.set_tasks_ignore_reallocation = set_tasks_ignore_reallocation self.forced_allocation = forced_allocation if nb_teams_bounds is None: self.nb_teams_bounds = (None, None) if team_used_constraint is None: self.team_used_constraint = {} if set_tasks_ignore_reallocation is None: self.set_tasks_ignore_reallocation = set() if forced_allocation is None: self.forced_allocation = {} self.adding_margin_on_sequence = adding_margin_on_sequence
[docs] class CPSatAllocSchedulingSolver( OrtoolsCpSatSolver, SolverAllocScheduling, WarmstartMixin ): hyperparameters = [ CategoricalHyperparameter( name="symmbreak_on_used", choices=[False, True], default=False ), CategoricalHyperparameter( name="optional_activities", choices=[False, True], default=False ), EnumHyperparameter( name="modelisation_dispersion", enum=ModelisationDispersion, default=ModelisationDispersion.EXACT_MODELING_WITH_IMPLICATION, ), CategoricalHyperparameter( name="adding_redundant_cumulative", default=False, choices=[False, True] ), CategoricalHyperparameter( name="add_lower_bound", default=False, choices=[False, True] ), SubBrickHyperparameter( name="lower_bound_method", choices=[ BoundResourceViaRelaxedProblem, LBoundAllocScheduling, ApproximateBoundAllocScheduling, ], default=SubBrick( BoundResourceViaRelaxedProblem, kwargs=BoundResourceViaRelaxedProblem.get_default_hyperparameters(), ), depends_on=("add_lower_bound", [True]), ), ] problem: AllocSchedulingProblem variables: dict[str, dict[Any, Any]]
[docs] @staticmethod def implements_lexico_api() -> bool: return True
[docs] def get_lexico_objectives_available(self) -> list[str]: return [ obj.name if isinstance(obj, ObjectivesEnum) else obj for obj in self.variables["objectives"].keys() ]
[docs] def set_lexico_objective(self, obj: Union[str, ObjectivesEnum]) -> None: obj = _get_variables_obj_key(obj) self.cp_model.Minimize(self.variables["objectives"][obj])
[docs] def add_lexico_constraint( self, obj: Union[str, ObjectivesEnum], value: float ) -> Iterable[Any]: obj = _get_variables_obj_key(obj) return [self.cp_model.Add(self.variables["objectives"][obj] <= value)]
[docs] def get_lexico_objective_value( self, obj: Union[str, ObjectivesEnum], res: ResultStorage ) -> float: obj = _get_variables_obj_key(obj) sol = res[-1][0] return sol._intern_obj[obj]
[docs] def set_model_obj_aggregated( self, objs_weights: list[tuple[Union[str, ObjectivesEnum], float]] ): self.cp_model.Minimize( sum( [ x[1] * self.variables["objectives"][_get_variables_obj_key(x[0])] for x in objs_weights ] ) )
[docs] def retrieve_solution(self, cpsolvercb: CpSolverSolutionCallback) -> Solution: schedule = np.zeros((self.problem.number_tasks, 2), dtype=int) allocation = -np.ones(self.problem.number_tasks, dtype=int) logger.info(f"Objs = {[cpsolvercb.Value(x) for x in self.variables['objs']]}") logger.info( f"Obj = {cpsolvercb.ObjectiveValue()}, Bound={cpsolvercb.BestObjectiveBound()}" ) if "resched_objs" in self.variables: for obj in self.variables["resched_objs"]: logger.info(f"Obj :{obj}") logger.info( f"Value : {cpsolvercb.Value(self.variables['resched_objs'][obj])}" ) for t in range(self.problem.number_tasks): start = int(cpsolvercb.Value(self.variables["starts_var"][t])) end = int(cpsolvercb.Value(self.variables["ends_var"][t])) schedule[t, 0] = start schedule[t, 1] = end for index_team in self.variables["is_present_var"][t]: if cpsolvercb.Value(self.variables["is_present_var"][t][index_team]): allocation[t] = index_team sol = AllocSchedulingSolution( problem=self.problem, schedule=schedule, allocation=allocation ) sol._intern_obj = {} for obj in self.variables["objectives"]: sol._intern_obj[obj] = cpsolvercb.Value(self.variables["objectives"][obj]) return sol
[docs] def set_warm_start(self, solution: AllocSchedulingSolution) -> None: if solution is not None: self.cp_model.ClearHints() for t in range(self.problem.number_tasks): self.cp_model.AddHint( self.variables["starts_var"][t], int(solution.schedule[t, 0]) ) self.cp_model.AddHint( self.variables["ends_var"][t], int(solution.schedule[t, 1]) ) if "actually_done" in self.variables: self.cp_model.AddHint(self.variables["actually_done"][t], 1) for index_team in self.variables["is_present_var"][t]: if solution.allocation[t] == index_team: self.cp_model.AddHint( self.variables["is_present_var"][t][index_team], 1 ) else: self.cp_model.AddHint( self.variables["is_present_var"][t][index_team], 0 ) team_used = set(solution.allocation) for team in range(self.problem.number_teams): if team in team_used: self.cp_model.AddHint(self.variables["used"][team], 1) else: self.cp_model.AddHint(self.variables["used"][team], 0) if "reallocation" in self.variables: for x in self.variables["reallocation"]: self.cp_model.AddHint(x, 0) if "is_shifted" in self.variables: for x in self.variables["is_shifted"]: self.cp_model.AddHint(x, 0) if "delta_starts_abs" in self.variables: for x in self.variables["delta_starts_abs"]: self.cp_model.AddHint(x, 0) if "max_delta_start" in self.variables: self.cp_model.AddHint(self.variables["max_delta_start"], 0) if "objectives" in self.variables: if ObjectivesEnum.MAKESPAN in self.variables["objectives"]: self.cp_model.AddHint( self.variables["objectives"][ObjectivesEnum.MAKESPAN], int(np.max(solution.schedule[:, 1])), )
[docs] def init_model( self, objectives: Optional[list[ObjectivesEnum]] = None, **args: Any ) -> None: additional_constraints: AdditionalCPConstraints = args.get( "additional_constraints", None ) if objectives is None: objectives = [ObjectivesEnum.NB_TEAMS] args = self.complete_with_default_hyperparameters(args) add_lower_bound = args["add_lower_bound"] optional_activities = args["optional_activities"] adding_redundant_cumulative = args["adding_redundant_cumulative"] self.cp_model = cp_model.CpModel() starts_var = {} ends_var = {} is_present_var = {} interval_var = {} actually_done_var = {} if optional_activities: actually_done_var = {} opt_interval_var = {} st_lb = [ ( int(self.problem.get_lb_start_window(t)), int(self.problem.get_ub_start_window(t)), int(self.problem.get_lb_end_window(t)), int(self.problem.get_ub_end_window(t)), ) for t in self.problem.tasks_list ] dur = [ self.problem.tasks_data[t].duration_task for t in self.problem.tasks_list ] key_per_team = {j: [] for j in self.problem.index_to_team} compatible_teams: dict[ int, set[int] ] = self.problem.compatible_teams_index_all_activity() if additional_constraints is not None: forced_alloc = additional_constraints.forced_allocation if forced_alloc is not None: for i in forced_alloc: if forced_alloc[i] is not None: compatible_teams[i] = {forced_alloc[i]} for i in range(self.problem.number_tasks): starts_var[i] = self.cp_model.NewIntVar( lb=st_lb[i][0], ub=st_lb[i][1], name=f"start_{i}" ) ends_var[i] = self.cp_model.NewIntVar( lb=st_lb[i][2], ub=st_lb[i][3], name=f"end_{i}" ) interval_var[i] = self.cp_model.NewIntervalVar( start=starts_var[i], end=ends_var[i], size=dur[i], name=f"interval_{i}" ) if optional_activities: actually_done_var[i] = self.cp_model.NewBoolVar(name=f"done_{i}") is_present_var[i] = {} opt_interval_var[i] = {} for index_team in compatible_teams[i]: # same as "allocation_binary" variable in the allocation problem is_present_var[i][index_team] = self.cp_model.NewBoolVar( name=f"alloc_{i}_{index_team}" ) opt_interval_var[i][index_team] = self.cp_model.NewOptionalIntervalVar( start=starts_var[i], end=ends_var[i], size=dur[i], is_present=is_present_var[i][index_team], name=f"interval_{i}_{index_team}", ) key_per_team[index_team].append((i, index_team)) if optional_activities: self.cp_model.Add( sum([is_present_var[i][x] for x in is_present_var[i]]) == 1 ).OnlyEnforceIf(actually_done_var[i]) self.cp_model.Add( sum([is_present_var[i][x] for x in is_present_var[i]]) == 0 ).OnlyEnforceIf(actually_done_var[i].Not()) else: self.cp_model.AddExactlyOne( [is_present_var[i][x] for x in is_present_var[i]] ) # Precedence constraints for t in self.problem.precedence_constraints: i_t = self.problem.tasks_to_index[t] for t_suc in self.problem.precedence_constraints[t]: i_t_suc = self.problem.tasks_to_index[t_suc] self.cp_model.Add(starts_var[i_t_suc] >= ends_var[i_t]) # Same allocation constraints for l_t in self.problem.same_allocation: indexes = [self.problem.tasks_to_index[tt] for tt in l_t] common_teams = reduce( lambda x, y: x.intersection(set(is_present_var[y].keys())), indexes, set(self.problem.index_to_team), ) for c in common_teams: self.cp_model.AddAllowedAssignments( [is_present_var[ind][c] for ind in indexes], [ tuple([1] * len(indexes)), tuple([0] * len(indexes)), ], ) # Redundant for c in common_teams: for i in range(len(indexes) - 1): self.cp_model.Add( is_present_var[indexes[i]][c] == is_present_var[indexes[i + 1]][c] ) # Overlap constraints for index_team in key_per_team: unavailable = self.problem.compute_unavailability_calendar( self.problem.index_to_team[index_team] ) fake_tasks_unavailable = [ self.cp_model.NewFixedSizeIntervalVar( start=x[0], size=x[1] - x[0], name="" ) for x in unavailable ] tasks_team = [ opt_interval_var[x[0]][x[1]] for x in key_per_team[index_team] ] if len(fake_tasks_unavailable) + len(tasks_team) > 0: self.cp_model.AddCumulative( tasks_team + fake_tasks_unavailable, [1] * (len(tasks_team) + len(fake_tasks_unavailable)), 1, ) if len(tasks_team) > 0: if additional_constraints is not None: if ( additional_constraints.adding_margin_on_sequence[0] and additional_constraints.adding_margin_on_sequence[1] > 0 ): margin = additional_constraints.adding_margin_on_sequence[1] # create just additional interval for the "routing" constraint. intervals = [ self.cp_model.NewOptionalFixedSizeIntervalVar( start=starts_var[x[0]], size=dur[x[0]] + margin, is_present=is_present_var[x[0]][x[1]], name=f"dummy_longer_task_{x[0],x[1]}", ) for x in key_per_team[index_team] ] self.cp_model.AddCumulative( intervals=intervals, demands=[1] * len(intervals), capacity=1, ) # Resource constraints if self.problem.resources_list is not None: for resource in self.problem.resources_list: capa = self.problem.resources_capacity[resource] interval_cons = [ ( interval_var[self.problem.tasks_to_index[t]], self.problem.tasks_data[t].resource_consumption.get( resource, 0 ), ) for t in self.problem.tasks_data if self.problem.tasks_data[t].resource_consumption.get(resource, 0) > 0 ] if len(interval_cons) > 0: if capa == 1 and all(x[1] == 1 for x in interval_cons): self.cp_model.AddNoOverlap([x[0] for x in interval_cons]) else: self.cp_model.AddCumulative( intervals=[x[0] for x in interval_cons], demands=[x[1] for x in interval_cons], capacity=capa, ) self.variables = { "starts_var": starts_var, "ends_var": ends_var, "is_present_var": is_present_var, "objectives": {}, "key_per_team": key_per_team, } if optional_activities: self.variables["actually_done"] = actually_done_var # Objectives definitions if ObjectivesEnum.DELTA_TO_EXISTING_SOLUTION in objectives: assert "base_solution" in args sol: AllocSchedulingSolution = args["base_solution"] problem = sol.problem self.create_delta_objectives( base_solution=sol, base_problem=problem, additional_constraints=additional_constraints, ) if ObjectivesEnum.MAKESPAN in objectives: makespan = self.create_makespan_obj(ends_var, st_lb) self.variables["objectives"][ObjectivesEnum.MAKESPAN] = makespan if ObjectivesEnum.NB_DONE_AC in objectives and optional_activities: nb_done = sum([actually_done_var[i] for i in actually_done_var]) self.variables["objectives"][ObjectivesEnum.NB_DONE_AC] = -nb_done used = self.create_used_variable(is_present_var, key_per_team) self.variables["used"] = used if args["symmbreak_on_used"]: equivalent_ = compute_equivalent_teams_scheduling_problem(self.problem) for group in equivalent_: for ind1, ind2 in zip(group[:-1], group[1:]): self.cp_model.AddImplication(used[ind2], used[ind1]) self.cp_model.Add(used[ind1] >= used[ind2]) self.variables["objectives"][ObjectivesEnum.NB_TEAMS] = sum( [used[x] for x in used] ) if adding_redundant_cumulative: capacity = self.cp_model.NewIntVar( lb=1, ub=self.problem.number_teams, name="capacity" ) self.cp_model.Add(capacity == sum([used[x] for x in used])) self.cp_model.AddCumulative( intervals=[interval_var[x] for x in interval_var], demands=[1 for x in interval_var], capacity=capacity, ) if add_lower_bound: lprovider: SubBrick = args["lower_bound_method"] t_deb = time.perf_counter() lbound_provider: BaseAllocSchedulingLowerBoundProvider = lprovider.cls( self.problem ) bound = lbound_provider.get_lb_nb_teams(**lprovider.kwargs) t_end = time.perf_counter() self.cp_model.Add(sum([used[x] for x in used]) >= bound) self.time_bounds = t_end - t_deb self.bound_teams = bound self.status_bound = lbound_provider.status else: self.bound_teams = None self.time_bounds = 0 self.status_bound = None self.add_objective_functions_on_cumul(objectives=objectives, **args) if additional_constraints is not None: self.set_additional_constraints( additional_constraint=additional_constraints ) objs = [] weights = [] for obj in objectives: if obj == ObjectivesEnum.NB_DONE_AC and optional_activities: objs.append(self.variables["objectives"][ObjectivesEnum.NB_DONE_AC]) weights.append(100000.0) elif obj == ObjectivesEnum.NB_TEAMS: objs.append(self.variables["objectives"][ObjectivesEnum.NB_TEAMS]) weights.append(10000.0) elif obj == ObjectivesEnum.MIN_WORKLOAD: objs.append(self.variables["objectives"][ObjectivesEnum.MIN_WORKLOAD]) weights.append(1.0) elif obj == ObjectivesEnum.DISPERSION: objs.append(self.variables["objectives"][ObjectivesEnum.DISPERSION]) weights.append(1.0) elif obj == ObjectivesEnum.MAKESPAN: objs.append(self.variables["objectives"][ObjectivesEnum.MAKESPAN]) weights.append(1.0) elif obj == ObjectivesEnum.DELTA_TO_EXISTING_SOLUTION: weights_dict = { "reallocated": 1000, "sum_delta_schedule": 1, "max_delta_schedule": 0, "nb_shifted": 1, } for x in weights_dict: objs.append(self.variables["resched_objs"][x]) weights.append(weights_dict[x]) self.variables["objectives"][x] = self.variables["resched_objs"][x] self.variables["objectives"][ ObjectivesEnum.DELTA_TO_EXISTING_SOLUTION ] = sum( [ weights_dict[x] * self.variables["resched_objs"][x] for x in weights_dict ] ) self.variables["objs"] = objs self.cp_model.Minimize(sum(weights[i] * objs[i] for i in range(len(objs))))
[docs] def add_objective_functions_on_cumul( self, objectives: Optional[list[ObjectivesEnum]] = None, **args ): modelisation_dispersion: ModelisationDispersion = args[ "modelisation_dispersion" ] dur = [ self.problem.tasks_data[t].duration_task for t in self.problem.tasks_list ] if ObjectivesEnum.DISPERSION in objectives: dict_fairness = model_fairness( used_team=self.variables["used"], allocation_variables=[ self.variables["is_present_var"][i] for i in range(self.problem.number_tasks) ], value_per_task=dur, modelisation_dispersion=modelisation_dispersion, cp_model=self.cp_model, number_teams=self.problem.number_teams, name_value="workload", ) self.variables["cumul_workload"] = dict_fairness["cumulated_value"] if "min_value_workload" in dict_fairness: self.variables["min_workload"] = dict_fairness["min_value_workload"] if "max_value_workload" in dict_fairness: self.variables["max_workload"] = dict_fairness["max_value_workload"] self.variables["objectives"][ObjectivesEnum.DISPERSION] = dict_fairness[ "obj" ] if ObjectivesEnum.MIN_WORKLOAD in objectives: variables = cumulate_value_per_teams_version_2( used_team=self.variables["used"], allocation_variables=[ self.variables["is_present_var"][i] for i in range(self.problem.number_tasks) ], value_per_task=dur, cp_model=self.cp_model, number_teams=self.problem.number_teams, name_value="workload_", ) min_value = self.cp_model.NewIntVar( lb=0, ub=sum(dur), name="min_value_workload" ) self.cp_model.AddMinEquality(min_value, variables["workload_per_team_nz"]) self.variables["objectives"][ObjectivesEnum.MIN_WORKLOAD] = min_value
[docs] def set_additional_constraints( self, additional_constraint: AdditionalCPConstraints ): self.set_nb_teams_constraints(additional_constraint=additional_constraint) self.set_team_used_constraint(additional_constraint=additional_constraint)
[docs] def set_nb_teams_constraints(self, additional_constraint: AdditionalCPConstraints): if additional_constraint.nb_teams_bounds is not None: used = self.variables["used"] if ( additional_constraint.nb_teams_bounds[0] is not None and additional_constraint.nb_teams_bounds[1] == additional_constraint.nb_teams_bounds[0] ): self.cp_model.Add( sum([used[x] for x in used]) == additional_constraint.nb_teams_bounds[0] ) else: if additional_constraint.nb_teams_bounds[0] is not None: self.cp_model.Add( sum([used[x] for x in used]) >= additional_constraint.nb_teams_bounds[0] ) if additional_constraint.nb_teams_bounds[1] is not None: self.cp_model.Add( sum([used[x] for x in used]) <= additional_constraint.nb_teams_bounds[1] )
[docs] def set_team_used_constraint(self, additional_constraint: AdditionalCPConstraints): if additional_constraint.team_used_constraint is not None: used = self.variables["used"] for team_index in additional_constraint.team_used_constraint: if additional_constraint.team_used_constraint[team_index] is not None: # don't care for the syntax warning, i want that it only works with booleans if additional_constraint.team_used_constraint[team_index] == True: self.cp_model.Add(used[team_index] == 1) if additional_constraint.team_used_constraint[team_index] == False: self.cp_model.Add(used[team_index] == 0)
[docs] def create_delta_objectives( self, base_solution: AllocSchedulingSolution, base_problem: AllocSchedulingProblem, additional_constraints: Optional[AdditionalCPConstraints] = None, ): objs = [] common_tasks = list( set(base_problem.tasks_list).intersection(self.problem.tasks_list) ) common_teams = list( set(base_problem.team_names).intersection(self.problem.team_names) ) len_common_tasks = len(common_tasks) reallocation_bool = [ self.cp_model.NewBoolVar(name=f"realloc_{self.problem.tasks_to_index[t]}") for t in common_tasks ] self.variables["reallocation"] = reallocation_bool self.variables["tasks_order_in_reallocation"] = common_tasks delta_starts = [] delta_starts_abs = [] is_shifted = [ self.cp_model.NewBoolVar(name=f"shifted_{self.problem.tasks_to_index[t]}") for t in common_tasks ] self.variables["is_shifted"] = is_shifted for i in range(len_common_tasks): tt = common_tasks[i] index_in_problem = self.problem.tasks_to_index[tt] ignore_reallocation = False if additional_constraints is not None: if additional_constraints.set_tasks_ignore_reallocation is not None: if ( index_in_problem in additional_constraints.set_tasks_ignore_reallocation ): ignore_reallocation = True index_in_base_problem = base_problem.tasks_to_index[tt] if ( base_solution.allocation[index_in_base_problem] not in base_problem.index_to_team ): team_of_base_solution = None else: team_of_base_solution = base_problem.index_to_team[ base_solution.allocation[index_in_base_problem] ] if team_of_base_solution is not None: index_team = self.problem.teams_to_index[team_of_base_solution] if not ignore_reallocation: if ( index_team not in self.variables["is_present_var"][index_in_problem] ): logger.debug("Problem") else: self.cp_model.Add( self.variables["is_present_var"][index_in_problem][ index_team ] == 1 ).OnlyEnforceIf(reallocation_bool[i].Not()) self.cp_model.Add( self.variables["is_present_var"][index_in_problem][ index_team ] == 0 ).OnlyEnforceIf(reallocation_bool[i]) else: self.cp_model.Add(reallocation_bool[i] == 0) delta_starts.append( -int(base_solution.schedule[index_in_base_problem, 0]) + self.variables["starts_var"][index_in_problem] ) self.cp_model.Add(delta_starts[-1] != 0).OnlyEnforceIf(is_shifted[i]) self.cp_model.Add(delta_starts[-1] == 0).OnlyEnforceIf(is_shifted[i].Not()) delta_starts_abs.append( self.cp_model.NewIntVar( lb=0, ub=self.problem.horizon, name=f"delta_abs_starts_{index_in_problem}", ) ) self.cp_model.AddAbsEquality(delta_starts_abs[-1], delta_starts[-1]) self.variables["delta_starts_abs"] = delta_starts_abs self.variables["delta_starts"] = delta_starts max_delta_start = self.cp_model.NewIntVar( lb=0, ub=self.problem.horizon, name=f"max_delta_starts" ) self.variables["max_delta_start"] = max_delta_start self.cp_model.AddMaxEquality(max_delta_start, delta_starts_abs) objs = [ sum(reallocation_bool) ] # count the number of changes of team/task allocation objs += [sum(delta_starts_abs)] # sum all absolute shift on the schedule objs += [max_delta_start] # maximum of absolute shift over all tasks objs += [ sum(is_shifted) ] # Number of task that shifted at least by 1 unit of time. self.variables["resched_objs"] = { "reallocated": objs[0], "sum_delta_schedule": objs[1], "max_delta_schedule": objs[2], "nb_shifted": objs[3], } return objs
[docs] def create_used_variable( self, is_present_var: dict[int, dict[int, IntVar]], key_per_team: dict[int, list[tuple[int, int]]], ): used = { index_team: self.cp_model.NewBoolVar(f"used_{index_team}") for index_team in key_per_team } for index_team in used: for x in key_per_team[index_team]: self.cp_model.Add(used[index_team] >= is_present_var[x[0]][x[1]]) self.cp_model.AddMaxEquality( used[index_team], [is_present_var[x[0]][x[1]] for x in key_per_team[index_team]], ) return used
[docs] def create_makespan_obj( self, ends_var: dict[int, IntVar], st_lb: list[tuple[int, int, int, int]] = None ): if st_lb is None: st_lb = [ ( int(self.problem.get_lb_start_window(t)), int(self.problem.get_ub_start_window(t)), int(self.problem.get_lb_end_window(t)), int(self.problem.get_ub_end_window(t)), ) for t in self.problem.tasks_list ] lb_makespan = max([x[2] for x in st_lb]) ub_makespan = max([x[3] for x in st_lb]) makespan = self.cp_model.NewIntVar( lb=lb_makespan, ub=ub_makespan, name="makespan" ) self.cp_model.AddMaxEquality(makespan, [ends_var[i] for i in ends_var]) return makespan
def _get_variables_obj_key( obj: Union[str, ObjectivesEnum] ) -> Union[str, ObjectivesEnum]: if isinstance(obj, str): try: return ObjectivesEnum[obj] except KeyError: return obj else: return obj