Source code for discrete_optimization.workforce.commons.fairness_modeling_ortools

#  Copyright (c) 2025 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
from typing import Optional, Union

from ortools.sat.python.cp_model import CpModel, IntVar

from discrete_optimization.workforce.commons.fairness_modeling import (
    ModelisationDispersion,
)


[docs] def cumulate_value_per_teams( used_team: Union[dict[int, IntVar], list[IntVar]], allocation_variables: Union[list[list[IntVar]], list[dict[int, IntVar]]], value_per_task: list[int], cp_model: CpModel, number_teams: Optional[int] = None, name_value: Optional[str] = "", ): """ - used_team : - either a dict : {index_team: boolean_var} for index_team in range(nb_teams) - or simply an array of size (nb_teams) - allocation_variables : - either a 2D array of vars : [index_activity, index_team] or a list of size (nb_activities) of dict : {index_team: boolean} for index_team in "available_teams for a given task" - value per task : - list of values linked to task, to aggregate for a given team. """ if number_teams is None: number_teams = len(used_team) upper_bound_values = int(sum(value_per_task)) workload_per_team = [ cp_model.NewIntVar( lb=0, ub=upper_bound_values, name=f"cumulated_value_{name_value}_{i}" ) for i in range(number_teams) ] for index_team in range(number_teams): team_load = 0 if isinstance(allocation_variables[0], dict): team_load = sum( [ allocation_variables[i][index_team] * value_per_task[i] for i in range(len(allocation_variables)) if index_team in allocation_variables[i] ] ) if isinstance(allocation_variables[0], list): team_load = sum( [ allocation_variables[i][index_team] * value_per_task[i] for i in range(len(allocation_variables)) ] ) cp_model.Add(workload_per_team[index_team] == team_load).OnlyEnforceIf( used_team[index_team] ) return {"workload_per_team": workload_per_team}
[docs] def cumulate_value_per_teams_version_2( used_team: Union[dict[int, IntVar], list[IntVar]], allocation_variables: Union[list[list[IntVar]], list[dict[int, IntVar]]], value_per_task: list[int], cp_model: CpModel, number_teams: Optional[int] = None, name_value: Optional[str] = "", ): """ - used_team : - either a dict : {index_team: boolean_var} for index_team in range(nb_teams) - or simply an array of size (nb_teams) - allocation_variables : - either a 2D array of vars : [index_activity, index_team] or a list of size (nb_activities) of dict : {index_team: boolean} for index_team in "available_teams for a given task" - value per task : - list of values linked to task, to aggregate for a given team. """ if number_teams is None: number_teams = len(used_team) upper_bound_values = int(sum(value_per_task)) workload_per_team = [ cp_model.NewIntVar( lb=0, ub=upper_bound_values, name=f"cumulated_value_{name_value}_{i}" ) for i in range(number_teams) ] workload_per_team_non_zeros = [ cp_model.NewIntVar( lb=0, ub=upper_bound_values, name=f"cumulated_value_nz_{name_value}_{i}" ) for i in range(number_teams) ] for index_team in range(number_teams): team_load = 0 if isinstance(allocation_variables[0], dict): team_load = sum( [ allocation_variables[i][index_team] * value_per_task[i] for i in range(len(allocation_variables)) if index_team in allocation_variables[i] ] ) if isinstance(allocation_variables[0], list): team_load = sum( [ allocation_variables[i][index_team] * value_per_task[i] for i in range(len(allocation_variables)) ] ) cp_model.Add(team_load == workload_per_team[index_team]) cp_model.Add( workload_per_team_non_zeros[index_team] == team_load ).OnlyEnforceIf(used_team[index_team]) cp_model.Add( workload_per_team_non_zeros[index_team] == upper_bound_values ).OnlyEnforceIf(used_team[index_team].Not()) return { "workload_per_team": workload_per_team, "workload_per_team_nz": workload_per_team_non_zeros, }
[docs] def define_fairness_criteria_from_cumulated_value( used_team: Union[dict[int, IntVar], list[IntVar]], cumulated_value_per_team: list[IntVar], value_per_task: list[int], modelisation_dispersion: ModelisationDispersion, cp_model: CpModel, cumulated_value_per_team_nz: Optional[list[IntVar]] = None, name_value: Optional[str] = "", ): if ( modelisation_dispersion == ModelisationDispersion.EXACT_MODELING_WITH_IMPLICATION ): upper_bound = sum(value_per_task) max_value = cp_model.NewIntVar( lb=0, # upper_bound//len(used_team), ub=upper_bound, name=f"max_value_{name_value}", ) min_value = cp_model.NewIntVar( lb=0, # upper_bound//len(used_team), ub=upper_bound, name=f"min_value_{name_value}", ) cp_model.AddMaxEquality(max_value, cumulated_value_per_team) cp_model.AddMinEquality(min_value, cumulated_value_per_team) return { "obj": max_value - min_value, f"max_value_{name_value}": max_value, f"min_value_{name_value}": min_value, } elif ( modelisation_dispersion == ModelisationDispersion.EXACT_MODELING_DUPLICATED_VARS ): upper_bound = sum(value_per_task) max_value = cp_model.NewIntVar( lb=0, # upper_bound // len(used_team), ub=upper_bound, name=f"max_value_{name_value}", ) min_value = cp_model.NewIntVar( lb=0, # upper_bound // len(used_team), ub=upper_bound, name=f"min_value_{name_value}", ) cp_model.AddMaxEquality(max_value, cumulated_value_per_team) cp_model.AddMinEquality(min_value, cumulated_value_per_team_nz) return { "obj": max_value - min_value, f"max_value_{name_value}": max_value, f"min_value_{name_value}": min_value, } elif modelisation_dispersion == ModelisationDispersion.MAX_DIFF: upper_bound = sum(value_per_task) max_diff = cp_model.NewIntVar( lb=0, ub=upper_bound, name=f"max_diff_{name_value}" ) cp_model.AddMaxEquality( max_diff, [x - y for x in cumulated_value_per_team for y in cumulated_value_per_team], ) return {"obj": max_diff, f"max_diff_{name_value}": max_diff, "constraints": []} elif modelisation_dispersion == ModelisationDispersion.PROXY_MAX_MIN: upper_bound = sum(value_per_task) max_value = cp_model.NewIntVar( lb=0, # upper_bound // len(used_team), ub=upper_bound, name=f"max_value_{name_value}", ) cp_model.AddMaxEquality(max_value, cumulated_value_per_team) return {"obj": max_value, f"max_value_{name_value}": max_value} elif modelisation_dispersion == ModelisationDispersion.PROXY_MIN_MAX: upper_bound = sum(value_per_task) min_value = cp_model.NewIntVar( lb=0, # upper_bound // len(used_team), ub=upper_bound, name=f"min_value_{name_value}", ) cp_model.AddMinEquality(min_value, cumulated_value_per_team) return {"obj": -min_value, f"min_value_{name_value}": min_value} elif modelisation_dispersion == ModelisationDispersion.PROXY_SUM: upper_bound = sum(value_per_task) abs_deltas = [ { j: cp_model.NewIntVar(lb=0, ub=upper_bound, name=f"delta_{i}_{j}") for j in range(i + 1, len(cumulated_value_per_team)) } for i in range(len(cumulated_value_per_team)) ] for i in range(len(abs_deltas)): for j in abs_deltas[i]: cp_model.AddAbsEquality( abs_deltas[i][j], cumulated_value_per_team[i] - cumulated_value_per_team[j], ) return { "obj": sum( [ abs_deltas[i][j] for i in range(len(abs_deltas)) for j in abs_deltas[i] ] ) } elif modelisation_dispersion == ModelisationDispersion.PROXY_SLACK: some_expected_value = cp_model.NewIntVar( lb=0, ub=sum(value_per_task), name=f"expected_value_{name_value}" ) slack = cp_model.NewIntVar( lb=0, ub=sum(value_per_task), name=f"slack_{name_value}" ) constraints = [] for index_team in range(len(cumulated_value_per_team)): ( cp_model.Add( cumulated_value_per_team[index_team] <= some_expected_value + slack ).OnlyEnforceIf(used_team[index_team]) ) ( cp_model.Add( cumulated_value_per_team[index_team] >= some_expected_value - slack ).OnlyEnforceIf(used_team[index_team]) ) return {"obj": slack} else: raise NotImplementedError(f"Method {modelisation_dispersion} unknown")
[docs] def model_fairness( used_team: Union[dict[int, IntVar], list[IntVar]], allocation_variables: Union[list[list[IntVar]], list[dict[int, IntVar]]], value_per_task: list[int], modelisation_dispersion: ModelisationDispersion, cp_model: CpModel, number_teams: Optional[int] = None, name_value: Optional[str] = "", ): if modelisation_dispersion == ModelisationDispersion.EXACT_MODELING_DUPLICATED_VARS: dict_cumulated_variable = cumulate_value_per_teams_version_2( used_team=used_team, allocation_variables=allocation_variables, value_per_task=value_per_task, number_teams=number_teams, cp_model=cp_model, name_value=name_value, ) cumulated_value_per_team = dict_cumulated_variable["workload_per_team"] fairness_dict = define_fairness_criteria_from_cumulated_value( used_team=used_team, cumulated_value_per_team=cumulated_value_per_team, value_per_task=value_per_task, cumulated_value_per_team_nz=dict_cumulated_variable["workload_per_team_nz"], modelisation_dispersion=modelisation_dispersion, cp_model=cp_model, name_value=name_value, ) else: dict_cumulated_variable = cumulate_value_per_teams( used_team=used_team, allocation_variables=allocation_variables, value_per_task=value_per_task, number_teams=number_teams, cp_model=cp_model, name_value=name_value, ) cumulated_value_per_team = dict_cumulated_variable["workload_per_team"] fairness_dict = define_fairness_criteria_from_cumulated_value( used_team=used_team, cumulated_value_per_team=cumulated_value_per_team, value_per_task=value_per_task, modelisation_dispersion=modelisation_dispersion, cp_model=cp_model, name_value=name_value, ) d = fairness_dict d["cumulated_value"] = cumulated_value_per_team return d