# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2019, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Analytical Quantum Gradient Descent (AQGD) optimizer."""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any
import numpy as np
from ..utils.validation import validate_range_exclusive_max
from .optimizer import Optimizer, OptimizerSupportLevel, OptimizerResult, POINT
from ..exceptions import AlgorithmError
logger = logging.getLogger(__name__)
class AQGD(Optimizer):
    """Analytic Quantum Gradient Descent (AQGD) with Epochs optimizer.

    Performs gradient descent optimization with a momentum term, analytic gradients,
    and customized step length schedule for parameterized quantum gates, i.e.
    Pauli Rotations. See, for example:

    * K. Mitarai, M. Negoro, M. Kitagawa, and K. Fujii. (2018).
      Quantum circuit learning. Phys. Rev. A 98, 032309.
      https://arxiv.org/abs/1803.00745

    * Maria Schuld, Ville Bergholm, Christian Gogolin, Josh Izaac, Nathan Killoran. (2019).
      Evaluating analytic gradients on quantum hardware. Phys. Rev. A 99, 032331.
      https://arxiv.org/abs/1811.11184

    for further details on analytic gradients of parameterized quantum gates.

    Gradients are computed "analytically" using the quantum circuit when evaluating
    the objective function.
    """

    _OPTIONS = ["maxiter", "eta", "tol", "disp", "momentum", "param_tol", "averaging"]

    # pylint: disable=too-many-positional-arguments
    def __init__(
        self,
        maxiter: int | list[int] = 1000,
        eta: float | list[float] = 1.0,
        tol: float = 1e-6,
        momentum: float | list[float] = 0.25,
        param_tol: float = 1e-6,
        averaging: int = 10,
        max_evals_grouped: int = 1,
    ) -> None:
        """
        Performs Analytical Quantum Gradient Descent (AQGD) with Epochs.

        ``maxiter``, ``eta`` and ``momentum`` may each be given as a single value or as a
        list with one entry per epoch; scalars are wrapped into one-element lists.

        Args:
            maxiter: Maximum number of iterations (full gradient steps)
            eta: The coefficient of the gradient update. Increasing this value
                results in larger step sizes: param = previous_param - eta * deriv
            tol: Tolerance for change in windowed average of objective values.
                Convergence occurs when either objective tolerance is met OR parameter
                tolerance is met.
            momentum: Bias towards the previous gradient momentum in current
                update. Must be within the bounds: [0,1)
            param_tol: Tolerance for change in norm of parameters.
            averaging: Length of window over which to average objective values for objective
                convergence criterion
            max_evals_grouped: Max number of default gradient evaluations performed simultaneously.

        Raises:
            AlgorithmError: If the length of ``maxiter``, ``momentum``, and ``eta`` is not the same.
        """
        super().__init__()
        # Normalise scalar settings to single-epoch lists.
        if isinstance(maxiter, int):
            maxiter = [maxiter]
        if isinstance(eta, (int, float)):
            eta = [eta]
        if isinstance(momentum, (int, float)):
            momentum = [momentum]
        if len(maxiter) != len(eta) or len(maxiter) != len(momentum):
            raise AlgorithmError(
                "AQGD input parameter length mismatch. Parameters `maxiter`, "
                "`eta`, and `momentum` must have the same length."
            )
        for m in momentum:
            validate_range_exclusive_max("momentum", m, 0, 1)

        self._eta = eta
        self._maxiter = maxiter
        self._momenta_coeff = momentum
        self._param_tol = param_tol
        self._tol = tol
        self._averaging = averaging
        self.set_max_evals_grouped(max_evals_grouped)

        # state
        self._avg_objval: float | None = None
        self._prev_param: np.ndarray | None = None
        self._eval_count = 0  # function evaluations
        self._prev_loss: list[float] = []
        self._prev_grad: list[list[float]] = []

    def get_support_level(self) -> dict[str, OptimizerSupportLevel]:
        """Support level dictionary

        Returns:
            Dict[str, int]: gradient, bounds and initial point
                            support information that is ignored/required.
        """
        return {
            "gradient": OptimizerSupportLevel.ignored,
            "bounds": OptimizerSupportLevel.ignored,
            "initial_point": OptimizerSupportLevel.required,
        }

    @property
    def settings(self) -> dict[str, Any]:
        """Return the stored per-epoch lists and tolerances keyed by constructor argument name."""
        return {
            "maxiter": self._maxiter,
            "eta": self._eta,
            "momentum": self._momenta_coeff,
            "param_tol": self._param_tol,
            "tol": self._tol,
            "averaging": self._averaging,
        }

    def _compute_objective_fn_and_gradient(
        self, params: np.ndarray | list[float], obj: Callable
    ) -> tuple[float, np.ndarray]:
        """
        Obtains the objective function value for params and the analytical quantum derivatives of
        the objective function with respect to each parameter. Requires
        2*(number parameters) + 1 objective evaluations

        Args:
            params: Current value of the parameters to evaluate the objective function
            obj: Objective function of interest

        Returns:
            Tuple containing the objective value and array of gradients for the given parameter set.
        """
        params = np.asarray(params, dtype=float)
        num_params = len(params)
        shift = np.eye(num_params) * np.pi / 2
        # Row 0: the parameters as is; rows 1..n: each parameter shifted by +pi/2;
        # rows n+1..2n: each parameter shifted by -pi/2.
        param_sets_to_eval = params + np.concatenate(
            (np.zeros((1, num_params)), shift, -shift), axis=0
        )

        if self._max_evals_grouped > 1:
            batches = [
                param_sets_to_eval[i : i + self._max_evals_grouped]
                for i in range(0, len(param_sets_to_eval), self._max_evals_grouped)
            ]
            # An objective may hand back a bare scalar for a one-row batch;
            # atleast_1d keeps the concatenation valid in that case.
            values = np.concatenate([np.atleast_1d(obj(batch)) for batch in batches])
        else:
            values = np.array([obj(point) for point in param_sets_to_eval])

        # Update number of objective function evaluations
        self._eval_count += 2 * num_params + 1

        obj_value = values[0]
        gradient = 0.5 * (values[1 : num_params + 1] - values[1 + num_params :])
        return obj_value, gradient

    # pylint: disable=too-many-positional-arguments
    def _update(
        self,
        params: np.ndarray,
        gradient: np.ndarray,
        mprev: np.ndarray,
        step_size: float,
        momentum_coeff: float,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Updates full parameter array based on a step that is a convex
        combination of the gradient and previous momentum

        The input ``params`` array is left untouched; a new array is returned.

        Args:
            params: Current value of the parameters to evaluate the objective function at
            gradient: Gradient of objective wrt parameters
            mprev: Momentum vector for each parameter
            step_size: The scaling of step to take
            momentum_coeff: Bias towards previous momentum vector when updating current
                momentum/step vector

        Returns:
            Tuple of the updated parameter and momentum vectors respectively.
        """
        # Momentum update:
        # Convex combination of previous momentum and current gradient estimate
        mnew = (1 - momentum_coeff) * gradient + momentum_coeff * mprev
        # Build a new array instead of subtracting in place so that the caller's
        # array is never modified and integer-typed inputs do not raise.
        params = params - step_size * mnew
        return params, mnew

    def _converged_objective(self, objval: float, tol: float, window_size: int) -> bool:
        """
        Tests convergence based on the change in a moving windowed average of past objective values

        Args:
            objval: Current value of the objective function
            tol: tolerance below which (average) objective function change must be
            window_size: size of averaging window

        Returns:
            Bool indicating whether or not the optimization has converged.
        """
        # If we haven't reached the required window length,
        # append the current value, but we haven't converged
        if len(self._prev_loss) < window_size:
            self._prev_loss.append(objval)
            return False

        # History now holds window_size + 1 values: the previous window is the
        # first window_size entries, the current window is the last window_size.
        self._prev_loss.append(objval)
        prev_avg = np.mean(self._prev_loss[:window_size])
        curr_avg = np.mean(self._prev_loss[1 : window_size + 1])
        self._avg_objval = curr_avg  # type: ignore[assignment]

        # Slide the window forward by dropping the earliest value
        self._prev_loss.pop(0)

        if np.absolute(prev_avg - curr_avg) < tol:
            # converged
            logger.info("Previous obj avg: %f\nCurr obj avg: %f", prev_avg, curr_avg)
            return True
        return False

    def _converged_parameter(self, parameter: np.ndarray, tol: float) -> bool:
        """
        Tests convergence based on change in parameter

        The change is measured (infinity norm) against the parameters passed on the
        previous call; the first call only records the parameters.

        Args:
            parameter: current parameter values
            tol: tolerance for change in norm of parameters

        Returns:
            Bool indicating whether or not the optimization has converged
        """
        if self._prev_param is None:
            self._prev_param = np.copy(parameter)
            return False

        order = np.inf
        p_change = np.linalg.norm(self._prev_param - parameter, ord=order)
        # Remember the current point so the next call measures the next step,
        # not the accumulated drift from the very first point.
        self._prev_param = np.copy(parameter)
        if p_change < tol:
            # converged
            logger.info("Change in parameters (%f norm): %f", order, p_change)
            return True
        return False

    def _converged_alt(self, gradient: list[float], tol: float, window_size: int) -> bool:
        """
        Tests convergence from norm of windowed average of gradients

        Args:
            gradient: current gradient
            tol: tolerance for average gradient norm
            window_size: size of averaging window

        Returns:
            Bool indicating whether or not the optimization has converged
        """
        # If we haven't reached the required window length,
        # append the current value, but we haven't converged
        if len(self._prev_grad) < window_size - 1:
            self._prev_grad.append(gradient)
            return False

        # History now holds window_size gradients; average them component-wise
        self._prev_grad.append(gradient)
        avg_grad = np.mean(self._prev_grad, axis=0)

        # Slide the window forward by dropping the earliest gradient
        self._prev_grad.pop(0)

        if np.linalg.norm(avg_grad, ord=np.inf) < tol:
            # converged
            logger.info("Avg. grad. norm: %f", np.linalg.norm(avg_grad, ord=np.inf))
            return True
        return False

    def minimize(
        self,
        fun: Callable[[POINT], float],
        x0: POINT,
        jac: Callable[[POINT], POINT] | None = None,
        bounds: list[tuple[float, float]] | None = None,
    ) -> OptimizerResult:
        """Minimize ``fun`` starting from ``x0`` by momentum gradient descent over epochs.

        Each ``(eta, momentum)`` pair defines one epoch, which runs until the cumulative
        iteration budget from ``maxiter`` is used up. Iteration stops early as soon as
        the parameter or the objective convergence test succeeds.

        Args:
            fun: Objective function.
            x0: Initial point. It is copied, so the caller's object is not modified.
            jac: Optional gradient function. When ``None`` the gradient is obtained from
                shifted evaluations of ``fun``.
            bounds: Not used by this method.

        Returns:
            Result holding the final parameters (``x``), the last evaluated objective
            value (``fun``), the number of objective evaluations (``nfev``) and the
            number of iterations (``nit``).
        """
        # Work on a private float copy so the caller's x0 is never modified.
        params = np.array(x0, dtype=float)
        momentum = np.zeros(shape=(params.size,))

        # empty out history of previous objectives/gradients/parameters
        # (in case this object is re-used)
        self._prev_loss = []
        self._prev_grad = []
        self._prev_param = None
        self._avg_objval = None
        self._eval_count = 0  # function evaluations

        iter_count = 0
        objval: float | None = None
        logger.info("Initial Params: %s", params)

        converged = False
        for epoch, (eta, mom_coeff) in enumerate(zip(self._eta, self._momenta_coeff)):
            logger.info("Epoch: %4d | Stepsize: %6.4f | Momentum: %6.4f", epoch, eta, mom_coeff)

            # The iteration counter runs across epochs, so the limit is cumulative.
            sum_max_iters = sum(self._maxiter[: epoch + 1])
            while iter_count < sum_max_iters:
                # update the iteration count
                iter_count += 1

                # Check for parameter convergence before potentially costly function evaluation
                converged = self._converged_parameter(params, self._param_tol)
                if converged:
                    break

                # Calculate objective function and estimate of analytical gradient
                if jac is None:
                    objval, gradient = self._compute_objective_fn_and_gradient(params, fun)
                else:
                    objval = fun(params)
                    # Count this objective call and accept list-valued gradients.
                    self._eval_count += 1
                    gradient = np.asarray(jac(params), dtype=float)

                logger.info(
                    " Iter: %4d | Obj: %11.6f | Grad Norm: %f",
                    iter_count,
                    objval,
                    np.linalg.norm(gradient, ord=np.inf),
                )

                # Check for objective convergence
                converged = self._converged_objective(objval, self._tol, self._averaging)
                if converged:
                    break

                # Update parameters and momentum
                params, momentum = self._update(params, gradient, momentum, eta, mom_coeff)
            # end inner iteration

            # if converged, end iterating over epochs
            if converged:
                break
        # end epoch iteration

        if objval is None:
            # No iteration evaluated the objective (e.g. a zero iteration budget);
            # evaluate once so the result carries a defined objective value.
            objval = fun(params)
            self._eval_count += 1

        result = OptimizerResult()
        result.x = params
        result.fun = objval
        result.nfev = self._eval_count
        result.nit = iter_count
        return result