Source code for qiskit_machine_learning.kernels.fidelity_quantum_kernel

# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2022, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Fidelity Quantum Kernel"""

from __future__ import annotations

from collections.abc import Sequence
from typing import List, Tuple

import numpy as np
from qiskit import QuantumCircuit
from qiskit.primitives import Sampler
from ..state_fidelities import BaseStateFidelity, ComputeUncompute

from .base_kernel import BaseKernel

KernelIndices = List[Tuple[int, int]]


class FidelityQuantumKernel(BaseKernel):
    r"""
    An implementation of the quantum kernel interface based on the
    :class:`~qiskit_machine_learning.state_fidelities.BaseStateFidelity` algorithm.

    Here, the kernel function is defined as the overlap of two quantum states defined by a
    parametrized quantum circuit (called feature map):

    .. math::

        K(x,y) = |\langle \phi(x) | \phi(y) \rangle|^2
    """

    def __init__(
        self,
        *,
        feature_map: QuantumCircuit | None = None,
        fidelity: BaseStateFidelity | None = None,
        enforce_psd: bool = True,
        evaluate_duplicates: str = "off_diagonal",
        max_circuits_per_job: int | None = None,
    ) -> None:
        """
        Args:
            feature_map: Parameterized circuit to be used as the feature map. If ``None`` is given,
                :class:`~qiskit.circuit.library.ZZFeatureMap` is used with two qubits. If there is
                a mismatch between the number of qubits of the feature map and the number of
                features in the dataset, the kernel will try to adjust the feature map to reflect
                the number of features.
            fidelity: An instance of the
                :class:`~qiskit_machine_learning.state_fidelities.BaseStateFidelity` primitive to
                be used to compute the fidelity between states. Default is
                :class:`~qiskit_machine_learning.state_fidelities.ComputeUncompute`, which is
                created on top of the reference sampler defined by
                :class:`~qiskit.primitives.Sampler`.
            enforce_psd: Project to the closest positive semidefinite matrix if ``x = y``.
                Default ``True``.
            evaluate_duplicates: Defines the strategy for evaluating kernel matrix elements when
                duplicate samples are found. Possible values are:

                    - ``all`` means that all kernel matrix elements are evaluated, even the
                      diagonal ones when training. This may introduce additional noise in the
                      matrix.
                    - ``off_diagonal`` means that when training, the matrix diagonal is set to `1`
                      and the remaining elements are fully evaluated, e.g., for two identical
                      samples in the dataset. When inferring, all elements are evaluated. This is
                      the default value.
                    - ``none`` means that when training, the diagonal is set to `1` and, if two
                      identical samples are found in the dataset, the corresponding matrix element
                      is set to `1`. When inferring, matrix elements for identical samples are set
                      to `1`.
            max_circuits_per_job: Maximum number of circuits per job for the backend. Please
                check the backend specifications. Use ``None`` for all entries per job.
                Default ``None``.

        Raises:
            ValueError: When an unsupported value is passed to `evaluate_duplicates`.
        """
        super().__init__(feature_map=feature_map, enforce_psd=enforce_psd)

        eval_duplicates = evaluate_duplicates.lower()
        if eval_duplicates not in ("all", "off_diagonal", "none"):
            raise ValueError(
                f"Unsupported value passed as evaluate_duplicates: {evaluate_duplicates}"
            )
        self._evaluate_duplicates = eval_duplicates

        if fidelity is None:
            fidelity = ComputeUncompute(sampler=Sampler())
        self._fidelity = fidelity

        if max_circuits_per_job is not None:
            if max_circuits_per_job < 1:
                raise ValueError(
                    f"Unsupported value passed as max_circuits_per_job: {max_circuits_per_job}"
                )
        self.max_circuits_per_job = max_circuits_per_job
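    # Descriptive note (added for illustration): the ``evaluate_duplicates`` setting controls
    # which kernel entries are actually sent to the fidelity primitive (see ``_is_trivial``
    # below). For example, with a training set ``[[a], [a], [b]]``:
    #   - "all": every pair, including the diagonal, is estimated;
    #   - "off_diagonal": the diagonal is fixed to 1, but the duplicate pair
    #     (row 0, column 1) is still estimated;
    #   - "none": the diagonal and the duplicate pair are all fixed to 1.
    # A minimal usage sketch is appended at the end of this module.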
    def evaluate(self, x_vec: np.ndarray, y_vec: np.ndarray | None = None) -> np.ndarray:
        x_vec, y_vec = self._validate_input(x_vec, y_vec)

        # determine if calculating self inner product
        is_symmetric = True
        if y_vec is None:
            y_vec = x_vec
        elif not np.array_equal(x_vec, y_vec):
            is_symmetric = False

        kernel_shape = (x_vec.shape[0], y_vec.shape[0])

        if is_symmetric:
            left_parameters, right_parameters, indices = self._get_symmetric_parameterization(x_vec)
            kernel_matrix = self._get_symmetric_kernel_matrix(
                kernel_shape, left_parameters, right_parameters, indices
            )
        else:
            left_parameters, right_parameters, indices = self._get_parameterization(x_vec, y_vec)
            kernel_matrix = self._get_kernel_matrix(
                kernel_shape, left_parameters, right_parameters, indices
            )

        if is_symmetric and self._enforce_psd:
            kernel_matrix = self._make_psd(kernel_matrix)

        return kernel_matrix
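    # Descriptive note (added for illustration): for a symmetric evaluation of n samples,
    # only pairs (i, j) with j >= i are parameterized, i.e. at most n * (n + 1) / 2 fidelity
    # circuits; a rectangular evaluation of n-by-m samples requires up to n * m circuits.
    # Trivial entries (see ``evaluate_duplicates``) are skipped in both cases.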
    def _get_parameterization(
        self, x_vec: np.ndarray, y_vec: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, KernelIndices]:
        """
        Combines x_vec and y_vec to get all the combinations needed to evaluate the kernel entries.
        """
        num_features = x_vec.shape[1]
        left_parameters = np.zeros((0, num_features))
        right_parameters = np.zeros((0, num_features))

        indices = np.asarray(
            [
                (i, j)
                for i, x_i in enumerate(x_vec)
                for j, y_j in enumerate(y_vec)
                if not self._is_trivial(i, j, x_i, y_j, False)
            ]
        )

        if indices.size > 0:
            left_parameters = x_vec[indices[:, 0]]
            right_parameters = y_vec[indices[:, 1]]

        return left_parameters, right_parameters, indices.tolist()

    def _get_symmetric_parameterization(
        self, x_vec: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, KernelIndices]:
        """
        Combines two copies of x_vec to get all the combinations needed to evaluate the kernel
        entries.
        """
        num_features = x_vec.shape[1]
        left_parameters = np.zeros((0, num_features))
        right_parameters = np.zeros((0, num_features))

        indices = np.asarray(
            [
                (i, i + j)
                for i, x_i in enumerate(x_vec)
                for j, x_j in enumerate(x_vec[i:])
                if not self._is_trivial(i, i + j, x_i, x_j, True)
            ]
        )

        if indices.size > 0:
            left_parameters = x_vec[indices[:, 0]]
            right_parameters = x_vec[indices[:, 1]]

        return left_parameters, right_parameters, indices.tolist()

    def _get_kernel_matrix(
        self,
        kernel_shape: tuple[int, int],
        left_parameters: np.ndarray,
        right_parameters: np.ndarray,
        indices: KernelIndices,
    ) -> np.ndarray:
        """
        Given a parameterization, this computes the kernel matrix.
        """
        kernel_entries = self._get_kernel_entries(left_parameters, right_parameters)

        # fill in trivial entries and then update with fidelity values
        kernel_matrix = np.ones(kernel_shape)

        for i, (col, row) in enumerate(indices):
            kernel_matrix[col, row] = kernel_entries[i]

        return kernel_matrix

    def _get_symmetric_kernel_matrix(
        self,
        kernel_shape: tuple[int, int],
        left_parameters: np.ndarray,
        right_parameters: np.ndarray,
        indices: KernelIndices,
    ) -> np.ndarray:
        """
        Given a parameterization, this computes the symmetric kernel matrix.
        """
        kernel_entries = self._get_kernel_entries(left_parameters, right_parameters)
        kernel_matrix = np.ones(kernel_shape)

        for i, (col, row) in enumerate(indices):
            kernel_matrix[col, row] = kernel_entries[i]
            kernel_matrix[row, col] = kernel_entries[i]

        return kernel_matrix

    def _get_kernel_entries(
        self, left_parameters: np.ndarray, right_parameters: np.ndarray
    ) -> Sequence[float]:
        """
        Gets kernel entries by executing the underlying fidelity instance and getting the results
        back from the async job.
""" num_circuits = left_parameters.shape[0] kernel_entries = [] # Check if it is trivial case, only identical samples if num_circuits != 0: if self.max_circuits_per_job is None: job = self._fidelity.run( [self._feature_map] * num_circuits, [self._feature_map] * num_circuits, left_parameters, # type: ignore[arg-type] right_parameters, # type: ignore[arg-type] ) kernel_entries = job.result().fidelities else: # Determine the number of chunks needed num_chunks = ( num_circuits + self.max_circuits_per_job - 1 ) // self.max_circuits_per_job for i in range(num_chunks): # Determine the range of indices for this chunk start_idx = i * self.max_circuits_per_job end_idx = min((i + 1) * self.max_circuits_per_job, num_circuits) # Extract the parameters for this chunk chunk_left_parameters = left_parameters[start_idx:end_idx] chunk_right_parameters = right_parameters[start_idx:end_idx] # Execute this chunk job = self._fidelity.run( [self._feature_map] * (end_idx - start_idx), [self._feature_map] * (end_idx - start_idx), chunk_left_parameters, # type: ignore[arg-type] chunk_right_parameters, # type: ignore[arg-type] ) # Extend the kernel_entries list with the results from this chunk kernel_entries.extend(job.result().fidelities) return kernel_entries # pylint: disable=too-many-positional-arguments def _is_trivial( self, i: int, j: int, x_i: np.ndarray, y_j: np.ndarray, symmetric: bool ) -> bool: """ Verifies if the kernel entry is trivial (to be set to `1.0`) or not. Args: i: row index of the entry in the kernel matrix. j: column index of the entry in the kernel matrix. x_i: a sample from the dataset that corresponds to the row in the kernel matrix. y_j: a sample from the dataset that corresponds to the column in the kernel matrix. symmetric: whether it is a symmetric case or not. Returns: `True` if the entry is trivial, `False` otherwise. """ # if we evaluate all combinations, then it is non-trivial if self._evaluate_duplicates == "all": return False # if we are on the diagonal and we don't evaluate it, it is trivial if symmetric and i == j and self._evaluate_duplicates == "off_diagonal": return True # if don't evaluate any duplicates if np.array_equal(x_i, y_j) and self._evaluate_duplicates == "none": return True # otherwise evaluate return False @property def fidelity(self): """Returns the fidelity primitive used by this kernel.""" return self._fidelity @property def evaluate_duplicates(self): """Returns the strategy used by this kernel to evaluate kernel matrix elements if duplicate samples are found.""" return self._evaluate_duplicates