# Source code for qiskit_algorithms.gradients.spsa.spsa_sampler_gradient

# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2022, 2025.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Gradient of Sampler with Finite difference method."""

from __future__ import annotations

from collections import defaultdict
from collections.abc import Sequence
from typing import Any

import numpy as np

from qiskit.circuit import Parameter, QuantumCircuit
from qiskit.primitives import BaseSamplerV2

from ..base.base_sampler_gradient import BaseSamplerGradient
from ..base.sampler_gradient_result import SamplerGradientResult
from ...custom_types import Transpiler

from ...exceptions import AlgorithmError


class SPSASamplerGradient(BaseSamplerGradient):
    """
    Compute the gradients of the sampling probability by the Simultaneous Perturbation Stochastic
    Approximation (SPSA) [1].

    **Reference:**
    [1] J. C. Spall, Adaptive stochastic approximation by the simultaneous perturbation method in
    IEEE Transactions on Automatic Control, vol. 45, no. 10, pp. 1839-1853, Oct 2000,
    `doi: 10.1109/TAC.2000.880982 <https://ieeexplore.ieee.org/document/880982>`_.
    """

    # pylint: disable=too-many-positional-arguments
    def __init__(
        self,
        sampler: BaseSamplerV2,
        epsilon: float,
        batch_size: int = 1,
        seed: int | None = None,
        shots: int | None = None,
        *,
        transpiler: Transpiler | None = None,
        transpiler_options: dict[str, Any] | None = None,
    ):
        """
        Args:
            sampler: The sampler used to compute the gradients.
            epsilon: The offset size for the SPSA gradients.
            batch_size: number of gradients to average.
            seed: The seed for a random perturbation vector.
            shots: Number of shots to be used by the underlying sampler. The order of priority is:
                number of shots in ``run`` method > fidelity's number of shots > primitive's default
                number of shots. Higher priority setting overrides lower priority setting.
            transpiler: An optional object with a `run` method allowing to transpile the circuits
                that are run when using this algorithm. If set to `None`, these won't be
                transpiled.
            transpiler_options: A dictionary of options to be passed to the transpiler's `run`
                method as keyword arguments.

        Raises:
            ValueError: If ``epsilon`` is not positive.
        """
        if epsilon <= 0:
            raise ValueError(f"epsilon ({epsilon}) should be positive.")
        self._batch_size = batch_size
        self._epsilon = epsilon
        # Despite the name, this holds the RNG itself (seeded Generator), not the seed value.
        self._seed = np.random.default_rng(seed)

        super().__init__(
            sampler, shots, transpiler=transpiler, transpiler_options=transpiler_options
        )

    def _run(
        self,
        circuits: Sequence[QuantumCircuit],
        parameter_values: Sequence[Sequence[float]],
        parameters: Sequence[Sequence[Parameter]],
        *,
        shots: int | Sequence[int] | None = None,
    ) -> SamplerGradientResult:
        """Compute the sampler gradients on the given circuits.

        For each circuit, ``batch_size`` random +/-1 perturbation vectors are drawn; the
        quasi-probability distributions at ``values + eps*offset`` and ``values - eps*offset``
        are sampled in a single job, and the central differences are averaged into per-parameter
        gradients of the sampling probabilities.
        """
        metadata, offsets = [], []
        all_n = []
        has_transformed_shots = False

        # Normalize scalar/None shots to one entry per circuit; remember we did so in order to
        # restore the scalar form in the returned result.
        if isinstance(shots, int) or shots is None:
            shots = [shots] * len(circuits)
            has_transformed_shots = True

        pubs = []

        if self._transpiler is not None:
            circuits = self._transpiler.run(circuits, **self._transpiler_options)

        for circuit, parameter_values_, parameters_, shots_ in zip(
            circuits, parameter_values, parameters, shots
        ):
            metadata.append({"parameters": parameters_})
            # batch_size random Bernoulli +/-1 perturbation directions over ALL circuit
            # parameters (gradients for the non-requested ones are discarded below).
            offset = np.array(
                [
                    (-1) ** (self._seed.integers(0, 2, len(circuit.parameters)))
                    for _ in range(self._batch_size)
                ]
            )
            plus = [parameter_values_ + self._epsilon * offset_ for offset_ in offset]
            minus = [parameter_values_ - self._epsilon * offset_ for offset_ in offset]
            offsets.append(offset)

            # Combine inputs into a single job to reduce overhead:
            # batch_size "plus" evaluations followed by batch_size "minus" evaluations.
            n = 2 * self._batch_size
            all_n.append(n)
            pubs.append((circuit, plus + minus, shots_))

        # Run the single job with all circuits.
        job = self._sampler.run(pubs)
        try:
            results = job.result()
        except Exception as exc:
            raise AlgorithmError("Sampler job failed.") from exc

        # Compute the gradients.
        gradients = []
        for i, (n, result_n) in enumerate(zip(all_n, results)):
            dist_diffs = {}
            # Convert raw counts to quasi-probability distributions keyed by integer outcome.
            result = [
                {label: value / res.num_shots for label, value in res.get_int_counts().items()}
                for res in getattr(result_n.data, next(iter(result_n.data)))
            ]
            # Central difference of each plus/minus distribution pair.
            for j, (dist_plus, dist_minus) in enumerate(zip(result[: n // 2], result[n // 2 :])):
                dist_diff: dict[int, float] = defaultdict(float)
                for key, value in dist_plus.items():
                    dist_diff[key] += value / (2 * self._epsilon)
                for key, value in dist_minus.items():
                    dist_diff[key] -= value / (2 * self._epsilon)
                dist_diffs[j] = dist_diff
            gradient = []
            # Indices of the requested parameters within the (possibly transpiled) circuit.
            indices = [circuits[i].parameters.data.index(p) for p in metadata[i]["parameters"]]
            for j in indices:
                gradient_j: dict[int, float] = defaultdict(float)
                # Average the SPSA estimates over the batch, projected on direction j.
                for k in range(self._batch_size):
                    for key, value in dist_diffs[k].items():
                        gradient_j[key] += value * offsets[i][k][j]
                gradient_j = {key: value / self._batch_size for key, value in gradient_j.items()}
                gradient.append(gradient_j)
            gradients.append(gradient)

        # Report the shots actually used: restore the scalar form if the input was scalar/None,
        # otherwise fill in any None entries from the primitive's metadata (in place, matching
        # the original mutation semantics for sequence input).
        if has_transformed_shots:
            shots = shots[0]
            if shots is None:
                shots = results[0].metadata["shots"]
        else:
            for i, (shots_, result) in enumerate(zip(shots, results)):
                if shots_ is None:
                    shots[i] = result.metadata["shots"]

        return SamplerGradientResult(gradients=gradients, metadata=metadata, shots=shots)