import logging
from abc import abstractmethod
from collections.abc import Callable
from typing import Any, Optional, Union
import numpy as np
try:
import gurobipy
except ImportError:
gurobi_available = False
Model = object
else:
gurobi_available = True
from gurobipy import Model
from scipy.optimize import minimize
from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.do_problem import (
ParamsObjectiveFunction,
Problem,
Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
FloatHyperparameter,
IntegerHyperparameter,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparametrizable import (
Hyperparametrizable,
)
from discrete_optimization.generic_tools.lp_tools import GurobiMilpSolver
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
logger = logging.getLogger(__name__)
try:
from qiskit import QuantumCircuit
from qiskit.circuit.library import EfficientSU2, QAOAAnsatz
from qiskit.quantum_info.operators.base_operator import BaseOperator
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit_aer import AerSimulator
from qiskit_algorithms.utils import validate_bounds, validate_initial_point
from qiskit_ibm_runtime import EstimatorV2 as Estimator
from qiskit_ibm_runtime import SamplerV2, Session
from qiskit_optimization import QuadraticProgram
from qiskit_optimization.converters import QuadraticProgramToQubo
from qiskit_optimization.translators import from_gurobipy
except ImportError:
qiskit_available = False
msg = (
"QiskitQaoaSolver and QiskitVqeSolver need qiskit, qiskit_aer, qiskit_algorithms, qiskit_ibm_runtime, "
"and qiskit_optimization to be installed. "
"You can use the command `pip install discrete-optimization[quantum]` to install them."
)
logger.warning(msg)
class QiskitQaoaSolver(SolverDO):
def __init__(
self,
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
backend: Optional = None,
**kwargs: Any,
):
raise RuntimeError(msg)
class QiskitVqeSolver(SolverDO):
def __init__(
self,
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
backend: Optional = None,
**kwargs: Any,
):
raise RuntimeError(msg)
QuadraticProgram = object
else:
qiskit_available = True
[docs]
def get_result_from_dict_result(dict_result: dict[(str, int)]) -> np.ndarray:
"""
@param dict_result: dictionnary where keys are qubit's value and values are the number of time where this qubit's value have been chosen
@return: the qubit's value the must often chose
"""
m = 0
k = None
for key, value in dict_result.items():
if value > m:
m = value
k = key
res = list(k)
res.reverse()
res = [int(x) for x in res]
return np.array(res)
[docs]
def cost_func(params, ansatz, hamiltonian, estimator, callback_dict):
"""Return estimate of energy from estimator
Parameters:
params (ndarray): Array of ansatz parameters
ansatz (QuantumCircuit): Parameterized ansatz circuit
hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
estimator (EstimatorV2): Estimator primitive instance
callback_dict: dictionary for storing intermediate results
Returns:
float: Energy estimate
"""
pub = (ansatz, [hamiltonian], [params])
result = estimator.run(pubs=[pub]).result()
cost = result[0].data.evs[0]
callback_dict["iters"] += 1
callback_dict["prev_vector"] = params
callback_dict["cost_history"].append(cost)
print(f"Iters. done: {callback_dict['iters']} [Current cost: {cost}]")
return cost
[docs]
def execute_ansatz_with_Hamiltonian(
solver, backend, ansatz, hamiltonian, use_session: Optional[bool] = False, **kwargs
) -> np.ndarray:
"""
@param solver: the solver who solve the problem, must be a QiskitSolver
@param backend: the backend use to run the circuit (simulator or real device)
@param ansatz: the quantum circuit
@param hamiltonian: the hamiltonian corresponding to the problem
@param use_session: boolean to set to True for use a session
@param kwargs: a list of hyperparameters who can be specified
@return: the qubit's value the must often chose
"""
# If no backend is given we use a quantum simulator
if backend is None:
backend = AerSimulator()
optimization_level = kwargs["optimization_level"]
nb_shots = kwargs["nb_shots"]
# transpile and optimize the quantum circuit depending on the device who are going to use
# there are four level_optimization, to 0 to 3, 3 is the better but also the longest
target = backend.target
pm = generate_preset_pass_manager(
target=target, optimization_level=optimization_level
)
new_ansatz = pm.run(ansatz)
solver.set_executed_ansatz(new_ansatz)
hamiltonian = hamiltonian.apply_layout(new_ansatz.layout)
# open a session if desired, only useful for real device
if use_session:
max_time = kwargs.get("session_time", "2h")
session = Session(backend=backend, max_time=max_time)
mode = session
else:
mode = backend
# Configure estimator
estimator = Estimator(mode=mode)
estimator.options.default_shots = nb_shots
# Configure sampler
sampler = SamplerV2(mode=mode)
sampler.options.default_shots = nb_shots
callback_dict = {
"prev_vector": None,
"iters": 0,
"cost_history": [],
}
# step of minimization
if kwargs.get("optimizer"):
def fun(x):
pub = (new_ansatz, [hamiltonian], [x])
result = estimator.run(pubs=[pub]).result()
cost = result[0].data.evs[0]
callback_dict["iters"] += 1
callback_dict["prev_vector"] = x
callback_dict["cost_history"].append(cost)
print(f"Iters. done: {callback_dict['iters']} [Current cost: {cost}]")
return cost
optimizer = kwargs["optimizer"]
res = optimizer.minimize(
fun,
validate_initial_point(point=None, circuit=ansatz),
bounds=validate_bounds(ansatz),
)
else:
method = kwargs["method"]
if kwargs.get("options"):
options = kwargs["options"]
else:
if method == "COBYLA":
options = {
"maxiter": kwargs["maxiter"],
"rhobeg": kwargs["rhobeg"],
"tol": kwargs["tol"],
}
else:
options = {}
res = minimize(
cost_func,
validate_initial_point(point=None, circuit=ansatz),
args=(new_ansatz, hamiltonian, estimator, callback_dict),
method=method,
bounds=validate_bounds(ansatz),
options=options,
)
# Assign solution parameters to our ansatz
qc = new_ansatz.assign_parameters(res.x)
solver.set_final_ansatz((qc.copy()))
# Add measurements to our circuit
qc.measure_all()
# transpile our circuit
qc = pm.run(qc)
# run our circuit with optimal parameters find at the minimization step
results = sampler.run([qc]).result()
# extract a dictionnary of results, key is binary values of variable and value is number of time of these values has been found
best_result = get_result_from_dict_result(results[0].data.meas.get_counts())
# Close the session since we are now done with it
if use_session:
session.close()
return best_result
[docs]
class QiskitSolver(SolverDO):
def __init__(
self,
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs,
):
super().__init__(problem, params_objective_function, **kwargs)
self.executed_ansatz = None
self.final_ansatz = None
[docs]
def set_executed_ansatz(self, ansatz):
self.executed_ansatz = ansatz
[docs]
def set_final_ansatz(self, ansatz):
self.final_ansatz = ansatz
[docs]
class QiskitQaoaSolver(QiskitSolver, Hyperparametrizable):
hyperparameters = [
IntegerHyperparameter(name="reps", low=1, high=6, default=2),
IntegerHyperparameter(name="optimization_level", low=0, high=3, default=1),
CategoricalHyperparameter(name="method", choices=["COBYLA"], default="COBYLA"),
IntegerHyperparameter(
name="nb_shots", low=10000, high=100000, step=10000, default=10000
),
IntegerHyperparameter(name="maxiter", low=100, high=1000, step=50, default=300),
FloatHyperparameter(name="rhobeg", low=0.5, high=1.5, default=1.0),
CategoricalHyperparameter(name="tol", choices=[1e-1, 1e-2, 1e-3], default=1e-2),
]
def __init__(
self,
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
backend: Optional = None,
**kwargs,
):
super().__init__(problem, params_objective_function, **kwargs)
self.quadratic_programm = None
self.backend = backend
self.ansatz = None
[docs]
def solve(
self,
callbacks: Optional[list[Callback]] = None,
backend: Optional = None,
use_session: Optional[bool] = False,
**kwargs: Any,
) -> ResultStorage:
kwargs = self.complete_with_default_hyperparameters(kwargs)
reps = kwargs["reps"]
if backend is not None:
self.backend = backend
if self.quadratic_programm is None:
self.init_model()
if self.quadratic_programm is None:
raise RuntimeError(
"self.quadratic_programm must not be None after self.init_model()."
)
# if needed, in the most case the self.quadratic_programm is already in a QUBO form
conv = QuadraticProgramToQubo()
qubo = conv.convert(self.quadratic_programm)
hamiltonian, offset = qubo.to_ising()
mixer_operator = None
if kwargs.get("mixer_operator"):
mixer_operator = kwargs["mixer_operator"]
if not (
isinstance(mixer_operator, QuantumCircuit)
or isinstance(mixer_operator, BaseOperator)
):
raise RuntimeError(
"your personnalized mixer_operator must be a QuantumCircuit or a BaseOperator Object"
)
initial_state = None
if kwargs.get("initial_state"):
initial_state = kwargs["initial_state"]
if not isinstance(initial_state, QuantumCircuit):
raise RuntimeError("the initial_state must be a QuantumCircuit Object")
self.ansatz = QAOAAnsatz(
hamiltonian,
reps=reps,
mixer_operator=mixer_operator,
initial_state=initial_state,
)
if kwargs.get("parameter_bounds"):
if len(kwargs["parameter_bounds"]) != 2 * reps:
raise RuntimeError(
"For custom parameters of the quantum circuit you need to pass 2*reps bounds"
)
self.ansatz.parameter_bounds = kwargs["parameter_bounds"]
else:
"""
by default only hyperparameters of the mixer operator are initialized
but for some optimizer we need to initialize also hyperparameters of the cost operator
"""
bounds = []
for hp in self.ansatz.parameter_bounds:
if hp == (None, None):
hp = (0, np.pi)
bounds.append(hp)
self.ansatz.parameter_bounds = bounds
result = execute_ansatz_with_Hamiltonian(
self, self.backend, self.ansatz, hamiltonian, use_session, **kwargs
)
result = conv.interpret(result)
sol = self.retrieve_current_solution(result)
fit = self.aggreg_from_sol(sol)
return self.create_result_storage(
[(sol, fit)],
)
[docs]
@abstractmethod
def init_model(self, **kwargs: any) -> None:
...
[docs]
@abstractmethod
def retrieve_current_solution(self, result) -> Solution:
"""Retrieve current solution from qiskit result.
Args:
result: list of value for each binary variable of the problem
Returns:
the converted solution at d-o format
"""
...
[docs]
class QiskitVqeSolver(QiskitSolver):
hyperparameters = [
IntegerHyperparameter(name="reps", low=1, high=6, default=3),
IntegerHyperparameter(name="optimization_level", low=0, high=3, default=1),
CategoricalHyperparameter(name="method", choices=["COBYLA"], default="COBYLA"),
IntegerHyperparameter(
name="nb_shots", low=10000, high=100000, step=10000, default=10000
),
IntegerHyperparameter(name="maxiter", low=100, high=2000, step=50, default=300),
FloatHyperparameter(name="rhobeg", low=0.5, high=1.5, default=1.0),
CategoricalHyperparameter(name="tol", choices=[1e-1, 1e-2, 1e-3], default=1e-2),
]
def __init__(
self,
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
backend: Optional = None,
**kwargs,
):
super().__init__(problem, params_objective_function, **kwargs)
self.quadratic_programm = None
self.backend = backend
self.ansatz = None
[docs]
def solve(
self,
callbacks: Optional[list[Callback]] = None,
backend: Optional = None,
use_session: Optional[bool] = False,
**kwargs: Any,
) -> ResultStorage:
kwargs = self.complete_with_default_hyperparameters(kwargs)
if backend is not None:
self.backend = backend
if self.quadratic_programm is None:
self.init_model()
if self.quadratic_programm is None:
raise RuntimeError(
"self.quadratic_programm must not be None after self.init_model()."
)
# if needed, in the most case the self.quadratic_programm is already in a QUBO form
conv = QuadraticProgramToQubo()
qubo = conv.convert(self.quadratic_programm)
hamiltonian, offset = qubo.to_ising()
if kwargs.get("personnalized_ansatz"):
self.ansatz = kwargs["personnalized_ansatz"]
if self.ansatz.num_qubits != hamiltonian.num_qubits:
raise RuntimeError(
"your personnalized ansatz must have the same number of qubits that the number of binary variable of the problem"
)
if not isinstance(self.ansatz, QuantumCircuit):
raise RuntimeError(
"your personnalized ansatz must be a QuantumCircuit Object"
)
else:
self.ansatz = EfficientSU2(hamiltonian.num_qubits, reps=kwargs["reps"])
result = execute_ansatz_with_Hamiltonian(
self, self.backend, self.ansatz, hamiltonian, use_session, **kwargs
)
result = conv.interpret(result)
sol = self.retrieve_current_solution(result)
fit = self.aggreg_from_sol(sol)
return self.create_result_storage(
[(sol, fit)],
)
[docs]
@abstractmethod
def init_model(self, **kwargs: Any) -> None:
...
[docs]
@abstractmethod
def retrieve_current_solution(self, result) -> Solution:
"""Retrieve current solution from qiskit result.
Args:
result: list of value for each binary variable of the problem
Returns:
the converted solution at d-o format
"""
...
[docs]
def gurobi_to_qubo(model):
return from_gurobipy(model)
[docs]
class GeneralQaoaSolver(QiskitQaoaSolver):
def __init__(
self,
problem: Problem,
model: Union[Model, GurobiMilpSolver],
retrieve_solution: Callable,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
backend: Optional = None,
**kwargs,
):
super().__init__(problem, params_objective_function, backend=backend)
self.quadratic_programm = None
self.model = model
self.retrieve_solution = retrieve_solution
[docs]
def retrieve_current_solution(self, result: np.ndarray) -> Solution:
return self.retrieve_solution(result)
[docs]
def init_model(self, **kwargs: Any) -> None:
if isinstance(self.model, Model):
self.quadratic_programm = gurobi_to_qubo(self.model)
else:
self.model.init_model(**kwargs)
self.quadratic_programm = gurobi_to_qubo(self.model.model)
[docs]
class GeneralVqeSolver(QiskitVqeSolver):
def __init__(
self,
problem: Problem,
model: Union[Model, GurobiMilpSolver],
retrieve_solution: Callable,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
backend: Optional = None,
**kwargs,
):
super().__init__(problem, params_objective_function, backend=backend, **kwargs)
self.quadratic_programm = None
self.model = model
self.retrieve_solution = retrieve_solution
[docs]
def retrieve_current_solution(self, result) -> Solution:
return self.retrieve_solution(result)
[docs]
def init_model(self, **kwargs: Any) -> None:
if isinstance(self.model, Model):
self.quadratic_programm = gurobi_to_qubo(self.model)
else:
self.model.init_model(**kwargs)
self.quadratic_programm = gurobi_to_qubo(self.model.model)
[docs]
def matrix(quad: QuadraticProgram):
"""
@param quad: a quadratic programm, must be in QUBO form
@return: the QUBO matrix
"""
num_var = quad.get_num_vars()
m = np.zeros((num_var, num_var))
obj = quad.objective.quadratic.to_dict()
for key, val in obj.items():
m[key[0], key[1]] = val
return m
[docs]
def compute_energy(matrix, x):
"""
@param matrix: a matrix of a QUBO formulation
@param x: a binary vector
@return: the value of the matrix for the giving vector
"""
energy = 0
for i in range(0, len(x)):
for j in range(i, len(x)):
if x[i] == 1 and x[j] == 1:
energy += matrix[i][j]
return energy