# Copyright (c) 2025 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import Any, Callable, Dict
from ortools.math_opt.python import mathopt
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.lp_tools import (
GurobiMilpSolver,
InequalitySense,
MilpSolver,
OrtoolsMathOptMilpSolver,
)
from discrete_optimization.singlemachine.problem import (
WeightedTardinessProblem,
WTSolution,
)
try:
import gurobipy
except:
pass
logger = logging.getLogger(__name__)
class _BaseLpSingleMachineSolver(MilpSolver):
"""
LP solver for the Single Machine Weighted Tardiness Problem.
This solver uses a relative ordering formulation with 'big-M' constraints
to ensure non-overlapping schedules.
"""
def __init__(self, problem: WeightedTardinessProblem, **kwargs: Any):
super().__init__(problem=problem, **kwargs)
self.problem: WeightedTardinessProblem = problem
self.variables: Dict[str, Any] = {}
def init_model(self, **kwargs: Any) -> None:
"""
Builds the LP model from the WeightedTardinessProblem instance.
It defines variables, constraints, and the objective function.
"""
# Create a new empty model
self.model = self.create_empty_model()
# Problem Data
num_jobs = self.problem.num_jobs
p = self.problem.processing_times
w = self.problem.weights
d = self.problem.due_dates
r = self.problem.release_dates
# --- Define Variables ---
# x_ij = 1 if job i is before job j
x = {
(i, j): self.add_binary_variable(name=f"x_{i},{j}")
for i in range(num_jobs)
for j in range(num_jobs)
if i != j
}
# C_i = completion time of job i
c = {
i: self.add_continuous_variable(lb=r[i] + p[i], name=f"C_{i}")
for i in range(num_jobs)
}
# T_i = tardiness of job i
t = {
i: self.add_continuous_variable(lb=0, name=f"T_{i}")
for i in range(num_jobs)
}
self.variables = {"x": x, "c": c, "t": t}
big_m = sum(p) + max(r)
# Ordering Constraints
for i in range(num_jobs):
for j in range(i + 1, num_jobs):
self.add_linear_constraint(x[(i, j)] + x[(j, i)] == 1)
# Completion time constraints (Big-M for non-overlap)
for i in range(num_jobs):
for j in range(num_jobs):
if i == j:
continue
self.add_linear_constraint_with_indicator(
x[(i, j)],
1,
c[j],
sense=InequalitySense.GREATER_OR_EQUAL,
rhs=c[i] + p[j],
penalty_coeff=big_m,
)
# Tardiness definition: T_i >= C_i - d_i
for i in range(num_jobs):
self.add_linear_constraint(t[i] >= c[i] - d[i])
# 5. Transitivity constraints: x_ij + x_jk + x_ki <= 2
for i in range(num_jobs):
for j in range(num_jobs):
if i == j:
continue
for k in range(num_jobs):
if k in {i, j}:
continue
self.add_linear_constraint(x[(i, j)] + x[(j, k)] + x[(k, i)] <= 2)
# --- Define Objective Function ---
objective = self.construct_linear_sum([w[i] * t[i] for i in range(num_jobs)])
self.set_model_objective(objective, minimize=True)
logger.info("LP model initialized.")
def retrieve_current_solution(
self, get_var_value_for_current_solution: Callable[[Any], float], **kwargs
) -> Solution:
"""
Retrieves a WTSolution from the LP solver's results.
Args:
get_var_value_for_current_solution: A function provided by the
MilpSolver framework to query the value of a model variable.
Returns:
A WTSolution object representing the schedule.
:param **kwargs:
"""
schedule = [() for _ in range(self.problem.num_jobs)]
for i in range(self.problem.num_jobs):
completion_time = get_var_value_for_current_solution(self.variables["c"][i])
start_time = completion_time - self.problem.processing_times[i]
schedule[i] = (int(round(start_time)), int(round(completion_time)))
return WTSolution(problem=self.problem, schedule=schedule)
[docs]
class MathOptSingleMachineSolver(_BaseLpSingleMachineSolver, OrtoolsMathOptMilpSolver):
[docs]
def convert_to_variable_values(
self, solution: Solution
) -> dict[mathopt.Variable, float]:
pass
[docs]
class GurobiSingleMachineSolver(_BaseLpSingleMachineSolver, GurobiMilpSolver):
[docs]
def convert_to_variable_values(
self, solution: Solution
) -> dict["gurobipy.Var", float]:
pass