Source code for discrete_optimization.rcpsp.problem

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from collections.abc import Callable, Hashable, Iterable
from copy import deepcopy
from enum import Enum
from functools import partial
from typing import Any, Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt

from discrete_optimization.generic_tools.do_problem import (
    EncodingRegister,
    ModeOptim,
    ObjectiveDoc,
    ObjectiveHandling,
    ObjectiveRegister,
    Problem,
    Solution,
    TupleFitness,
    TypeAttribute,
    TypeObjective,
)
from discrete_optimization.generic_tools.graph_api import Graph
from discrete_optimization.rcpsp.fast_function import (
    compute_mean_ressource,
    sgs_fast,
    sgs_fast_partial_schedule_incomplete_permutation_tasks,
)
from discrete_optimization.rcpsp.solution import RcpspSolution
from discrete_optimization.rcpsp.special_constraints import (
    PairModeConstraint,
    SpecialConstraintsDescription,
)
from discrete_optimization.rcpsp.utils import intersect

logger = logging.getLogger(__name__)


[docs] class ScheduleGenerationScheme(Enum): SERIAL_SGS = 0 PARALLEL_SGS = 1
[docs] class RcpspProblem(Problem): """ Attributes: resources: non_renewable_resources: mode_details: successors: horizon: horizon_multiplier: tasks_list: source_task: sink_task: name_task: n_jobs (int): n_jobs_non_dummy (int): excluding dummy activities Start (0) and End (n) special_constraints: do_special_constraints (bool): relax_the_start_at_end (bool): relax some conditions only if do_special_constraints fixed_permutation (Optional[list[int]]): fixed_modes (Optional[list[int]]): Args: resources: {resource_name: number_of_resource} non_renewable_resources: [resource_name3, resource_name4] mode_details: {job_id: {mode_id: {resource_name1: number_of_resources_needed, resource_name2: ...}} one key being "duration" successors: horizon: horizon_multiplier: tasks_list: {task_id: list of successor task ids} source_task: sink_task: name_task: special_constraints: relax_the_start_at_end: fixed_permutation: fixed_modes: **kwargs: """ sgs: ScheduleGenerationScheme def __init__( self, resources: dict[str, Union[int, list[int]]], non_renewable_resources: list[str], mode_details: dict[Hashable, dict[int, dict[str, int]]], successors: dict[Hashable, list[Hashable]], horizon: int, horizon_multiplier: int = 1, tasks_list: Optional[list[Hashable]] = None, source_task: Optional[Hashable] = None, sink_task: Optional[Hashable] = None, name_task: Optional[dict[Hashable, str]] = None, calendar_details: Optional[dict[str, list[list[int]]]] = None, special_constraints: Optional[SpecialConstraintsDescription] = None, relax_the_start_at_end: bool = True, fixed_permutation: Optional[list[int]] = None, fixed_modes: Optional[list[int]] = None, **kwargs: Any, ): self.resources = resources self.resources_list = list(self.resources.keys()) self.non_renewable_resources = non_renewable_resources self.mode_details = mode_details self.successors = successors self.horizon = horizon self.horizon_multiplier = horizon_multiplier self.calendar_details = calendar_details if name_task is None: self.name_task = {x: str(x) for x in self.mode_details} else: self.name_task = name_task if tasks_list is None: self.tasks_list = list(self.mode_details.keys()) else: self.tasks_list = tasks_list self.n_jobs = len(self.mode_details.keys()) self.n_jobs_non_dummy = self.n_jobs - 2 self.index_task = {self.tasks_list[i]: i for i in range(self.n_jobs)} if source_task is None: if all((isinstance(t, int) for t in self.tasks_list)): self.source_task = min(self.tasks_list) # type: ignore else: raise ValueError( "source_task cannot be None if tasks id given in tasks_list are not all integers." ) else: self.source_task = source_task if sink_task is None: if all((isinstance(t, int) for t in self.tasks_list)): self.sink_task = max(self.tasks_list) # type: ignore else: raise ValueError( "sink_task cannot be None if tasks id given in tasks_list are not all integers." ) else: self.sink_task = sink_task self.tasks_list_non_dummy = [ t for t in self.tasks_list if t not in {self.source_task, self.sink_task} ] self.index_task_non_dummy = { self.tasks_list_non_dummy[i]: i for i in range(self.n_jobs_non_dummy) } self.max_number_of_mode = max( [len(self.mode_details[key1].keys()) for key1 in self.mode_details.keys()] ) self.is_multimode = self.max_number_of_mode > 1 self.is_calendar = False if any(isinstance(self.resources[res], Iterable) for res in self.resources): self.is_calendar = ( max( ( len( {self.resources[res]} if isinstance(self.resources[res], int) else set(self.resources[res]) # type: ignore ) for res in self.resources ) ) > 1 ) if not self.is_calendar: self.resources = { r: self.resources[r] if isinstance(self.resources[r], int) else self.resources[r][0] # type: ignore for r in self.resources } ( self.func_sgs, self.func_sgs_2, self.compute_mean_resource, ) = create_np_data_and_jit_functions(self) self.costs: dict[str, bool] = { "makespan": True, "mean_resource_reserve": kwargs.get("mean_resource_reserve", False), } if special_constraints is None: self.do_special_constraints = False self.special_constraints = SpecialConstraintsDescription() else: self.do_special_constraints = True self.special_constraints = special_constraints predecessors_dict: dict[Hashable, list[Hashable]] = { task: [] for task in self.successors } for task in self.successors: for stask in self.successors[task]: predecessors_dict[stask] += [task] for t1, t2 in self.special_constraints.start_at_end: if t2 not in self.successors[t1]: self.successors[t1].append(t2) for t1, t2, off in self.special_constraints.start_at_end_plus_offset: if t2 not in self.successors[t1]: self.successors[t1].append(t2) for t1, t2 in self.special_constraints.start_together: for predt1 in predecessors_dict[t1]: if t2 not in self.successors[predt1]: self.successors[predt1] += [t2] for predt2 in predecessors_dict[t2]: if t1 not in self.successors[predt2]: self.successors[predt2] += [t1] self.graph = self.compute_graph( compute_predecessors=self.do_special_constraints ) if self.do_special_constraints: self.predecessors = self.graph.predecessors_dict self.relax_the_start_at_end = relax_the_start_at_end self.fixed_permutation = fixed_permutation self.fixed_modes = fixed_modes
[docs] def update_functions(self) -> None: ( self.func_sgs, self.func_sgs_2, self.compute_mean_resource, ) = create_np_data_and_jit_functions(rcpsp_problem=self)
[docs] def is_rcpsp_multimode(self) -> bool: return self.is_multimode
[docs] def is_varying_resource(self) -> bool: return self.is_calendar
[docs] def is_preemptive(self) -> bool: return False
[docs] def is_multiskill(self) -> bool: return False
[docs] def includes_special_constraint(self) -> bool: return self.do_special_constraints
[docs] def get_resource_names(self) -> list[str]: return self.resources_list
[docs] def get_tasks_list(self) -> list[Hashable]: return self.tasks_list
[docs] def get_resource_availability_array(self, res: str) -> list[int]: if self.is_varying_resource() and not isinstance(self.resources[res], int): return self.resources[res] # type: ignore else: return self.horizon * [self.resources[res]] # type: ignore
[docs] def compute_graph(self, compute_predecessors: bool = False) -> Graph: nodes: list[tuple[Hashable, dict[str, Any]]] = [ ( n, { str(mode): self.mode_details[n][mode]["duration"] for mode in self.mode_details[n] }, ) for n in self.tasks_list ] edges: list[tuple[Hashable, Hashable, dict[str, Any]]] = [] for n in self.successors: for succ in self.successors[n]: edges += [(n, succ, {})] return Graph( nodes, edges, compute_predecessors=compute_predecessors, undirected=False )
[docs] def evaluate_function(self, rcpsp_sol: RcpspSolution) -> tuple[int, float, int]: if rcpsp_sol._schedule_to_recompute: rcpsp_sol.generate_schedule_from_permutation_serial_sgs() makespan = rcpsp_sol.rcpsp_schedule[self.sink_task]["end_time"] if self.costs["mean_resource_reserve"]: obj_mean_resource_reserve = rcpsp_sol.compute_mean_resource_reserve() else: obj_mean_resource_reserve = 0.0 if self.do_special_constraints: penalty = evaluate_constraints( solution=rcpsp_sol, constraints=self.special_constraints ) else: penalty = 0 return makespan, obj_mean_resource_reserve, penalty
[docs] def evaluate_from_encoding( self, int_vector: list[int], encoding_name: str ) -> dict[str, float]: if encoding_name == "rcpsp_permutation": if self.fixed_modes is None: rcpsp_modes = [1 for i in range(self.n_jobs_non_dummy)] else: rcpsp_modes = self.fixed_modes rcpsp_sol = RcpspSolution( problem=self, rcpsp_permutation=int_vector, rcpsp_modes=rcpsp_modes ) elif encoding_name == "rcpsp_modes": if self.fixed_permutation is not None: rcpsp_sol = RcpspSolution( problem=self, rcpsp_permutation=self.fixed_permutation, rcpsp_modes=int_vector, ) else: raise RuntimeError( "Encoding rcpsp_modes possible " "only if self.fixed_permutation is not None" ) else: raise NotImplementedError(f"Encoding {encoding_name} not implemented") objectives = self.evaluate(rcpsp_sol) return objectives
[docs] def evaluate(self, rcpsp_sol: RcpspSolution) -> dict[str, float]: # type: ignore obj_makespan, obj_mean_resource_reserve, penalty = self.evaluate_function( rcpsp_sol ) return { "makespan": float(obj_makespan), "mean_resource_reserve": obj_mean_resource_reserve, "constraint_penalty": float(penalty), }
[docs] def evaluate_mobj(self, rcpsp_sol: RcpspSolution) -> TupleFitness: # type: ignore return self.evaluate_mobj_from_dict(self.evaluate(rcpsp_sol))
[docs] def evaluate_mobj_from_dict(self, dict_values: dict[str, float]) -> TupleFitness: return TupleFitness( np.array([-dict_values["makespan"], dict_values["mean_resource_reserve"]]), 2, )
[docs] def build_mode_dict( self, rcpsp_modes_from_solution: list[int] ) -> dict[Hashable, int]: modes_dict = { self.tasks_list_non_dummy[i]: rcpsp_modes_from_solution[i] for i in range(self.n_jobs_non_dummy) } modes_dict[self.source_task] = 1 modes_dict[self.sink_task] = 1 return modes_dict
[docs] def build_mode_array(self, rcpsp_modes_from_solution: list[int]) -> list[int]: modes_dict = self.build_mode_dict( rcpsp_modes_from_solution=rcpsp_modes_from_solution ) return [modes_dict[t] for t in self.tasks_list]
[docs] def return_index_task(self, task: Hashable, offset: int = 0) -> int: return self.index_task[task] + offset
[docs] def satisfy(self, rcpsp_sol: RcpspSolution) -> bool: # type: ignore if rcpsp_sol.rcpsp_schedule_feasible is False: logger.debug("Schedule flagged as infeasible when generated") return False if len(rcpsp_sol.rcpsp_schedule) != self.n_jobs: logger.debug("Missing task in schedule") if self.do_special_constraints: if not check_solution_with_special_constraints( problem=self, solution=rcpsp_sol, relax_the_start_at_end=self.relax_the_start_at_end, ): return False modes_dict = self.build_mode_dict( rcpsp_modes_from_solution=rcpsp_sol.rcpsp_modes ) start_times = [ rcpsp_sol.rcpsp_schedule[t]["start_time"] for t in rcpsp_sol.rcpsp_schedule ] for t in start_times: resource_usage = {} for res in self.resources_list: resource_usage[res] = 0 for act_id in rcpsp_sol.rcpsp_schedule: start = rcpsp_sol.rcpsp_schedule[act_id]["start_time"] end = rcpsp_sol.rcpsp_schedule[act_id]["end_time"] mode = modes_dict[act_id] for res in self.resources_list: if start <= t < end: resource_usage[res] += self.mode_details[act_id][mode].get( res, 0 ) for res in self.resources.keys(): if resource_usage[res] > self.get_resource_available(res, t): logger.debug( [ act for act in rcpsp_sol.rcpsp_schedule if rcpsp_sol.rcpsp_schedule[act]["start_time"] <= t < rcpsp_sol.rcpsp_schedule[act]["end_time"] ] ) logger.debug( f"Time step resource violation: time: {t} " f"res {res} res_usage: {resource_usage[res]}" f"res_avail: {self.resources[res]}" ) return False # Check for non-renewable resource violation for res in self.non_renewable_resources: usage = 0 for act_id in rcpsp_sol.rcpsp_schedule: mode = modes_dict[act_id] usage += self.mode_details[act_id][mode][res] if usage > self.get_max_resource_capacity(res): logger.debug( f"Non-renewable resource violation: act_id: {act_id}" f"res {res} res_usage: {usage} res_avail: {self.resources[res]}" ) return False # Check precedences / successors for act_id in list(self.successors.keys()): for succ_id in self.successors[act_id]: start_succ = rcpsp_sol.rcpsp_schedule[succ_id]["start_time"] end_pred = rcpsp_sol.rcpsp_schedule[act_id]["end_time"] if start_succ < end_pred: logger.debug( f"Precedence relationship broken: {act_id} end at {end_pred} " f"while {succ_id} start at {start_succ}" ) return False return True
def __str__(self) -> str: val = ( "I'm a RCPSP problem with " + str(self.n_jobs) + " tasks.." + " and ressources =" + str(self.resources_list) ) return val
[docs] def get_solution_type(self) -> type[Solution]: return RcpspSolution
[docs] def get_attribute_register(self) -> EncodingRegister: dict_register = { "rcpsp_permutation": { "name": "rcpsp_permutation", "type": [TypeAttribute.PERMUTATION, TypeAttribute.PERMUTATION_RCPSP], "range": range(self.n_jobs_non_dummy), "n": self.n_jobs_non_dummy, } } max_number_modes = max([len(self.mode_details[x]) for x in self.mode_details]) dict_register["rcpsp_modes"] = { "name": "rcpsp_modes", "type": [TypeAttribute.LIST_INTEGER], "n": self.n_jobs_non_dummy, "low": 1, # integer. "up": max_number_modes, # integer. "arity": max_number_modes, } mode_arity = [ len(self.mode_details[task]) for task in self.tasks_list_non_dummy ] dict_register["rcpsp_modes_arity_fix"] = { "name": "rcpsp_modes", "type": [TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY], "n": self.n_jobs_non_dummy, "low": 1, "up": mode_arity, "arities": mode_arity, } return EncodingRegister(dict_register)
[docs] def get_objective_register(self) -> ObjectiveRegister: objective_handling = ObjectiveHandling.SINGLE dict_objective = { "makespan": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=-1.0) } # "mean_resource_reserve": {"type": TypeObjective.OBJECTIVE, "default_weight": 1}} if self.do_special_constraints: objective_handling = ObjectiveHandling.AGGREGATE dict_objective["constraint_penalty"] = ObjectiveDoc( type=TypeObjective.PENALTY, default_weight=-100.0 ) return ObjectiveRegister( objective_sense=ModeOptim.MAXIMIZATION, objective_handling=objective_handling, dict_objective_to_doc=dict_objective, )
[docs] def compute_resource_consumption( self, rcpsp_sol: RcpspSolution ) -> npt.NDArray[np.int_]: modes_dict = self.build_mode_dict(rcpsp_sol.rcpsp_modes) makespan = rcpsp_sol.rcpsp_schedule[self.sink_task]["end_time"] consumptions = np.zeros((len(self.resources), makespan + 1), dtype=np.int_) for act_id in rcpsp_sol.rcpsp_schedule: for ir in range(len(self.resources)): consumptions[ ir, rcpsp_sol.rcpsp_schedule[act_id]["start_time"] + 1 : rcpsp_sol.rcpsp_schedule[act_id]["end_time"] + 1, ] += self.mode_details[act_id][modes_dict[act_id]][ self.resources_list[ir] ] return consumptions
[docs] def plot_ressource_view(self, rcpsp_sol: RcpspSolution) -> None: consumption = self.compute_resource_consumption(rcpsp_sol=rcpsp_sol) fig, ax = plt.subplots(nrows=len(self.resources_list), sharex=True) for i in range(len(self.resources_list)): ax[i].axhline( y=self.get_max_resource_capacity(self.resources_list[i]), label=self.resources_list[i], ) ax[i].plot(consumption[i, :]) ax[i].legend()
[docs] def copy(self) -> "RcpspProblem": model = RcpspProblem( resources=self.resources, tasks_list=self.tasks_list, source_task=self.source_task, sink_task=self.sink_task, non_renewable_resources=self.non_renewable_resources, mode_details=deepcopy(self.mode_details), successors=deepcopy(self.successors), horizon=self.horizon, horizon_multiplier=self.horizon_multiplier, name_task=self.name_task, mean_resource_reserve=self.costs.get("mean_resource_reserve", False), fixed_modes=self.fixed_modes, fixed_permutation=self.fixed_permutation, ) return model
[docs] def get_dummy_solution(self) -> RcpspSolution: sol = RcpspSolution( problem=self, rcpsp_permutation=list(range(self.n_jobs_non_dummy)), rcpsp_modes=[1 for i in range(self.n_jobs_non_dummy)], ) return sol
[docs] def get_resource_available(self, res: str, time: int) -> int: if self.is_calendar: return self.resources.get(res, [0])[time] # type: ignore return self.resources.get(res, 0) # type: ignore
[docs] def get_max_resource_capacity(self, res: str) -> int: if self.is_calendar: return max(self.resources.get(res, [0])) # type: ignore return self.resources.get(res, 0) # type: ignore
[docs] def set_fixed_attributes(self, encoding_str: str, sol: RcpspSolution) -> None: att = self.get_attribute_register().dict_attribute_to_type[encoding_str]["name"] if att == "rcpsp_modes": self.set_fixed_modes(sol.rcpsp_modes) elif att == "rcpsp_permutation": self.set_fixed_permutation(sol.rcpsp_permutation)
[docs] def set_fixed_modes(self, fixed_modes: list[int]) -> None: self.fixed_modes = fixed_modes
[docs] def set_fixed_permutation(self, fixed_permutation: list[int]) -> None: self.fixed_permutation = fixed_permutation
[docs] def create_np_data_and_jit_functions( rcpsp_problem: RcpspProblem, ) -> tuple[ Callable[ ..., tuple[dict[int, tuple[int, int]], bool], ], Callable[ ..., tuple[dict[int, tuple[int, int]], bool], ], Callable[ ..., float, ], ]: consumption_array = np.zeros( ( rcpsp_problem.n_jobs, rcpsp_problem.max_number_of_mode, len(rcpsp_problem.resources_list), ), dtype=np.int_, ) duration_array = np.zeros( (rcpsp_problem.n_jobs, rcpsp_problem.max_number_of_mode), dtype=np.int_ ) predecessors = np.zeros((rcpsp_problem.n_jobs, rcpsp_problem.n_jobs), dtype=np.int_) successors = np.zeros((rcpsp_problem.n_jobs, rcpsp_problem.n_jobs), dtype=np.int_) horizon = rcpsp_problem.horizon ressource_available = np.zeros( (len(rcpsp_problem.resources_list), horizon), dtype=np.int_ ) ressource_renewable = np.ones((len(rcpsp_problem.resources_list)), dtype=bool) minimum_starting_time_array = np.zeros(rcpsp_problem.n_jobs, dtype=np.int_) for i in range(len(rcpsp_problem.tasks_list)): task = rcpsp_problem.tasks_list[i] index_mode = 0 for mode in sorted( rcpsp_problem.mode_details[rcpsp_problem.tasks_list[i]].keys() ): for k in range(len(rcpsp_problem.resources_list)): consumption_array[i, index_mode, k] = rcpsp_problem.mode_details[task][ mode ].get(rcpsp_problem.resources_list[k], 0) duration_array[i, index_mode] = rcpsp_problem.mode_details[task][mode][ "duration" ] index_mode += 1 task_index = {rcpsp_problem.tasks_list[i]: i for i in range(rcpsp_problem.n_jobs)} for k in range(len(rcpsp_problem.resources_list)): if rcpsp_problem.is_varying_resource(): ressource_available[k, :] = rcpsp_problem.resources[ # type: ignore rcpsp_problem.resources_list[k] ][: ressource_available.shape[1]] else: ressource_available[k, :] = np.full( ressource_available.shape[1], rcpsp_problem.resources[rcpsp_problem.resources_list[k]], dtype=np.int_, ) if rcpsp_problem.resources_list[k] in rcpsp_problem.non_renewable_resources: ressource_renewable[k] = False for i in range(len(rcpsp_problem.tasks_list)): task = rcpsp_problem.tasks_list[i] for s in rcpsp_problem.successors[task]: index_s = task_index[s] predecessors[index_s, i] = 1 successors[i, index_s] = 1 if "special_constraints" in rcpsp_problem.__dict__.keys(): for t in rcpsp_problem.special_constraints.start_times_window: if rcpsp_problem.special_constraints.start_times_window[t][0] is not None: minimum_starting_time_array[ rcpsp_problem.index_task[t] ] = rcpsp_problem.special_constraints.start_times_window[t][0] func_sgs = partial( sgs_fast, consumption_array=consumption_array, duration_array=duration_array, predecessors=predecessors, successors=successors, horizon=horizon, ressource_available=ressource_available, ressource_renewable=ressource_renewable, minimum_starting_time_array=minimum_starting_time_array, ) func_sgs_2 = partial( sgs_fast_partial_schedule_incomplete_permutation_tasks, consumption_array=consumption_array, duration_array=duration_array, predecessors=predecessors, successors=successors, horizon=horizon, ressource_available=ressource_available, ressource_renewable=ressource_renewable, minimum_starting_time_array=minimum_starting_time_array, ) func_compute_mean_resource = partial( compute_mean_ressource, consumption_array=consumption_array, ressource_available=ressource_available, ressource_renewable=ressource_renewable, ) return func_sgs, func_sgs_2, func_compute_mean_resource
[docs] def evaluate_constraints( solution: RcpspSolution, constraints: SpecialConstraintsDescription, ) -> int: list_constraints_not_respected = compute_constraints_details(solution, constraints) return sum([x[-1] for x in list_constraints_not_respected])
[docs] def compute_constraints_details( solution: RcpspSolution, constraints: SpecialConstraintsDescription, ) -> list[tuple[str, Hashable, Hashable, Optional[int], Optional[int], int]]: if not solution.rcpsp_schedule_feasible: return [] start_together = constraints.start_together start_at_end = constraints.start_at_end start_at_end_plus_offset = constraints.start_at_end_plus_offset start_after_nunit = constraints.start_after_nunit disjunctive = constraints.disjunctive_tasks list_constraints_not_respected: list[ tuple[str, Hashable, Hashable, Optional[int], Optional[int], int] ] = [] for (t1, t2) in start_together: time1 = solution.get_start_time(t1) time2 = solution.get_start_time(t2) b = time1 == time2 if not b: list_constraints_not_respected += [ ("start_together", t1, t2, time1, time2, abs(time2 - time1)) ] for (t1, t2) in start_at_end: time1 = solution.get_end_time(t1) time2 = solution.get_start_time(t2) b = time1 == time2 if not b: list_constraints_not_respected += [ ("start_at_end", t1, t2, time1, time2, abs(time2 - time1)) ] for (t1, t2, off) in start_at_end_plus_offset: time1 = solution.get_end_time(t1) + off time2 = solution.get_start_time(t2) b = time2 >= time1 if not b: list_constraints_not_respected += [ ("start_at_end_plus_offset", t1, t2, time1, time2, abs(time2 - time1)) ] for (t1, t2, off) in start_after_nunit: time1 = solution.get_start_time(t1) + off time2 = solution.get_start_time(t2) b = time2 >= time1 if not b: list_constraints_not_respected += [ ("start_after_nunit", t1, t2, time1, time2, abs(time2 - time1)) ] for t1, t2 in disjunctive: segt = intersect( [solution.get_start_time(t1), solution.get_end_time(t1)], [solution.get_start_time(t2), solution.get_end_time(t2)], ) if segt is not None: list_constraints_not_respected += [ ("disjunctive", t1, t2, None, None, segt[1] - segt[0]) ] for t in constraints.start_times_window: if constraints.start_times_window[t][0] is not None: if solution.get_start_time(t) < constraints.start_times_window[t][0]: # type: ignore list_constraints_not_respected += [ ( "start_window_0", t, t, None, None, constraints.start_times_window[t][0] # type: ignore - solution.get_start_time(t), ) ] if constraints.start_times_window[t][1] is not None: if solution.get_start_time(t) > constraints.start_times_window[t][1]: # type: ignore list_constraints_not_respected += [ ( "start_window_1", t, t, None, None, -constraints.start_times_window[t][1] # type: ignore + solution.get_start_time(t), ) ] for t in constraints.end_times_window: if constraints.end_times_window[t][0] is not None: if solution.get_end_time(t) < constraints.end_times_window[t][0]: # type: ignore list_constraints_not_respected += [ ( "end_window_0", t, t, None, None, constraints.end_times_window[t][0] - solution.get_end_time(t), # type: ignore ) ] if constraints.end_times_window[t][1] is not None: if solution.get_end_time(t) > constraints.end_times_window[t][1]: # type: ignore list_constraints_not_respected += [ ( "end_window_1", t, t, None, None, -constraints.end_times_window[t][1] + solution.get_end_time(t), # type: ignore ) ] if constraints.pair_mode_constraint is not None: list_constraints_not_respected += compute_details_mode_constraint( solution=solution, pair_mode_constraint=constraints.pair_mode_constraint ) return list_constraints_not_respected
[docs] def check_solution_with_special_constraints( problem: RcpspProblem, solution: RcpspSolution, relax_the_start_at_end: bool = True, ) -> bool: if not solution.rcpsp_schedule_feasible: return False start_together = problem.special_constraints.start_together start_at_end = problem.special_constraints.start_at_end start_at_end_plus_offset = problem.special_constraints.start_at_end_plus_offset start_after_nunit = problem.special_constraints.start_after_nunit disjunctive = problem.special_constraints.disjunctive_tasks for (t1, t2) in start_together: if not relax_the_start_at_end: b = solution.get_start_time(t1) == solution.get_start_time(t2) if not b: return False for (t1, t2) in start_at_end: if relax_the_start_at_end: b = solution.get_start_time(t2) >= solution.get_end_time(t1) else: b = solution.get_start_time(t2) == solution.get_end_time(t1) if not b: return False for (t1, t2, off) in start_at_end_plus_offset: b = solution.get_start_time(t2) >= solution.get_end_time(t1) + off if not b: logger.debug(("start_at_end_plus_offset NOT respected: ", t1, t2, off)) logger.debug( ( solution.get_start_time(t2), " >= ", solution.get_end_time(t1), "+", off, ) ) return False for (t1, t2, off) in start_after_nunit: b = solution.get_start_time(t2) >= solution.get_start_time(t1) + off if not b: logger.debug(("start_after_nunit NOT respected: ", t1, t2, off)) return False for t1, t2 in disjunctive: if ( intersect( [solution.get_start_time(t1), solution.get_end_time(t1)], [solution.get_start_time(t2), solution.get_end_time(t2)], ) is not None ): return False for t in problem.special_constraints.start_times_window: if problem.special_constraints.start_times_window[t][0] is not None: if ( solution.get_start_time(t) # type: ignore < problem.special_constraints.start_times_window[t][0] ): logger.debug( ( "start time 0, ", t, solution.get_start_time(t), problem.special_constraints.start_times_window[t][0], ) ) return False if problem.special_constraints.start_times_window[t][1] is not None: if ( solution.get_start_time(t) # type: ignore > problem.special_constraints.start_times_window[t][1] ): logger.debug( ( "start time 1, ", t, solution.get_start_time(t), problem.special_constraints.start_times_window[t][1], ) ) return False for t in problem.special_constraints.end_times_window: if problem.special_constraints.end_times_window[t][0] is not None: if ( solution.get_end_time(t) # type: ignore < problem.special_constraints.end_times_window[t][0] ): logger.debug( ( "end time 0, ", t, solution.get_end_time(t), problem.special_constraints.end_times_window[t][0], ) ) return False if problem.special_constraints.end_times_window[t][1] is not None: if ( solution.get_end_time(t) # type: ignore > problem.special_constraints.end_times_window[t][1] ): logger.debug( ( "end time 1, ", t, solution.get_end_time(t), problem.special_constraints.end_times_window[t][1], ) ) return False if problem.special_constraints.pair_mode_constraint is not None: b = check_pair_mode_constraint( solution=solution, pair_mode_constraint=problem.special_constraints.pair_mode_constraint, ) if not b: return False return True
[docs] def check_pair_mode_constraint( solution: RcpspSolution, pair_mode_constraint: PairModeConstraint ): if pair_mode_constraint.allowed_mode_assignment is not None: for ac1, ac2 in pair_mode_constraint.allowed_mode_assignment: mode_ac1 = solution.get_mode(ac1) mode_ac2 = solution.get_mode(ac2) if (mode_ac1, mode_ac2) not in pair_mode_constraint.allowed_mode_assignment[ ac1, ac2 ]: return False return True if pair_mode_constraint.same_score_mode is not None: for ac1, ac2 in pair_mode_constraint.same_score_mode: score_ac1 = pair_mode_constraint.score_mode[ac1, solution.get_mode(ac1)] score_ac2 = pair_mode_constraint.score_mode[ac2, solution.get_mode(ac2)] if score_ac1 != score_ac2: return False return True
[docs] def compute_details_mode_constraint( solution: RcpspSolution, pair_mode_constraint: PairModeConstraint ): list_constraints_not_respected: list[ tuple[str, Hashable, Hashable, Optional[int], Optional[int], int] ] = [] if pair_mode_constraint.allowed_mode_assignment is not None: for ac1, ac2 in pair_mode_constraint.allowed_mode_assignment: mode_ac1 = solution.get_mode(ac1) mode_ac2 = solution.get_mode(ac2) if (mode_ac1, mode_ac2) not in pair_mode_constraint.allowed_mode_assignment[ ac1, ac2 ]: list_constraints_not_respected.append( ("pair_mode_assignment", ac1, ac2, mode_ac1, mode_ac2, 100) ) return list_constraints_not_respected if pair_mode_constraint.same_score_mode is not None: for ac1, ac2 in pair_mode_constraint.same_score_mode: score_ac1 = pair_mode_constraint.score_mode[ac1, solution.get_mode(ac1)] score_ac2 = pair_mode_constraint.score_mode[ac2, solution.get_mode(ac2)] if score_ac1 != score_ac2: list_constraints_not_respected.append( ("pair_mode_score", ac1, ac2, score_ac1, score_ac2, 100) ) return list_constraints_not_respected