Source code for discrete_optimization.workforce.generators.resource_scenario

import random
from copy import deepcopy
from typing import Hashable, Optional, Union

import numpy as np

from discrete_optimization.workforce.allocation.problem import (
    GraphBipartite,
    TeamAllocationProblem,
    TeamAllocationSolution,
)
from discrete_optimization.workforce.allocation.utils import cut_number_of_team
from discrete_optimization.workforce.generators.random_tools import RandomState
from discrete_optimization.workforce.scheduling.problem import (
    AllocSchedulingProblem,
    AllocSchedulingSolution,
)
from discrete_optimization.workforce.scheduling.solvers.cpsat import (
    AdditionalCPConstraints,
)


[docs] class ParamsRandomness: def __init__( self, seed: int = 42, lower_nb_disruption: int = 1, upper_nb_disruption: int = 10, lower_nb_teams: int = 1, upper_nb_teams: int = 4, lower_time: int = 0, upper_time: int = 480, duration_discrete_distribution: tuple[list, list] = None, ): self.duration_discrete_distribution = duration_discrete_distribution if duration_discrete_distribution is None: self.duration_discrete_distribution = ( [15, 30, 60, 120], [0.25, 0.25, 0.25, 0.25], ) self.lower_nb_disruption = lower_nb_disruption self.upper_nb_disruption = upper_nb_disruption self.lower_nb_teams = lower_nb_teams self.upper_nb_teams = upper_nb_teams self.lower_time = lower_time self.upper_time = upper_time self.seed = seed self.random_state = RandomState(self.seed)
[docs] def generate_list( self, solution: Union[TeamAllocationSolution, AllocSchedulingSolution] ): self.nb_disruption = self.random_state.get_discrete_truncated_uniform_sample( lower=self.lower_nb_disruption, upper=self.upper_nb_disruption ) self.nb_teams_per_disruption = [ self.random_state.get_discrete_truncated_uniform_sample( lower=self.lower_nb_teams, upper=self.upper_nb_teams ) for _ in range(self.nb_disruption) ] self.starts = [ self.random_state.get_discrete_truncated_uniform_sample( lower=self.lower_time, upper=self.upper_time ) for _ in range(self.nb_disruption) ] self.lengths = [ self.random_state.get_random_element_prop( list=self.duration_discrete_distribution[0], probs=self.duration_discrete_distribution[1], ) for _ in range(self.nb_disruption) ] self.nb_disruption = self.random_state.get_discrete_truncated_uniform_sample( lower=self.lower_nb_disruption, upper=self.upper_nb_disruption ) self.nb_teams_per_disruption = [ self.random_state.get_discrete_truncated_uniform_sample( lower=self.lower_nb_teams, upper=self.upper_nb_teams ) for _ in range(self.nb_disruption) ] self.starts = [ self.random_state.get_discrete_truncated_uniform_sample( lower=self.lower_time, upper=self.upper_time ) for _ in range(self.nb_disruption) ] self.lengths = [ self.random_state.get_random_element_prop( list=self.duration_discrete_distribution[0], probs=self.duration_discrete_distribution[1], ) for _ in range(self.nb_disruption) ] problem: Union[TeamAllocationProblem, AllocSchedulingProblem] = solution.problem index_to_team = None if isinstance(problem, TeamAllocationProblem): index_to_team = problem.index_to_teams_name if isinstance(problem, AllocSchedulingProblem): index_to_team = problem.index_to_team used_teams = list( {i for i in set(solution.allocation) if int(i) in index_to_team} ) disruptions = [] for j in range(self.nb_disruption): sub_teams = random.sample( used_teams, min(self.nb_teams_per_disruption[j], len(used_teams)) ) for k in range(len(sub_teams)): disruptions.append( (self.starts[j], self.starts[j] + self.lengths[j], sub_teams[k]) ) return disruptions
[docs] def compute_starts_end_activities(problem: AllocSchedulingProblem): orig_starts_arr = np.array([problem.original_start[n] for n in problem.tasks_list]) orig_ends_arr = np.array([problem.original_end[n] for n in problem.tasks_list]) return orig_starts_arr, orig_ends_arr
[docs] def compute_available_teams_per_activities_alloc_problem( problem: TeamAllocationProblem, starts: np.ndarray, ends: np.ndarray, calendars_team: dict[Hashable, np.ndarray], ): available_team_per_activity = {} for i in range(len(starts)): available_team_per_activity[problem.activities_name[i]] = set() st, end = starts[i], ends[i] for team in calendars_team: if st == end: if calendars_team[team][int(st)] == 1: available_team_per_activity[problem.activities_name[i]].add(team) if np.min(calendars_team[team][int(st) : int(end)]) == 1: available_team_per_activity[problem.activities_name[i]].add(team) return available_team_per_activity
[docs] def update_allocation_constraint( starts: np.ndarray, ends: np.ndarray, problem: TeamAllocationProblem, calendars_team: dict[Hashable, np.ndarray], ): available_team_per_activity = compute_available_teams_per_activities_alloc_problem( problem=problem, starts=starts, ends=ends, calendars_team=calendars_team ) graph = problem.graph_allocation edges = [] teams = problem.teams_name for ac in available_team_per_activity: for team in teams: if team not in available_team_per_activity[ac]: edges.append((ac, team, {})) problem.graph_allocation = GraphBipartite( nodes=graph.nodes, edges=edges, nodes_activity=graph.nodes_activity, nodes_team=graph.nodes_team, undirected=graph.undirected, compute_predecessors=False, )
[docs] def create_scheduling_problem_several_resource_dropping( allocation_problem: TeamAllocationProblem, scheduling_problem: AllocSchedulingProblem, list_drop_resource: list[tuple[int, int, int]], base_solution: TeamAllocationSolution, ): evaluation_allocation = allocation_problem.evaluate(base_solution) nb_team_used = evaluation_allocation["nb_teams"] used_team = set(base_solution.allocation) sched = np.zeros((scheduling_problem.number_tasks, 2)) allocation = np.zeros(scheduling_problem.number_tasks) for i in range(scheduling_problem.number_tasks): sched[i, 0] = scheduling_problem.original_start[ scheduling_problem.index_to_task[i] ] sched[i, 1] = scheduling_problem.original_end[ scheduling_problem.index_to_task[i] ] team_index = base_solution.allocation[ allocation_problem.index_activities_name[ scheduling_problem.index_to_task[i] ] ] team = allocation_problem.teams_name[team_index] allocation[i] = scheduling_problem.teams_to_index[team] sol_sched = AllocSchedulingSolution( problem=scheduling_problem, schedule=sched, allocation=allocation ) for from_time, to_time, index_team_name in list_drop_resource: nc = [] team_name = scheduling_problem.team_names[index_team_name] for st, end in scheduling_problem.calendar_team[team_name]: if st <= from_time and end >= to_time: nc.append((st, from_time)) nc.append((to_time, end)) elif st <= from_time and end <= from_time: nc.append((st, end)) elif from_time <= st <= to_time: # nc.append((st, min(end, to_time))) if end > to_time: nc.append((to_time, end)) else: nc.append((st, end)) scheduling_problem.calendar_team[team_name] = nc used_teams_dict = {x: None for x in scheduling_problem.index_to_team} for team in scheduling_problem.index_to_team: if team not in used_team: used_teams_dict[team] = False # for x in scheduling_problem.index_to_team: # if x not in used_teams: # used_teams[x] = False additional_cp_constraints = AdditionalCPConstraints( nb_teams_bounds=(None, nb_team_used), # (nb_team_used - 1, nb_team_used), team_used_constraint=used_teams_dict, set_tasks_ignore_reallocation=set(), forced_allocation={}, ) return { "additional_constraints": additional_cp_constraints, "base_solution": sol_sched, }
[docs] def generate_resource_disruption_scenario_from( original_allocation_problem: TeamAllocationProblem, original_scheduling_problem: AllocSchedulingProblem, original_solution: TeamAllocationSolution, list_drop_resource: Optional[list[tuple[int, int, int]]] = None, params_randomness: Optional[ParamsRandomness] = None, ): """ Basically same code than the generator above, but starting from a known problem and original solution """ if list_drop_resource is None: some_resource = random.choice(original_solution.allocation) list_drop_resource = [(3 * 60, 9 * 60, some_resource)] if params_randomness is not None: list_drop_resource = params_randomness.generate_list(solution=original_solution) params = create_scheduling_problem_several_resource_dropping( allocation_problem=original_allocation_problem, list_drop_resource=list_drop_resource, scheduling_problem=original_scheduling_problem, base_solution=original_solution, ) starts, ends = compute_starts_end_activities(problem=original_scheduling_problem) calendars_array = {} for team in original_scheduling_problem.team_names: calendars_array[team] = np.zeros((original_scheduling_problem.horizon + 2)) for slot in original_scheduling_problem.calendar_team[team]: calendars_array[team][ slot[0] : min(slot[1], calendars_array[team].shape[0]) ] = 1 new_alloc_problem = deepcopy(original_allocation_problem) update_allocation_constraint( starts, ends, problem=new_alloc_problem, calendars_team=calendars_array ) available_teams_in_this_scenario = set( [ original_allocation_problem.teams_name[x] for x in original_solution.allocation ] ) new_alloc_problem = cut_number_of_team( new_alloc_problem, subset_teams_keep=available_teams_in_this_scenario ) return { "original_allocation_problem": original_scheduling_problem, "original_allocation_solution": original_solution, "new_allocation_problem": new_alloc_problem, "scheduling_problem": original_scheduling_problem, "current_scheduling_solution": params["base_solution"], "additional_constraint_scheduling": params["additional_constraints"], "list_drop_resource": list_drop_resource, }
[docs] def generate_allocation_disruption( original_allocation_problem: TeamAllocationProblem, original_solution: TeamAllocationSolution, list_drop_resource: Optional[list[tuple[int, int, int]]] = None, params_randomness: Optional[ParamsRandomness] = None, ): if list_drop_resource is None: some_resource = random.choice(original_solution.allocation) list_drop_resource = [(3 * 60, 9 * 60, some_resource)] if params_randomness is not None: list_drop_resource = params_randomness.generate_list(solution=original_solution) new_calendar = deepcopy(original_allocation_problem.calendar_team) for from_time, to_time, index_team_name in list_drop_resource: nc = [] team_name = original_allocation_problem.teams_name[index_team_name] for st, end in new_calendar[team_name]: if st <= from_time and end >= to_time: nc.append((st, from_time)) nc.append((to_time, end)) elif st <= from_time and end <= from_time: nc.append((st, end)) elif from_time <= st <= to_time: if end > to_time: nc.append((to_time, end)) else: nc.append((st, end)) new_calendar[team_name] = nc new_alloc_problem = TeamAllocationProblem( allocation_additional_constraint=original_allocation_problem.allocation_additional_constraint, schedule_activity=original_allocation_problem.schedule, calendar_team=new_calendar, activities_name=original_allocation_problem.activities_name, graph_allocation=None, graph_activity=None, ) available_teams_in_this_scenario = set( [ original_allocation_problem.teams_name[x] for x in original_solution.allocation ] ) new_alloc_problem = cut_number_of_team( new_alloc_problem, subset_teams_keep=available_teams_in_this_scenario ) new_solution = TeamAllocationSolution( problem=new_alloc_problem, allocation=[ new_alloc_problem.index_teams_name[ original_allocation_problem.teams_name[original_solution.allocation[i]] ] for i in range(new_alloc_problem.number_of_activity) ], ) return { "original_allocation_problem": original_allocation_problem, "original_allocation_solution": original_solution, "new_allocation_problem": new_alloc_problem, "new_solution": new_solution, "list_drop_resource": list_drop_resource, }
[docs] def generate_scheduling_disruption( original_scheduling_problem: AllocSchedulingProblem, original_solution: AllocSchedulingSolution, list_drop_resource: Optional[list[tuple[int, int, int]]] = None, params_randomness: Optional[ParamsRandomness] = None, ): if list_drop_resource is None: some_resource = random.choice(original_solution.allocation) list_drop_resource = [(3 * 60, 9 * 60, some_resource)] if params_randomness is not None: list_drop_resource = params_randomness.generate_list(solution=original_solution) new_calendar = deepcopy(original_scheduling_problem.calendar_team) for from_time, to_time, index_team_name in list_drop_resource: nc = [] team_name = original_scheduling_problem.team_names[index_team_name] for st, end in new_calendar[team_name]: if st <= from_time and end >= to_time: nc.append((st, from_time)) nc.append((to_time, end)) elif st <= from_time and end <= from_time: nc.append((st, end)) elif from_time <= st <= to_time: if end > to_time: nc.append((to_time, end)) else: nc.append((st, end)) new_calendar[team_name] = nc new_scheduling_problem = deepcopy(original_scheduling_problem) new_scheduling_problem.calendar_team = new_calendar nb_team_used = original_scheduling_problem.evaluate(original_solution)["nb_teams"] used_teams_dict = {} used_team_set = set(original_solution.allocation) for team in original_scheduling_problem.index_to_team: if team not in used_team_set: used_teams_dict[team] = False additional_cp_constraints = AdditionalCPConstraints( nb_teams_bounds=(None, nb_team_used), # (nb_team_used - 1, nb_team_used), team_used_constraint=used_teams_dict, set_tasks_ignore_reallocation=set(), forced_allocation={}, ) return { "scheduling_problem": new_scheduling_problem, "additional_constraint_scheduling": additional_cp_constraints, "list_drop_resource": list_drop_resource, }