Source code for qiskit_algorithms.optimizers.spsa

# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2018, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Simultaneous Perturbation Stochastic Approximation (SPSA) optimizer.

This implementation allows both standard first-order and second-order SPSA.
"""
from __future__ import annotations

from collections import deque
from collections.abc import Iterator
from typing import Callable, Any, SupportsFloat
import logging
import warnings
from time import time

import scipy
import numpy as np

from qiskit_algorithms.utils import algorithm_globals

from .optimizer import Optimizer, OptimizerSupportLevel, OptimizerResult, POINT

# number of function evaluations, parameters, loss, stepsize, accepted
CALLBACK = Callable[[int, np.ndarray, float, SupportsFloat, bool], None]
TERMINATIONCHECKER = Callable[[int, np.ndarray, float, SupportsFloat, bool], bool]

logger = logging.getLogger(__name__)


[docs]class SPSA(Optimizer): """Simultaneous Perturbation Stochastic Approximation (SPSA) optimizer. SPSA [1] is an gradient descent method for optimizing systems with multiple unknown parameters. As an optimization method, it is appropriately suited to large-scale population models, adaptive modeling, and simulation optimization. .. seealso:: Many examples are presented at the `SPSA Web site <http://www.jhuapl.edu/SPSA>`__. The main feature of SPSA is the stochastic gradient approximation, which requires only two measurements of the objective function, regardless of the dimension of the optimization problem. Additionally, to standard first-order SPSA, where only gradient information is used, this implementation also allows second-order SPSA (2-SPSA) [2]. In 2-SPSA we also estimate the Hessian of the loss with a stochastic approximation and multiply the gradient with the inverse Hessian to take local curvature into account and improve convergence. Notably this Hessian estimate requires only a constant number of function evaluations unlike an exact evaluation of the Hessian, which scales quadratically in the number of function evaluations. .. note:: SPSA can be used in the presence of noise, and it is therefore indicated in situations involving measurement uncertainty on a quantum computation when finding a minimum. If you are executing a variational algorithm using a Quantum ASseMbly Language (QASM) simulator or a real device, SPSA would be the most recommended choice among the optimizers provided here. The optimization process can include a calibration phase if neither the ``learning_rate`` nor ``perturbation`` is provided, which requires additional functional evaluations. (Note that either both or none must be set.) For further details on the automatic calibration, please refer to the supplementary information section IV. of [3]. .. note:: This component has some function that is normally random. If you want to reproduce behavior then you should set the random number generator seed in the algorithm_globals (``qiskit_algorithms.utils.algorithm_globals.random_seed = seed``). Examples: This short example runs SPSA for the ground state calculation of the ``Z ^ Z`` observable where the ansatz is a ``PauliTwoDesign`` circuit. .. code-block:: python import numpy as np from qiskit_algorithms.optimizers import SPSA from qiskit.circuit.library import PauliTwoDesign from qiskit.primitives import Estimator from qiskit.quantum_info import SparsePauliOp ansatz = PauliTwoDesign(2, reps=1, seed=2) observable = SparsePauliOp("ZZ") initial_point = np.random.random(ansatz.num_parameters) estimator = Estimator() def loss(x): job = estimator.run([ansatz], [observable], [x]) return job.result().values[0] spsa = SPSA(maxiter=300) result = spsa.minimize(loss, x0=initial_point) To use the Hessian information, i.e. 2-SPSA, you can add `second_order=True` to the initializer of the `SPSA` class, the rest of the code remains the same. .. code-block:: python two_spsa = SPSA(maxiter=300, second_order=True) result = two_spsa.minimize(loss, x0=initial_point) The `termination_checker` can be used to implement a custom termination criterion. .. code-block:: python import numpy as np from qiskit_algorithms.optimizers import SPSA def objective(x): return np.linalg.norm(x) + .04*np.random.rand(1) class TerminationChecker: def __init__(self, N : int): self.N = N self.values = [] def __call__(self, nfev, parameters, value, stepsize, accepted) -> bool: self.values.append(value) if len(self.values) > self.N: last_values = self.values[-self.N:] pp = np.polyfit(range(self.N), last_values, 1) slope = pp[0] / self.N if slope > 0: return True return False spsa = SPSA(maxiter=200, termination_checker=TerminationChecker(10)) result = spsa.minimize(objective, x0=[0.5, 0.5]) print(f'SPSA completed after {result.nit} iterations') References: [1]: J. C. Spall (1998). An Overview of the Simultaneous Perturbation Method for Efficient Optimization, Johns Hopkins APL Technical Digest, 19(4), 482–492. `Online at jhuapl.edu. <https://www.jhuapl.edu/SPSA/PDF-SPSA/Spall_An_Overview.PDF>`_ [2]: J. C. Spall (1997). Accelerated second-order stochastic optimization using only function measurements, Proceedings of the 36th IEEE Conference on Decision and Control, 1417-1424 vol.2. `Online at IEEE.org. <https://ieeexplore.ieee.org/document/657661>`_ [3]: A. Kandala et al. (2017). Hardware-efficient Variational Quantum Eigensolver for Small Molecules and Quantum Magnets. Nature 549, pages242–246(2017). `arXiv:1704.05018v2 <https://arxiv.org/pdf/1704.05018v2.pdf#section*.11>`_ """ # pylint: disable=too-many-positional-arguments def __init__( self, maxiter: int = 100, blocking: bool = False, allowed_increase: float | None = None, trust_region: bool = False, learning_rate: float | np.ndarray | Callable[[], Iterator] | None = None, perturbation: float | np.ndarray | Callable[[], Iterator] | None = None, last_avg: int = 1, resamplings: int | dict[int, int] = 1, perturbation_dims: int | None = None, second_order: bool = False, regularization: float | None = None, hessian_delay: int = 0, lse_solver: Callable[[np.ndarray, np.ndarray], np.ndarray] | None = None, initial_hessian: np.ndarray | None = None, callback: CALLBACK | None = None, termination_checker: TERMINATIONCHECKER | None = None, ) -> None: r""" Args: maxiter: The maximum number of iterations. Note that this is not the maximal number of function evaluations. blocking: If True, only accepts updates that improve the loss (up to some allowed increase, see next argument). allowed_increase: If ``blocking`` is ``True``, this argument determines by how much the loss can increase with the proposed parameters and still be accepted. If ``None``, the allowed increases is calibrated automatically to be twice the approximated standard deviation of the loss function. trust_region: If ``True``, restricts the norm of the update step to be :math:`\leq 1`. learning_rate: The update step is the learning rate is multiplied with the gradient. If the learning rate is a float, it remains constant over the course of the optimization. If a NumPy array, the :math:`i`-th element is the learning rate for the :math:`i`-th iteration. It can also be a callable returning an iterator which yields the learning rates for each optimization step. If ``learning_rate`` is set ``perturbation`` must also be provided. perturbation: Specifies the magnitude of the perturbation for the finite difference approximation of the gradients. See ``learning_rate`` for the supported types. If ``perturbation`` is set ``learning_rate`` must also be provided. last_avg: Return the average of the ``last_avg`` parameters instead of just the last parameter values. resamplings: The number of times the gradient (and Hessian) is sampled using a random direction to construct a gradient estimate. Per default the gradient is estimated using only one random direction. If an integer, all iterations use the same number of resamplings. If a dictionary, this is interpreted as ``{iteration: number of resamplings per iteration}``. perturbation_dims: The number of perturbed dimensions. Per default, all dimensions are perturbed, but a smaller, fixed number can be perturbed. If set, the perturbed dimensions are chosen uniformly at random. second_order: If True, use 2-SPSA instead of SPSA. In 2-SPSA, the Hessian is estimated additionally to the gradient, and the gradient is preconditioned with the inverse of the Hessian to improve convergence. regularization: To ensure the preconditioner is symmetric and positive definite, the identity times a small coefficient is added to it. This generator yields that coefficient. hessian_delay: Start multiplying the gradient with the inverse Hessian only after a certain number of iterations. The Hessian is still evaluated and therefore this argument can be useful to first get a stable average over the last iterations before using it as preconditioner. lse_solver: The method to solve for the inverse of the Hessian. Per default an exact LSE solver is used, but can e.g. be overwritten by a minimization routine. initial_hessian: The initial guess for the Hessian. By default the identity matrix is used. callback: A callback function passed information in each iteration step. The information is, in this order: the number of function evaluations, the parameters, the function value, the stepsize, whether the step was accepted. termination_checker: A callback function executed at the end of each iteration step. The arguments are, in this order: the parameters, the function value, the number of function evaluations, the stepsize, whether the step was accepted. If the callback returns True, the optimization is terminated. To prevent additional evaluations of the objective method, if the objective has not yet been evaluated, the objective is estimated by taking the mean of the objective evaluations used in the estimate of the gradient. Raises: ValueError: If ``learning_rate`` or ``perturbation`` is an array with less elements than the number of iterations. """ super().__init__() # general optimizer arguments self.maxiter = maxiter self.trust_region = trust_region self.callback = callback self.termination_checker = termination_checker # if learning rate and perturbation are arrays, check they are sufficiently long for attr, name in zip([learning_rate, perturbation], ["learning_rate", "perturbation"]): if isinstance(attr, (list, np.ndarray)): if len(attr) < maxiter: raise ValueError(f"Length of {name} is smaller than maxiter ({maxiter}).") self.learning_rate = learning_rate self.perturbation = perturbation # SPSA specific arguments self.blocking = blocking self.allowed_increase = allowed_increase self.last_avg = last_avg self.resamplings = resamplings self.perturbation_dims = perturbation_dims # 2-SPSA specific arguments if regularization is None: regularization = 0.01 self.second_order = second_order self.hessian_delay = hessian_delay self.lse_solver = lse_solver self.regularization = regularization self.initial_hessian = initial_hessian # runtime arguments self._nfev: int | None = None # the number of function evaluations self._smoothed_hessian: np.ndarray | None = None # smoothed average of the Hessians # pylint: disable=too-many-positional-arguments
[docs] @staticmethod def calibrate( loss: Callable[[np.ndarray], float], initial_point: np.ndarray, c: float = 0.2, stability_constant: float = 0, target_magnitude: float | None = None, # 2 pi / 10 alpha: float = 0.602, gamma: float = 0.101, modelspace: bool = False, max_evals_grouped: int = 1, ) -> tuple[Callable, Callable]: r"""Calibrate SPSA parameters with a power series as learning rate and perturbation coeffs. The power series are: .. math:: a_k = \frac{a}{(A + k + 1)^\alpha}, c_k = \frac{c}{(k + 1)^\gamma} Args: loss: The loss function. initial_point: The initial guess of the iteration. c: The initial perturbation magnitude. stability_constant: The value of `A`. target_magnitude: The target magnitude for the first update step, defaults to :math:`2\pi / 10`. alpha: The exponent of the learning rate power series. gamma: The exponent of the perturbation power series. modelspace: Whether the target magnitude is the difference of parameter values or function values (= model space). max_evals_grouped: The number of grouped evaluations supported by the loss function. Defaults to 1, i.e. no grouping. Returns: tuple(generator, generator): A tuple of power series generators, the first one for the learning rate and the second one for the perturbation. """ logger.info("SPSA: Starting calibration of learning rate and perturbation.") if target_magnitude is None: target_magnitude = 2 * np.pi / 10 dim = len(initial_point) # compute the average magnitude of the first step steps = 25 points = [] for _ in range(steps): # compute the random direction pert = bernoulli_perturbation(dim) points += [initial_point + c * pert, initial_point - c * pert] losses = _batch_evaluate(loss, points, max_evals_grouped) avg_magnitudes = 0.0 for i in range(steps): delta = losses[2 * i] - losses[2 * i + 1] avg_magnitudes += np.abs(delta / (2 * c)) avg_magnitudes /= steps if modelspace: a = target_magnitude / (avg_magnitudes**2) else: a = target_magnitude / avg_magnitudes # compute the rescaling factor for correct first learning rate if a < 1e-10: warnings.warn(f"Calibration failed, using {target_magnitude} for `a`") a = target_magnitude logger.info("Finished calibration:") logger.info( " -- Learning rate: a / ((A + n) ^ alpha) with a = %s, A = %s, alpha = %s", a, stability_constant, alpha, ) logger.info(" -- Perturbation: c / (n ^ gamma) with c = %s, gamma = %s", c, gamma) # set up the power series def learning_rate(): return powerseries(a, alpha, stability_constant) def perturbation(): return powerseries(c, gamma) return learning_rate, perturbation
[docs] @staticmethod def estimate_stddev( loss: Callable[[np.ndarray], float], initial_point: np.ndarray, avg: int = 25, max_evals_grouped: int = 1, ) -> float: """Estimate the standard deviation of the loss function.""" losses = _batch_evaluate(loss, avg * [initial_point], max_evals_grouped) return np.std(losses)
@property def settings(self) -> dict[str, Any]: # if learning rate or perturbation are custom iterators expand them if callable(self.learning_rate): iterator = self.learning_rate() learning_rate = np.array([next(iterator) for _ in range(self.maxiter)]) else: learning_rate = self.learning_rate # type: ignore[assignment] if callable(self.perturbation): iterator = self.perturbation() perturbation = np.array([next(iterator) for _ in range(self.maxiter)]) else: perturbation = self.perturbation # type: ignore[assignment] return { "maxiter": self.maxiter, "learning_rate": learning_rate, "perturbation": perturbation, "trust_region": self.trust_region, "blocking": self.blocking, "allowed_increase": self.allowed_increase, "resamplings": self.resamplings, "perturbation_dims": self.perturbation_dims, "second_order": self.second_order, "hessian_delay": self.hessian_delay, "regularization": self.regularization, "lse_solver": self.lse_solver, "initial_hessian": self.initial_hessian, "callback": self.callback, "termination_checker": self.termination_checker, } # pylint: disable=too-many-positional-arguments def _point_sample(self, loss, x, eps, delta1, delta2): """A single sample of the gradient at position ``x`` in direction ``delta``.""" # points to evaluate points = [x + eps * delta1, x - eps * delta1] self._nfev += 2 if self.second_order: points += [x + eps * (delta1 + delta2), x + eps * (-delta1 + delta2)] self._nfev += 2 # batch evaluate the points (if possible) values = _batch_evaluate(loss, points, self._max_evals_grouped) plus = values[0] minus = values[1] gradient_sample = (plus - minus) / (2 * eps) * delta1 hessian_sample = None if self.second_order: diff = (values[2] - plus) - (values[3] - minus) diff /= 2 * eps**2 rank_one = np.outer(delta1, delta2) hessian_sample = diff * (rank_one + rank_one.T) / 2 return np.mean(values), gradient_sample, hessian_sample def _point_estimate(self, loss, x, eps, num_samples): """The gradient estimate at point x.""" # set up variables to store averages value_estimate = 0 gradient_estimate = np.zeros(x.size) hessian_estimate = np.zeros((x.size, x.size)) # iterate over the directions deltas1 = [ bernoulli_perturbation(x.size, self.perturbation_dims) for _ in range(num_samples) ] if self.second_order: deltas2 = [ bernoulli_perturbation(x.size, self.perturbation_dims) for _ in range(num_samples) ] else: deltas2 = None for i in range(num_samples): delta1 = deltas1[i] delta2 = deltas2[i] if self.second_order else None value_sample, gradient_sample, hessian_sample = self._point_sample( loss, x, eps, delta1, delta2 ) value_estimate += value_sample gradient_estimate += gradient_sample if self.second_order: hessian_estimate += hessian_sample return ( value_estimate / num_samples, gradient_estimate / num_samples, hessian_estimate / num_samples, ) # pylint: disable=too-many-positional-arguments def _compute_update(self, loss, x, k, eps, lse_solver): # compute the perturbations if isinstance(self.resamplings, dict): num_samples = self.resamplings.get(k, 1) else: num_samples = self.resamplings # accumulate the number of samples value, gradient, hessian = self._point_estimate(loss, x, eps, num_samples) # precondition gradient with inverse Hessian, if specified if self.second_order: smoothed = k / (k + 1) * self._smoothed_hessian + 1 / (k + 1) * hessian self._smoothed_hessian = smoothed if k > self.hessian_delay: spd_hessian = _make_spd(smoothed, self.regularization) # solve for the gradient update gradient = np.real(lse_solver(spd_hessian, gradient)) return value, gradient
[docs] def minimize( self, fun: Callable[[POINT], float], x0: POINT, jac: Callable[[POINT], POINT] | None = None, bounds: list[tuple[float, float]] | None = None, ) -> OptimizerResult: # ensure learning rate and perturbation are correctly set: either none or both # this happens only here because for the calibration the loss function is required x0 = np.asarray(x0) if self.learning_rate is None and self.perturbation is None: get_eta, get_eps = self.calibrate(fun, x0, max_evals_grouped=self._max_evals_grouped) else: get_eta, get_eps = _validate_pert_and_learningrate( self.perturbation, self.learning_rate ) eta, eps = get_eta(), get_eps() lse_solver = self.lse_solver if self.lse_solver is None: lse_solver = np.linalg.solve # prepare some initials x = np.asarray(x0) if self.initial_hessian is None: self._smoothed_hessian = np.identity(x.size) else: self._smoothed_hessian = self.initial_hessian self._nfev = 0 # if blocking is enabled we need to keep track of the function values if self.blocking: fx = fun(x) # pylint: disable=invalid-name self._nfev += 1 if self.allowed_increase is None: self.allowed_increase = 2 * self.estimate_stddev( fun, x, max_evals_grouped=self._max_evals_grouped ) logger.info("SPSA: Starting optimization.") start = time() # keep track of the last few steps to return their average last_steps = deque([x]) # use a local variable and while loop to keep track of the number of iterations # if the termination checker terminates early k = 0 while k < self.maxiter: k += 1 iteration_start = time() # compute update fx_estimate, update = self._compute_update(fun, x, k, next(eps), lse_solver) # trust region if self.trust_region: norm = np.linalg.norm(update) if norm > 1: # stop from dividing by 0 update = update / norm # compute next parameter value update = update * next(eta) x_next = x - update fx_next = None # blocking if self.blocking: self._nfev += 1 fx_next = fun(x_next) if fx + self.allowed_increase <= fx_next: # accept only if loss improved if self.callback is not None: self.callback( self._nfev, # number of function evals x_next, # next parameters fx_next, # loss at next parameters np.linalg.norm(update), # size of the update step False, ) # not accepted logger.info( "Iteration %s/%s rejected in %s.", k, self.maxiter + 1, time() - iteration_start, ) continue fx = fx_next # pylint: disable=invalid-name logger.info( "Iteration %s/%s done in %s.", k, self.maxiter + 1, time() - iteration_start ) if self.callback is not None: # if we didn't evaluate the function yet, do it now if not self.blocking: self._nfev += 1 fx_next = fun(x_next) self.callback( self._nfev, # number of function evals x_next, # next parameters fx_next, # loss at next parameters np.linalg.norm(update), # size of the update step True, ) # accepted # update parameters x = x_next # update the list of the last ``last_avg`` parameters if self.last_avg > 1: last_steps.append(x_next) if len(last_steps) > self.last_avg: last_steps.popleft() if self.termination_checker is not None: fx_check = fx_estimate if fx_next is None else fx_next if self.termination_checker( self._nfev, x_next, fx_check, np.linalg.norm(update), True ): logger.info("terminated optimization at {k}/{self.maxiter} iterations") break logger.info("SPSA: Finished in %s", time() - start) if self.last_avg > 1: x = np.mean(np.asarray(last_steps), axis=0) result = OptimizerResult() result.x = x result.fun = fun(x) result.nfev = self._nfev result.nit = k return result
[docs] def get_support_level(self): """Get the support level dictionary.""" return { "gradient": OptimizerSupportLevel.ignored, "bounds": OptimizerSupportLevel.ignored, "initial_point": OptimizerSupportLevel.required, }
def bernoulli_perturbation(dim, perturbation_dims=None): """Get a Bernoulli random perturbation.""" if perturbation_dims is None: return 1 - 2 * algorithm_globals.random.binomial(1, 0.5, size=dim) pert = 1 - 2 * algorithm_globals.random.binomial(1, 0.5, size=perturbation_dims) indices = algorithm_globals.random.choice( list(range(dim)), size=perturbation_dims, replace=False ) result = np.zeros(dim) result[indices] = pert return result def powerseries(eta=0.01, power=2, offset=0): """Yield a series decreasing by a power law.""" n = 1 while True: yield eta / ((n + offset) ** power) n += 1 def constant(eta=0.01): """Yield a constant series.""" while True: yield eta def _batch_evaluate(function, points, max_evals_grouped, unpack_points=False): """Evaluate a function on all points with batches of max_evals_grouped. The points are a list of inputs, as ``[in1, in2, in3, ...]``. If the individual inputs are tuples (because the function takes multiple inputs), set ``unpack_points`` to ``True``. """ # if the function cannot handle lists of points as input, cover this case immediately if max_evals_grouped is None or max_evals_grouped == 1: # support functions with multiple arguments where the points are given in a tuple return [ function(*point) if isinstance(point, tuple) else function(point) for point in points ] num_points = len(points) # get the number of batches num_batches = num_points // max_evals_grouped if num_points % max_evals_grouped != 0: num_batches += 1 # split the points batched_points = np.array_split(np.asarray(points), num_batches) results = [] for batch in batched_points: if unpack_points: batch = _repack_points(batch) results += _as_list(function(*batch)) else: results += _as_list(function(batch)) return results def _as_list(obj): """Convert a list or numpy array into a list.""" return obj.tolist() if isinstance(obj, np.ndarray) else obj def _repack_points(points): """Turn a list of tuples of points into a tuple of lists of points. E.g. turns [(a1, a2, a3), (b1, b2, b3)] into ([a1, b1], [a2, b2], [a3, b3]) where all elements are np.ndarray. """ num_sets = len(points[0]) # length of (a1, a2, a3) return ([x[i] for x in points] for i in range(num_sets)) def _make_spd(matrix, bias=0.01): identity = np.identity(matrix.shape[0]) psd = scipy.linalg.sqrtm(matrix.dot(matrix)) return psd + bias * identity def _validate_pert_and_learningrate(perturbation, learning_rate): if learning_rate is None or perturbation is None: raise ValueError("If one of learning rate or perturbation is set, both must be set.") if isinstance(perturbation, float): def get_eps(): return constant(perturbation) elif isinstance(perturbation, (list, np.ndarray)): def get_eps(): return iter(perturbation) else: get_eps = perturbation if isinstance(learning_rate, float): def get_eta(): return constant(learning_rate) elif isinstance(learning_rate, (list, np.ndarray)): def get_eta(): return iter(learning_rate) else: get_eta = learning_rate return get_eta, get_eps