# Copyright (c) 2025 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from collections import defaultdict
from collections.abc import Hashable
from copy import deepcopy
from enum import Enum
from itertools import product
from typing import Any, Optional
import networkx as nx
import numpy as np
from networkx import bipartite
from discrete_optimization.coloring.problem import ColoringConstraints, ColoringProblem
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
Solution,
TypeAttribute,
TypeObjective,
)
from discrete_optimization.generic_tools.graph_api import Graph
logger = logging.getLogger(__file__)
[docs]
def compute_available_teams_per_activities(
starts: np.ndarray,
ends: np.ndarray,
activities_name: list[Hashable],
calendars_team: dict[Hashable, np.ndarray],
):
available_team_per_activity = {}
for i in range(len(starts)):
available_team_per_activity[activities_name[i]] = set()
st, end = starts[i], ends[i]
for team in calendars_team:
if st == end:
if calendars_team[team][int(st)] == 1:
available_team_per_activity[activities_name[i]].add(team)
elif np.min(calendars_team[team][int(st) : int(end)]) == 1:
available_team_per_activity[activities_name[i]].add(team)
return available_team_per_activity
[docs]
class AllocationAdditionalConstraint:
"""
Object to store potentially dynamically defined constraints.
Some can be redundant
"""
# each set of activities should be allocated to same team.
same_allocation: list[set[Hashable]]
# allocated team inside the set should be different : "cliques constraint".
# It could be modeled in the coloring graph
all_diff_allocation: list[set[Hashable]]
# force allocation
# could be modeled in the "allocation" graph
forced_allocation: dict[Hashable, Hashable]
# forbidden allocation
# (could be modeled in the "allocation" graph)
forbidden_allocation: dict[Hashable, set[Hashable]]
# allowed allocation
# (could be modeled in the allocation graph)
allowed_allocation: dict[Hashable, set[Hashable]]
# Disjunction
# each list in disjunction : [(act, team), (act2, team2)...]
# will mean that at least 1 act_i will need to be allocated to team_i
disjunction: list[list[tuple[Hashable, Hashable]]]
# Number of teams to be used : useful for scenario where we limit number of resource
nb_max_teams: int
# Precedence constraints : not necessarily useful for allocation problem
# Except for unsat problems where task can be dropped, then the successors should be dropped too.
# dict = {parent: set of children tasks}
precedences: dict[Hashable, set[Hashable]]
def __init__(
self,
same_allocation: Optional[list[set[Hashable]]] = None,
all_diff_allocation: Optional[list[set[Hashable]]] = None,
forced_allocation: Optional[dict[Hashable, Hashable]] = None,
forbidden_allocation: Optional[dict[Hashable, set[Hashable]]] = None,
allowed_allocation: Optional[dict[Hashable, set[Hashable]]] = None,
disjunction: Optional[list[list[tuple[Hashable, Hashable]]]] = None,
nb_max_teams: Optional[int] = None,
precedences: Optional[dict[Hashable, set[Hashable]]] = None,
):
self.same_allocation = same_allocation
self.all_diff_allocation = all_diff_allocation
self.forced_allocation = forced_allocation
self.forbidden_allocation = forbidden_allocation
self.allowed_allocation = allowed_allocation
self.disjunction = disjunction
self.nb_max_teams = nb_max_teams
self.precedences = precedences
[docs]
def is_empty(self):
return all(
getattr(self, x) is None
for x in [
"same_allocation",
"all_diff_allocation",
"forced_allocation",
"forbidden_allocation",
"allowed_allocation",
"disjunction",
"nb_max_teams",
"precedences",
]
)
def __str__(self):
s = "Additional constraints \n"
for attr in [
"same_allocation",
"all_diff_allocation",
"forced_allocation",
"forbidden_allocation",
"allowed_allocation",
"disjunction",
"nb_max_teams",
"precedences",
]:
s += f"{attr} = {getattr(self, attr)}\n"
return s
[docs]
class GraphBipartite(Graph):
def __init__(
self,
nodes: list[tuple[Hashable, dict[str, Any]]],
edges: list[tuple[Hashable, Hashable, dict[str, Any]]],
nodes_activity: set[Hashable],
nodes_team: set[Hashable],
undirected: bool = True,
compute_predecessors: bool = True,
):
self.nodes_activity = nodes_activity
self.nodes_team = nodes_team
# Sanity check
nodes_post_treated = []
for node in nodes:
dd = node[1]
if node[0] in self.nodes_team:
dd["bipartite"] = 0
if node[0] in self.nodes_activity:
dd["bipartite"] = 1
nodes_post_treated += [(node[0], dd)]
super().__init__(
nodes=nodes_post_treated,
edges=edges,
undirected=undirected,
compute_predecessors=compute_predecessors,
)
assert all(n in self.nodes_infos_dict for n in self.nodes_activity)
assert all(n in self.nodes_infos_dict for n in self.nodes_team)
self.graph_nx = self.to_networkx()
self.nodes_team_list = list(self.nodes_team)
[docs]
def to_networkx(self) -> nx.DiGraph:
graph_nx = nx.DiGraph() if not self.undirected else nx.Graph()
graph_nx.add_nodes_from(self.nodes)
graph_nx.add_edges_from(self.edges)
return graph_nx
[docs]
def is_bipartite(self) -> bool:
return bipartite.is_bipartite(self.graph_nx)
[docs]
def get_nodes_team(self):
return self.nodes_team
[docs]
def get_nodes_team_list(self):
return self.nodes_team_list
[docs]
def get_nodes_activity(self):
return self.nodes_activity
[docs]
class TeamAllocationSolution(Solution):
def __init__(
self,
problem: "TeamAllocationProblem",
allocation: list[Optional[int]],
**kwargs,
):
self.problem = problem
self.allocation = allocation
self.kpis = kwargs
[docs]
def copy(self) -> "Solution":
return TeamAllocationSolution(
problem=self.problem, allocation=deepcopy(self.allocation), **self.kpis
)
[docs]
def lazy_copy(self) -> "Solution":
return TeamAllocationSolution(
problem=self.problem, allocation=self.allocation, **self.kpis
)
[docs]
def change_problem(self, new_problem: "Problem") -> None:
self.problem = new_problem
[docs]
def build_graph_allocation_from_calendar_and_schedule(
starts: np.ndarray,
ends: np.ndarray,
calendar_team: dict[Hashable, list[tuple[int, int]]],
horizon: int,
tasks_name: list[Hashable],
teams_name: list[Hashable],
):
calendars_array = {}
for team in calendar_team:
calendars_array[team] = np.zeros(horizon)
for slot in calendar_team[team]:
calendars_array[team][
slot[0] : min(slot[1], calendars_array[team].shape[0])
] = 1
available_team_per_activity = compute_available_teams_per_activities(
starts=starts,
ends=ends,
calendars_team=calendars_array,
activities_name=tasks_name,
)
nodes = [(ac, {"type": "activity"}) for ac in tasks_name]
nodes_activity = set({ac for ac in tasks_name})
nodes.extend([(team, {"type": "team"}) for team in teams_name])
nodes_team = set({team for team in teams_name})
edges = []
teams = teams_name
for ac in available_team_per_activity:
for team in teams:
if team not in available_team_per_activity[ac]:
edges.append((ac, team, {}))
graph_allocation = GraphBipartite(
nodes=nodes,
edges=edges,
nodes_activity=nodes_activity,
nodes_team=nodes_team,
undirected=True,
compute_predecessors=False,
)
return graph_allocation
[docs]
def compute_graph_coloring(starts: np.ndarray, ends: np.ndarray, task_names: list):
from discrete_optimization.generic_tools.graph_api import Graph, from_networkx
track_presence = defaultdict(lambda: set())
edges = []
graph_nx = nx.DiGraph()
for i in range(len(task_names)):
graph_nx.add_node(
task_names[i], start=starts[i], end=ends[i], duration=ends[i] - starts[i]
)
for i in range(starts.shape[0]):
for time in range(int(starts[i]), int(ends[i])):
edges.extend([(task_names[x], task_names[i]) for x in track_presence[time]])
track_presence[time].add(i)
edges = set(edges)
graph_nx.remove_edges_from(graph_nx.edges())
graph_nx.add_edges_from(list(edges))
return from_networkx(graph_nx=graph_nx)
[docs]
class TeamAllocationProblem(Problem):
def __init__(
self,
graph_activity: Graph = None,
graph_allocation: GraphBipartite = None,
allocation_additional_constraint: Optional[
AllocationAdditionalConstraint
] = None,
schedule_activity: Optional[dict[Hashable, tuple[int, int]]] = None,
calendar_team: dict[Hashable, list[tuple[int, int]]] = None,
activities_name: list[Hashable] = None,
):
"""
:param graph_activity: graph representing coloring constraint among the activities
:param graph_allocation: graph representing forbidden task assignment to team (or the opposite)
:param allocation_additional_constraint: optional additional constraint objects.
:param schedule_activity: (optional), if the underlying problem is a workforce allocation problem to some
scheduled-activities, it is possible to specify the schedule this way. Notably, this can lead to efficient
clique-overlap kind of constraint for some solver.
:param calendar_team: calendar of the teams.
"""
self.graph_activity = graph_activity
self.graph_allocation = graph_allocation
if self.graph_activity is not None:
self.activities_name: list[Hashable] = self.graph_activity.nodes_name
if activities_name is not None:
self.activities_name = activities_name
self.calendar_team = calendar_team
if self.graph_allocation is None and self.calendar_team is not None:
orig_starts_arr = np.array(
[schedule_activity[n][0] for n in self.activities_name]
)
orig_ends_arr = np.array(
[schedule_activity[n][1] for n in self.activities_name]
)
self.graph_allocation = build_graph_allocation_from_calendar_and_schedule(
starts=orig_starts_arr,
ends=orig_ends_arr,
calendar_team=self.calendar_team,
horizon=int(np.max(orig_ends_arr) + 10),
tasks_name=self.activities_name,
teams_name=list(self.calendar_team.keys()),
)
if self.graph_activity is None:
orig_starts_arr = np.array(
[schedule_activity[n][0] for n in self.activities_name]
)
orig_ends_arr = np.array(
[schedule_activity[n][1] for n in self.activities_name]
)
self.graph_activity = compute_graph_coloring(
starts=orig_starts_arr,
ends=orig_ends_arr,
task_names=self.activities_name,
)
self.number_of_activity = len(self.graph_activity.nodes_name)
self.teams_name: list[Hashable] = self.graph_allocation.get_nodes_team_list()
self.number_of_teams = len(self.teams_name)
self.allocation_additional_constraint = allocation_additional_constraint
self.schedule = schedule_activity
if self.schedule is None:
self.schedule = {
self.activities_name[i]: (
self.graph_activity.nodes_infos_dict[self.activities_name[i]][
"start"
],
self.graph_activity.nodes_infos_dict[self.activities_name[i]][
"end"
],
)
for i in range(self.number_of_activity)
}
self.index_activities_name = {
self.activities_name[i]: i for i in range(self.number_of_activity)
}
self.index_to_activities_name = {
i: self.activities_name[i] for i in range(self.number_of_activity)
}
self.index_teams_name = {
self.teams_name[i]: i for i in range(self.number_of_teams)
}
self.index_to_teams_name = {
i: self.teams_name[i] for i in range(self.number_of_teams)
}
[docs]
def reorder_teams_name(self, new_order_teams_name: list[Hashable]):
self.teams_name = new_order_teams_name
self.index_teams_name = {
self.teams_name[i]: i for i in range(self.number_of_teams)
}
self.index_to_teams_name = {
i: self.teams_name[i] for i in range(self.number_of_teams)
}
@property
def do_add_cons(self):
return (
self.allocation_additional_constraint is not None
and not self.allocation_additional_constraint.is_empty()
)
[docs]
def computed_forbidden_team_for_task(self, task: Hashable) -> list[Hashable]:
return [team for team in self.graph_allocation.get_neighbors(task)]
[docs]
def compute_allowed_team_for_task(self, task: Hashable) -> list[Hashable]:
if self.allocation_additional_constraint is not None:
allowed_team = [
team
for team in self.index_teams_name
if team not in self.graph_allocation.get_neighbors(task)
]
if self.allocation_additional_constraint.forced_allocation is not None:
if task in self.allocation_additional_constraint.forced_allocation:
allowed_team = [
self.allocation_additional_constraint.forced_allocation[task]
]
if self.allocation_additional_constraint.forbidden_allocation is not None:
if task in self.allocation_additional_constraint.forbidden_allocation:
allowed_team = [
t
for t in allowed_team
if t
not in self.allocation_additional_constraint.forbidden_allocation[
task
]
]
if self.allocation_additional_constraint.allowed_allocation:
if task in self.allocation_additional_constraint.allowed_allocation:
allowed_team = [
t
for t in allowed_team
if t
in self.allocation_additional_constraint.allowed_allocation[
task
]
]
return allowed_team
return [
team
for team in self.index_teams_name
if team not in self.graph_allocation.get_neighbors(task)
]
[docs]
def compute_forbidden_team_index_for_task(self, task: Hashable) -> list[int]:
return [
self.index_teams_name[team]
for team in self.computed_forbidden_team_for_task(task)
]
[docs]
def compute_allowed_team_index_for_task(self, task: Hashable) -> list[int]:
return [
self.index_teams_name[team]
for team in self.compute_allowed_team_for_task(task)
]
[docs]
def compute_forbidden_team_index_all_task(self) -> list[list[int]]:
return [
self.compute_forbidden_team_index_for_task(self.index_to_activities_name[i])
for i in range(self.number_of_activity)
]
[docs]
def compute_allowed_team_index_all_task(self) -> list[list[int]]:
return [
self.compute_allowed_team_index_for_task(self.index_to_activities_name[i])
for i in range(self.number_of_activity)
]
[docs]
def compute_pair_overlap_index_task(self) -> list[tuple[int, int]]:
return [
(self.index_activities_name[e[0]], self.index_activities_name[e[1]])
for e in self.graph_activity.edges
]
[docs]
def get_max_teams(self) -> int:
max_teams = self.number_of_teams
if self.allocation_additional_constraint is not None:
if self.allocation_additional_constraint.nb_max_teams is not None:
max_teams = min(
max_teams, self.allocation_additional_constraint.nb_max_teams
)
return max_teams
[docs]
def evaluate(self, variable: TeamAllocationSolution) -> dict[str, float]:
"""
Evaluation implementation for TeamAllocationProblem.
Compute number of allocated teams and violation of the current solution.
"""
keys = ["nb_teams", "nb_violations"]
if variable.kpis.get("nb_teams", None) is None:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None when evaluating")
else:
variable.kpis["nb_teams"] = len(set(variable.allocation))
if variable.kpis.get("nb_violations", None) is None:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None when evaluating")
else:
variable.kpis[
"nb_violations"
] = self.count_allowed_assignment_violations(
variable
) + self.count_color_constraints_violations(
variable
)
if self.do_add_cons:
keys.append("nb_violations_add_cons")
if variable.kpis.get("nb_violations_add_cons", None) is None:
if variable.allocation is None:
raise ValueError(
"variable.allocation must not be None when evaluating"
)
# TODO : count the number of violated constraint instead of this.
if not satisfy_additional_constraint(
problem=self,
solution=variable,
additional_constraint=self.allocation_additional_constraint,
partial_solution=False,
):
variable.kpis["nb_violations_add_cons"] = 10
else:
variable.kpis["nb_violations_add_cons"] = 0
return {k: variable.kpis[k] for k in keys}
[docs]
def satisfy(self, variable: TeamAllocationSolution) -> bool:
"""Check the constraint of the solution.
Check for each edges in the graph if the allocated team of the vertices are different.
When one counterexample is found, the function directly returns False.
Args:
variable (TeamAllocationSolution): the solution object we want to check the feasibility
Returns: boolean indicating if the solution fulfills the constraint.
"""
if any(x is None for x in variable.allocation):
return False
b = self.satisfy_color_constraints(variable)
if not b:
return b
b = self.satisfy_allowed_assignment(variable)
if not b:
return b
if self.do_add_cons:
b = satisfy_additional_constraint(
problem=self,
solution=variable,
additional_constraint=self.allocation_additional_constraint,
partial_solution=False,
)
if not b:
return b
return True
[docs]
def satisfy_color_constraints(self, variable: TeamAllocationSolution) -> bool:
if len(self.graph_activity.edges) > 0:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None")
for e in self.graph_activity.edges:
if (
variable.allocation[self.index_activities_name[e[0]]]
== variable.allocation[self.index_activities_name[e[1]]]
):
return False
return True
[docs]
def satisfy_allowed_assignment(self, variable: TeamAllocationSolution) -> bool:
if len(self.graph_allocation.edges) > 0:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None")
for e in self.graph_allocation.edges:
if e[0] in self.index_activities_name:
if (
variable.allocation[self.index_activities_name[e[0]]]
== self.index_teams_name[e[1]]
):
return False
if e[1] in self.index_activities_name:
if (
variable.allocation[self.index_activities_name[e[1]]]
== self.index_teams_name[e[0]]
):
return False
return True
[docs]
def get_attribute_register(self) -> EncodingRegister:
"""Attribute documentation for TeamAllocation object.
Returns: an EncodingRegister specifying the colors attribute.
"""
dict_register = {
"allocation": {
"name": "allocation",
"type": [TypeAttribute.LIST_INTEGER],
"n": self.number_of_activity,
"arity": self.number_of_teams,
"low": 0, # integer
"up": self.number_of_teams - 1, # integer
}
}
return EncodingRegister(dict_register)
[docs]
def get_solution_type(self) -> type[Solution]:
"""Returns the class of a solution instance for ColoringProblem."""
return TeamAllocationSolution
[docs]
def get_objective_register(self) -> ObjectiveRegister:
"""Specifies the default objective settings to be used with the evaluate function output."""
dict_objective = {
"nb_teams": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=-1.0),
"nb_violations": ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100.0
),
}
if self.do_add_cons:
dict_objective["nb_violations_add_cons"] = ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100.0
)
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=ObjectiveHandling.AGGREGATE,
dict_objective_to_doc=dict_objective,
)
[docs]
def get_dummy_solution(self) -> TeamAllocationSolution:
"""Returns a dummy solution.
A dummy feasible solution consists in giving one different color per vertices.
Returns: A feasible and dummiest ColoringSolution
"""
allocation = list(range(self.number_of_activity))
allocation = [min(x, self.number_of_teams - 1) for x in allocation]
solution = TeamAllocationSolution(self, allocation=allocation)
return solution
[docs]
def count_color_constraints_violations(
self, variable: TeamAllocationSolution
) -> int:
nb_violation = 0
if len(self.graph_activity.edges) > 0:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None")
for e in self.graph_activity.edges:
if (
variable.allocation[self.index_activities_name[e[0]]]
== variable.allocation[self.index_activities_name[e[1]]]
):
nb_violation += 1
return nb_violation
[docs]
def count_allowed_assignment_violations(
self, variable: TeamAllocationSolution
) -> int:
nb_violation = 0
if len(self.graph_allocation.edges) > 0:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None")
for e in self.graph_allocation.edges:
if e[0] in self.index_activities_name:
if (
variable.allocation[self.index_activities_name[e[0]]]
== self.index_teams_name[e[1]]
):
nb_violation += 1
if e[1] in self.index_activities_name:
if (
variable.allocation[self.index_activities_name[e[1]]]
== self.index_teams_name[e[0]]
):
nb_violation += 1
return nb_violation
[docs]
def evaluate_from_encoding(
self, int_vector: list[int], encoding_name: str
) -> dict[str, float]:
"""Can be used in GA algorithm to build an object solution and evaluate from a int_vector representation.
Args:
int_vector: representing the colors vector of our problem
encoding_name: name of the attribute in TeamAllocationSolution corresponding to the int_vector given.
In our case, will only work for encoding_name="allocation"
Returns: the evaluation of the (int_vector, encoding) object on the team allocation problem.
"""
coloring_sol: TeamAllocationSolution
if encoding_name == "allocation":
coloring_sol = TeamAllocationSolution(problem=self, allocation=int_vector)
else:
raise ValueError("encoding_name can only be 'allocation'")
objectives = self.evaluate(coloring_sol)
return objectives
[docs]
def get_natural_explanation_unsat_constraints(
self, variable: TeamAllocationSolution
) -> list[str]:
"""
Return a list of strings describing which constraints are not fulfilled by the given solution.
Args:
variable (TeamAllocationSolution): solution object we want to "analyze"
Returns: list[str]
"""
return self.get_natural_explanation_unsat_colors(
variable
) + self.get_natural_explanation_unsat_allowed_assignment(variable)
[docs]
def get_natural_explanation_unsat_colors(
self, variable: TeamAllocationSolution
) -> list[str]:
"""
Return a list of strings describing which coloring constraints are not fulfilled by the given solution.
Args:
variable (TeamAllocationSolution): solution object we want to "analyze"
Returns: list[str]
"""
list_str_description = []
if len(self.graph_activity.edges) > 0:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None")
for e in self.graph_activity.edges:
if (
variable.allocation[self.index_activities_name[e[0]]]
== variable.allocation[self.index_activities_name[e[1]]]
):
if variable.allocation[self.index_activities_name[e[0]]] is None:
continue
team = self.index_to_teams_name[
variable.allocation[self.index_activities_name[e[0]]]
]
description = (
f"Same team ({team}, index={variable.allocation[self.index_activities_name[e[0]]]}) "
f"allocated to the incompatible activities ({e[0], e[1]})"
)
list_str_description.append(description)
return list_str_description
return []
[docs]
def get_natural_explanation_unsat_allowed_assignment(
self, variable: TeamAllocationSolution
) -> list[str]:
list_str_description = []
if len(self.graph_allocation.edges) > 0:
if variable.allocation is None:
raise ValueError("variable.allocation must not be None")
for e in self.graph_allocation.edges:
if e[0] in self.index_activities_name:
if (
variable.allocation[self.index_activities_name[e[0]]]
== self.index_teams_name[e[1]]
):
description = (
f"You allocated team {e[1]} (or index {self.index_teams_name[e[1]]}) "
f"to the forbidden activity {(e[0], self.index_activities_name[e[0]])}"
)
list_str_description.append(description)
if e[1] in self.index_activities_name:
if (
variable.allocation[self.index_activities_name[e[1]]]
== self.index_teams_name[e[0]]
):
description = (
f"You allocated team {e[0]} (or index {self.index_teams_name[e[0]]}) "
f"to the forbidden activity {(e[1], self.index_activities_name[e[1]])}"
)
list_str_description.append(description)
return list_str_description
return list_str_description
[docs]
def add_additional_constraint(
self, allocation_additional_constraint: AllocationAdditionalConstraint
):
self.allocation_additional_constraint = allocation_additional_constraint
[docs]
def remove_additional_constraint(self):
self.allocation_additional_constraint = None
[docs]
class AggregateOperator(Enum):
MAX_MINUS_MIN = 0
MAX = 1
MIN = 2
MEAN = 3
GINI = 4
[docs]
class TeamAllocationProblemMultiobj(TeamAllocationProblem):
def __init__(
self,
graph_activity: Graph = None,
graph_allocation: GraphBipartite = None,
allocation_additional_constraint: Optional[
AllocationAdditionalConstraint
] = None,
schedule_activity: Optional[dict[Hashable, tuple[int, int]]] = None,
calendar_team: dict[Hashable, list[tuple[int, int]]] = None,
activities_name: list[Hashable] = None,
attributes_cumul_activities: Optional[list[str]] = None,
objective_doc_cumul_activities: Optional[
dict[str, tuple[ObjectiveDoc, AggregateOperator]]
] = None,
):
super().__init__(
graph_activity=graph_activity,
graph_allocation=graph_allocation,
allocation_additional_constraint=allocation_additional_constraint,
schedule_activity=schedule_activity,
calendar_team=calendar_team,
activities_name=activities_name,
)
self.attributes_cumul_activities = attributes_cumul_activities
self.attributes_of_activities: dict[str, dict[Hashable, float]] = {}
for attr in self.attributes_cumul_activities:
self.attributes_of_activities[attr] = {}
for t in self.activities_name:
self.attributes_of_activities[attr][
t
] = self.graph_activity.get_attr_node(t, attr)
self.attributes_cumul_activities = attributes_cumul_activities
self.objective_doc_cumul_activities = objective_doc_cumul_activities
if (
self.attributes_cumul_activities is not None
and self.objective_doc_cumul_activities is None
):
self.objective_doc_cumul_activities = {
attr: (
ObjectiveDoc(type=TypeObjective.PENALTY, default_weight=-1),
AggregateOperator.MAX_MINUS_MIN,
)
for attr in self.attributes_cumul_activities
}
[docs]
def update_attributes_of_activities(self):
for attr in self.attributes_cumul_activities:
self.attributes_of_activities[attr] = {}
for t in self.activities_name:
self.attributes_of_activities[attr][
t
] = self.graph_activity.get_attr_node(t, attr)
[docs]
def get_objective_register(self) -> ObjectiveRegister:
"""Specifies the default objective settings to be used with the evaluate function output."""
o_register = super().get_objective_register()
dict_objective = o_register.dict_objective_to_doc
# dict_objective["nb_teams"].default_weight = -10000
dict_objective["nb_teams"] = ObjectiveDoc(
type=TypeObjective.OBJECTIVE, default_weight=-10000
)
if self.objective_doc_cumul_activities is not None:
dict_objective.update(
{
x: self.objective_doc_cumul_activities[x][0]
for x in self.objective_doc_cumul_activities
}
)
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=ObjectiveHandling.AGGREGATE,
dict_objective_to_doc=dict_objective,
)
[docs]
def evaluate_cumul_nodes(
self, variable: TeamAllocationSolution, attribute_on_node: str
):
cumuls = np.zeros(shape=len(self.teams_name), dtype=int)
for i in range(self.number_of_activity):
act = self.index_to_activities_name[i]
cumuls[variable.allocation[i]] += int(
self.attributes_of_activities[attribute_on_node][act]
)
return cumuls
[docs]
def aggregate_cumuls(
self, cumuls_array, aggregate_operator: AggregateOperator
) -> float:
non_zeros = np.nonzero(cumuls_array)
fit_ = 0
if aggregate_operator == AggregateOperator.MEAN:
fit_ = np.mean(cumuls_array[non_zeros])
if aggregate_operator == AggregateOperator.MIN:
fit_ = np.min(cumuls_array[non_zeros])
if aggregate_operator == AggregateOperator.MAX:
fit_ = np.max(cumuls_array[non_zeros])
if aggregate_operator == AggregateOperator.MAX_MINUS_MIN:
fit_ = np.max(cumuls_array[non_zeros]) - np.min(cumuls_array[non_zeros])
return fit_
[docs]
def evaluate(self, variable: TeamAllocationSolution) -> dict[str, float]:
fits = TeamAllocationProblem.evaluate(self, variable)
cumuls = {
attr: self.evaluate_cumul_nodes(variable, attr)
for attr in self.attributes_cumul_activities
}
for attr in cumuls:
fits[attr] = self.aggregate_cumuls(
cumuls[attr], self.objective_doc_cumul_activities[attr][1]
)
return fits
[docs]
def satisfy_additional_constraint(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
additional_constraint: AllocationAdditionalConstraint,
partial_solution: bool = False,
):
if additional_constraint.same_allocation is not None:
b = satisfy_same_allocation(
problem=problem,
solution=solution,
same_allocation=additional_constraint.same_allocation,
partial_solution=partial_solution,
)
if not b:
logger.info("Same allocation constraint violated")
return False
if additional_constraint.all_diff_allocation is not None:
b = satisfy_all_diff(
problem=problem,
solution=solution,
all_diffs=additional_constraint.all_diff_allocation,
partial_solution=partial_solution,
)
if not b:
logger.info("All diff allocation constraint violated")
return False
if additional_constraint.forced_allocation is not None:
b = satisfy_forced_allocation(
problem=problem,
solution=solution,
forced_allocation=additional_constraint.forced_allocation,
partial_solution=partial_solution,
)
if not b:
logger.info("Forced allocation constraint violated")
return False
if additional_constraint.forbidden_allocation is not None:
b = satisfy_forbidden_allocation(
problem=problem,
solution=solution,
forbidden_allocation=additional_constraint.forbidden_allocation,
partial_solution=partial_solution,
)
if not b:
logger.info("Forbidden allocation constraint violated")
return False
if additional_constraint.allowed_allocation is not None:
b = satisfy_allowed_allocation(
problem=problem,
solution=solution,
allowed_allocation=additional_constraint.allowed_allocation,
partial_solution=partial_solution,
)
if not b:
logger.info("Allowed allocation constraint violated")
return False
if additional_constraint.disjunction is not None:
b = satisfy_disjunctions(
problem=problem,
solution=solution,
disjunction=additional_constraint.disjunction,
partial_solution=partial_solution,
)
if not b:
logger.info("Disjunction allocation constraint violated")
return False
if additional_constraint.nb_max_teams is not None:
b = satisfy_nb_teams(
problem=problem,
solution=solution,
nb_teams_max=additional_constraint.nb_max_teams,
partial_solution=partial_solution,
)
if not b:
logger.info("Nb max teams constraint violated")
return False
return True
[docs]
def satisfy_same_allocation(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
same_allocation: list[set[Hashable]],
partial_solution: bool = False,
):
for set_same_alloc in same_allocation:
if not partial_solution:
one_ac = next(iter(set_same_alloc))
val = solution.allocation[problem.index_activities_name[one_ac]]
# logger.debug([solution.allocation[problem.index_activities_name[x]] for x in set_same_alloc])
if any(
solution.allocation[problem.index_activities_name[x]] != val
for x in set_same_alloc
):
return False
else:
one_ac = next(
(
s
for s in set_same_alloc
if solution.allocation[problem.index_activities_name[s]] is not None
),
None,
)
if one_ac is None:
continue
else:
val = solution.allocation[problem.index_activities_name[one_ac]]
if any(
solution.allocation[problem.index_activities_name[x]] != val
for x in set_same_alloc
if solution.allocation[problem.index_activities_name[x]] is not None
):
return False
return True
[docs]
def satisfy_all_diff(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
all_diffs: list[set[Hashable]],
partial_solution: bool = False,
):
for all_diff in all_diffs:
set_value = set()
for t in all_diff:
team = solution.allocation[problem.index_activities_name[t]]
if team is None and partial_solution:
continue
if team in set_value:
return False
else:
set_value.add(team)
return True
[docs]
def satisfy_forced_allocation(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
forced_allocation: dict[Hashable, Hashable],
partial_solution: bool = False,
):
for task in forced_allocation:
team_index = solution.allocation[problem.index_activities_name[task]]
if team_index is None and partial_solution:
continue
if team_index is None:
return False
team_name = problem.index_to_teams_name[team_index]
if team_name != forced_allocation[task]:
return False
return True
[docs]
def satisfy_forbidden_allocation(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
forbidden_allocation: dict[Hashable, set[Hashable]],
partial_solution: bool = False,
):
for task in forbidden_allocation:
team_index = solution.allocation[problem.index_activities_name[task]]
if team_index is None and partial_solution:
continue
if team_index is None:
return False
team_name = problem.index_to_teams_name[team_index]
if team_name in forbidden_allocation[task]:
return False
return True
[docs]
def satisfy_allowed_allocation(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
allowed_allocation: dict[Hashable, set[Hashable]],
partial_solution: bool = False,
):
for task in allowed_allocation:
team_index = solution.allocation[problem.index_activities_name[task]]
if team_index is None and partial_solution:
continue
if team_index is None:
return False
team_name = problem.index_to_teams_name[team_index]
if team_name not in allowed_allocation[task]:
return False
return True
[docs]
def satisfy_disjunctions(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
disjunction: list[list[tuple[Hashable, Hashable]]],
partial_solution: bool = False,
):
b = True
for one_disjunction in disjunction:
b = b and satisfy_disjunction(
problem=problem,
solution=solution,
one_disjunction=one_disjunction,
partial_solution=partial_solution,
)
if not b:
return False
return True
[docs]
def satisfy_disjunction(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
one_disjunction: list[tuple[Hashable, Hashable]],
partial_solution: bool = False,
):
b = False
for task, team in one_disjunction:
team_index = solution.allocation[problem.index_activities_name[task]]
if team_index is None:
continue
team_name = problem.index_to_teams_name[team_index]
b |= team_name == team
if b:
return True
return False
[docs]
def satisfy_nb_teams(
problem: TeamAllocationProblem,
solution: TeamAllocationSolution,
nb_teams_max: int,
partial_solution: bool = False,
):
set_teams = set()
for i in range(problem.number_of_activity):
v = solution.allocation[i]
if v is not None:
set_teams.add(v)
if len(set_teams) > nb_teams_max:
return False
return True