Source code for discrete_optimization.generic_tools.transformation.transformation_solver
# Copyright (c) 2026 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Solver wrapper that applies problem transformation before solving."""
from __future__ import annotations
from typing import Any, Optional
from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.do_problem import (
ParamsObjectiveFunction,
Problem,
Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO, WarmstartMixin
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import SubBrick
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
from discrete_optimization.generic_tools.transformation.problem_transformation import (
ProblemTransformation,
)
[docs]
class TransformationSolver(SolverDO, WarmstartMixin):
"""Solver that applies problem transformation before solving.
This wrapper:
1. Transforms P1 → P2 using transformation (or uses pre-computed target_problem)
2. Instantiates and solves P2 using solver_brick
3. Back-transforms solutions S2 → S1
4. Stores solutions as S1 in ResultStorage
Example (simple):
# >>> transformation = RcpspToMultiskillTransformation()
# >>> solver = TransformationSolver(
# ... transformation=transformation,
# ... source_problem=rcpsp_problem,
# ... solver_brick=SubBrick(
# ... cls=CPSatMultiskillSolver,
# ... kwargs={"time_limit": 60}
# ... )
# ... )
# >>> result = solver.solve() # Returns RcpspSolution!
Example (with pre-computed target problem):
# >>> transformation = RcpspToMultiskillTransformation()
# >>> target_problem = transformation.transform_problem(rcpsp_problem)
# >>> solver = TransformationSolver(
# ... transformation=transformation,
# ... source_problem=rcpsp_problem,
# ... target_problem=target_problem, # Already transformed!
# ... solver_brick=SubBrick(...)
# ... )
Example (for use in SequentialMetasolver):
# >>> # TransformationSolver also accepts 'problem' arg for SolverDO compatibility
# >>> solver = TransformationSolver(
# ... problem=rcpsp_problem, # Used as source_problem
# ... transformation=RcpspToMultiskillTransformation(),
# ... solver_brick=SubBrick(...)
# ... )
Note: Callbacks currently receive transformed (target) solutions.
Future enhancement will back-transform solutions for callbacks.
"""
transformation: ProblemTransformation
source_problem: Problem
target_problem: Problem
wrapped_solver: SolverDO
_solution_cache: dict[int, Solution] # Cache for back-transformed solutions
def __init__(
self,
transformation: ProblemTransformation,
solver_brick: SubBrick,
source_problem: Optional[Problem] = None,
target_problem: Optional[Problem] = None,
problem: Optional[Problem] = None, # Alias for source_problem (SolverDO compat)
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
"""Initialize transformation solver.
Args:
transformation: Transformation to apply (stateless)
solver_brick: SubBrick defining solver class + kwargs for target problem
source_problem: Source problem (P1) - provide this OR problem
target_problem: Target problem (P2) - optional, computed if not provided
problem: Alias for source_problem (for SolverDO/SequentialMetasolver compatibility)
params_objective_function: Objective function params for SOURCE problem (P1)
**kwargs: Additional arguments
Raises:
ValueError: If source_problem not provided
"""
# Determine source problem (accept either source_problem or problem)
if source_problem is not None and problem is not None:
if source_problem != problem:
raise ValueError(
"source_problem and problem are different. Provide only one."
)
self.source_problem = source_problem
elif source_problem is not None:
self.source_problem = source_problem
elif problem is not None:
self.source_problem = problem
else:
raise ValueError("Must provide either source_problem or problem")
# Initialize SolverDO with SOURCE problem
super().__init__(
problem=self.source_problem,
params_objective_function=params_objective_function,
**kwargs,
)
self.transformation = transformation
# Get or create target problem
if target_problem is not None:
self.target_problem = target_problem
else:
self.target_problem = self.transformation.transform_problem(
self.source_problem
)
# Instantiate solver for target problem
self.solver_brick = solver_brick
self.wrapped_solver = solver_brick.cls(
problem=self.target_problem, **solver_brick.kwargs
)
# Cache for solution transformations (S2 id -> S1)
self._solution_cache = {}
[docs]
def init_model(self, **kwargs: Any) -> None:
"""Initialize the wrapped solver's model.
Args:
**kwargs: Arguments passed to wrapped solver's init_model
"""
self.wrapped_solver.init_model(**kwargs)
[docs]
def solve(
self, callbacks: Optional[list[Callback]] = None, **kwargs: Any
) -> ResultStorage:
"""Solve by transforming, solving, and back-transforming.
Args:
callbacks: Callbacks (currently receive target problem solutions)
**kwargs: Arguments passed to wrapped solver
Returns:
ResultStorage containing SOURCE problem solutions (S1)
"""
# TODO: Wrap callbacks to back-transform solutions
# For now, callbacks receive target solutions
# Solve in transformed space
merged_kwargs = kwargs.copy()
merged_kwargs.update(self.solver_brick.kwargs)
target_result = self.wrapped_solver.solve(callbacks=callbacks, **merged_kwargs)
# Back-transform all solutions
source_result = self.create_result_storage()
for solution_target, _ in target_result:
solution_source = self._get_or_transform_solution(solution_target)
# Re-evaluate in source problem to ensure consistency
source_result.append(
(solution_source, self.aggreg_from_sol(solution_source))
)
# Update status
self.status_solver = self.wrapped_solver.status_solver
# Clear cache
self._solution_cache.clear()
return source_result
def _get_or_transform_solution(self, solution_target: Solution) -> Solution:
"""Get transformed solution from cache or transform and cache it.
This avoids re-transforming the same solution multiple times.
Args:
solution_target: Solution in target problem space
Returns:
Solution in source problem space
"""
sol_id = id(solution_target)
if sol_id not in self._solution_cache:
self._solution_cache[sol_id] = self.transformation.back_transform_solution(
solution_target, self.source_problem
)
return self._solution_cache[sol_id]
[docs]
def set_warm_start(self, solution: Solution) -> None:
"""Set warmstart solution from SOURCE problem.
Args:
solution: Solution in source problem space (S1)
Raises:
ValueError: If transformation doesn't support forward transformation
or wrapped solver doesn't support warmstart
"""
if not isinstance(self.wrapped_solver, WarmstartMixin):
raise ValueError("Wrapped solver does not support warmstart")
# Forward-transform solution
target_solution = self.transformation.forward_transform_solution(
solution, self.target_problem
)
if target_solution is None:
raise ValueError(
f"Transformation {type(self.transformation).__name__} "
"does not support forward transformation (warmstart not available)"
)
self.wrapped_solver.set_warm_start(target_solution)
def __repr__(self) -> str:
"""String representation."""
return (
f"TransformationSolver({self.transformation}, "
f"solver={type(self.wrapped_solver).__name__})"
)