Source code for qiskit_aqt_provider.aqt_job

# This code is part of Qiskit.
#
# (C) Copyright IBM 2019, Alpine Quantum Technologies GmbH 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

import uuid
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    NoReturn,
    Optional,
    Union,
)

import numpy as np
from qiskit import QuantumCircuit
from qiskit.providers import JobV1
from qiskit.providers.jobstatus import JobStatus
from qiskit.result.result import Result
from qiskit.utils.lazy_tester import contextlib
from tqdm import tqdm
from typing_extensions import Self, TypeAlias, assert_never

from qiskit_aqt_provider import api_models_generated, persistence
from qiskit_aqt_provider.api_models_direct import JobResultError
from qiskit_aqt_provider.aqt_options import AQTOptions
from qiskit_aqt_provider.circuit_to_aqt import circuits_to_aqt_job

if TYPE_CHECKING:  # pragma: no cover
    from qiskit_aqt_provider.aqt_resource import AQTDirectAccessResource, AQTResource


# Tags for the status of AQT API jobs


@dataclass
class JobFinished:
    """The job finished successfully."""

    status: ClassVar = JobStatus.DONE
    results: dict[int, list[list[int]]]


@dataclass
class JobFailed:
    """An error occurred during the job execution."""

    status: ClassVar = JobStatus.ERROR
    error: str


class JobQueued:
    """The job is queued."""

    status: ClassVar = JobStatus.QUEUED


@dataclass
class JobOngoing:
    """The job is running."""

    status: ClassVar = JobStatus.RUNNING
    finished_count: int


class JobCancelled:
    """The job was cancelled."""

    status = ClassVar = JobStatus.CANCELLED


JobStatusPayload: TypeAlias = Union[JobQueued, JobOngoing, JobFinished, JobFailed, JobCancelled]


[docs] @dataclass(frozen=True) class Progress: """Progress information of a job.""" finished_count: int """Number of completed circuits.""" total_count: int """Total number of circuits in the job."""
@dataclass class _MockProgressBar: """Minimal tqdm-compatible progress bar mock.""" total: int """Total number of items in the job.""" n: int = 0 """Number of processed items.""" def update(self, n: int = 1) -> None: """Update the number of processed items by `n`.""" self.n += n def __enter__(self) -> Self: return self def __exit__( self, exc_type: Optional[type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], /, ) -> None: ...
[docs] class AQTJob(JobV1): """Handle for quantum circuits jobs running on AQT cloud backends. Jobs contain one or more quantum circuits that are executed with a common set of options (see :class:`AQTOptions <qiskit_aqt_provider.aqt_options.AQTOptions>`). Job handles should be retrieved from calls to :meth:`AQTResource.run <qiskit_aqt_provider.aqt_resource.AQTResource.run>`, which immediately returns after submitting the job. The :meth:`result` method allows blocking until a job completes: >>> import qiskit >>> from qiskit.providers import JobStatus >>> from qiskit_aqt_provider import AQTProvider >>> >>> backend = AQTProvider("").get_backend("offline_simulator_no_noise") >>> >>> qc = qiskit.QuantumCircuit(1) >>> _ = qc.rx(3.14, 0) >>> _ = qc.measure_all() >>> qc = qiskit.transpile(qc, backend) >>> >>> job = backend.run(qc, shots=100) >>> result = job.result() >>> job.status() is JobStatus.DONE True >>> result.success True >>> result.get_counts() {'1': 100} """ _backend: "AQTResource" def __init__( self, backend: "AQTResource", circuits: list[QuantumCircuit], options: AQTOptions, ) -> None: """Initialize an :class:`AQTJob` instance. .. tip:: :class:`AQTJob` instances should not be created directly. Use :meth:`AQTResource.run <qiskit_aqt_provider.aqt_resource.AQTResource.run>` to submit circuits for execution and retrieve a job handle. Args: backend: backend to run the job on. circuits: list of circuits to execute. options: overridden resource options for this job. """ super().__init__(backend, "") self.circuits = circuits self.options = options self.api_submit_payload = circuits_to_aqt_job(circuits, options.shots) self.status_payload: JobStatusPayload = JobQueued()
[docs] @classmethod def restore( cls, job_id: str, *, access_token: Optional[str] = None, store_path: Optional[Path] = None, remove_from_store: bool = True, ) -> Self: """Restore a job handle from local persistent storage. .. warning:: The default local storage path depends on the `qiskit_aqt_provider` package version. Job persisted with a different package version will therefore **not** be found! .. hint:: If the job's execution backend is an offline simulator, the job is re-submitted to the simulation backend and the new job ID differs from the one passed to this function. Args: job_id: identifier of the job to retrieve. access_token: access token for the AQT cloud. See :class:`AQTProvider <qiskit_aqt_provider.aqt_provider.AQTProvider>`. store_path: local persistent storage directory. By default, use a standard cache directory. remove_from_store: if :data:`True`, remove the retrieved job's data from persistent storage after a successful load. Returns: A job handle for the passed `job_id`. Raises: JobNotFoundError: the target job was not found in persistent storage. """ from qiskit_aqt_provider.aqt_provider import AQTProvider from qiskit_aqt_provider.aqt_resource import AQTResource, OfflineSimulatorResource store_path = persistence.get_store_path(store_path) data = persistence.Job.restore(job_id, store_path) # TODO: forward .env loading args? provider = AQTProvider(access_token) if data.resource.resource_type == "offline_simulator": # FIXME: persist with_noise_model and restore it resource = OfflineSimulatorResource(provider, data.resource, with_noise_model=False) else: resource = AQTResource(provider, data.resource) obj = cls(backend=resource, circuits=data.circuits.circuits, options=data.options) if data.resource.resource_type == "offline_simulator": # re-submit the job because we can't restore the backend state obj.submit() else: obj._job_id = job_id if remove_from_store: persistence.Job.remove_from_store(job_id, store_path) return obj
[docs] def persist(self, *, store_path: Optional[Path] = None) -> Path: """Save this job to local persistent storage. .. warning:: Only jobs that have been submitted for execution can be persisted (a valid `job_id` is required). Args: store_path: local persistent storage directory. By default, use a standard cache directory. Returns: The path to the job data in local persistent storage. Raises: RuntimeError: the job was never submitted for execution. """ if not self.job_id(): raise RuntimeError("Can only persist submitted jobs.") store_path = persistence.get_store_path(store_path) data = persistence.Job( resource=self._backend.resource_id, circuits=persistence.Circuits(self.circuits), options=self.options, ) return data.persist(self.job_id(), store_path)
[docs] def submit(self) -> None: """Submit this job for execution. This operation is not blocking. Use :meth:`result()` to block until the job completes. Raises: RuntimeError: this job was already submitted. """ if self.job_id(): raise RuntimeError(f"Job already submitted (ID: {self.job_id()})") job_id = self._backend.submit(self) self._job_id = str(job_id)
[docs] def status(self) -> JobStatus: """Query the job's status. Returns: Aggregated job status for all the circuits in this job. """ payload = self._backend.result(uuid.UUID(self.job_id())) if isinstance(payload, api_models_generated.JobResponseRRQueued): self.status_payload = JobQueued() elif isinstance(payload, api_models_generated.JobResponseRROngoing): self.status_payload = JobOngoing(finished_count=payload.response.finished_count) elif isinstance(payload, api_models_generated.JobResponseRRFinished): self.status_payload = JobFinished( results={ int(circuit_index): [[sample.root for sample in shot] for shot in shots] for circuit_index, shots in payload.response.result.items() } ) elif isinstance(payload, api_models_generated.JobResponseRRError): self.status_payload = JobFailed(error=payload.response.message) elif isinstance(payload, api_models_generated.JobResponseRRCancelled): self.status_payload = JobCancelled() else: # pragma: no cover assert_never(payload) return self.status_payload.status
[docs] def progress(self) -> Progress: """Progress information for this job.""" num_circuits = len(self.circuits) if isinstance(self.status_payload, JobQueued): return Progress(finished_count=0, total_count=num_circuits) if isinstance(self.status_payload, JobOngoing): return Progress( finished_count=self.status_payload.finished_count, total_count=num_circuits ) # if the circuit is finished, failed, or cancelled, it is completed return Progress(finished_count=num_circuits, total_count=num_circuits)
@property def error_message(self) -> Optional[str]: """Error message for this job (if any).""" if isinstance(self.status_payload, JobFailed): return self.status_payload.error return None
[docs] def result(self) -> Result: """Block until all circuits have been evaluated and return the combined result. Success or error is signalled by the `success` field in the returned Result instance. Returns: The combined result of all circuit evaluations. """ if self.options.with_progress_bar: context: Union[tqdm[NoReturn], _MockProgressBar] = tqdm(total=len(self.circuits)) else: context = _MockProgressBar(total=len(self.circuits)) with context as progress_bar: def callback( job_id: str, # noqa: ARG001 status: JobStatus, # noqa: ARG001 job: AQTJob, ) -> None: progress = job.progress() progress_bar.update(progress.finished_count - progress_bar.n) # one of DONE, CANCELLED, ERROR self.wait_for_final_state( timeout=self.options.query_timeout_seconds, wait=self.options.query_period_seconds, callback=callback, ) # make sure the progress bar completes progress_bar.update(self.progress().finished_count - progress_bar.n) results = [] if isinstance(self.status_payload, JobFinished): for circuit_index, circuit in enumerate(self.circuits): samples = self.status_payload.results[circuit_index] results.append( _partial_qiskit_result_dict( samples, circuit, shots=self.options.shots, memory=self.options.memory ) ) return Result.from_dict( { "backend_name": self._backend.name, "backend_version": self._backend.version, "qobj_id": id(self.circuits), "job_id": self.job_id(), "success": self.status_payload.status is JobStatus.DONE, "results": results, # Pass error message as metadata "error": self.error_message, } )
[docs] class AQTDirectAccessJob(JobV1): """Handle for quantum circuits jobs running on direct-access AQT backends. Use :meth:`AQTDirectAccessResource.run <qiskit_aqt_provider.aqt_resource.AQTDirectAccessResource.run>` to get a handle and evaluate circuits on a direct-access backend. """ _backend: "AQTDirectAccessResource" def __init__( self, backend: "AQTDirectAccessResource", circuits: list[QuantumCircuit], options: AQTOptions, ) -> None: """Initialize the :class:`AQTDirectAccessJob` instance. Args: backend: backend to run the job on. circuits: list of circuits to execute. options: overridden resource options for this job. """ super().__init__(backend, "") self.circuits = circuits self.options = options self.api_submit_payload = circuits_to_aqt_job(circuits, options.shots) self._job_id = uuid.uuid4() self._status = JobStatus.INITIALIZING def submit(self) -> None: """No-op on direct-access backends."""
[docs] def result(self) -> Result: """Iteratively submit all circuits and block until full completion. If an error occurs, the remaining circuits are not executed and the whole job is marked as failed. Returns: The combined result of all circuit evaluations. """ if self.options.with_progress_bar: context: Union[tqdm[NoReturn], _MockProgressBar] = tqdm(total=len(self.circuits)) else: context = _MockProgressBar(total=len(self.circuits)) result = { "backend_name": self._backend.name, "backend_version": self._backend.version, "qobj_id": id(self.circuits), "job_id": self.job_id(), "success": True, "results": [], } with context as progress_bar: for circuit_index, circuit in enumerate(self.circuits): api_circuit = self.api_submit_payload.payload.circuits[circuit_index] job_id = self._backend.submit(api_circuit) api_result = self._backend.result( job_id, timeout=self.options.query_timeout_seconds ) if isinstance(api_result.payload, JobResultError): break result["results"].append( _partial_qiskit_result_dict( api_result.payload.result, circuit, shots=self.options.shots, memory=self.options.memory, ) ) progress_bar.update(1) else: # no circuits in the job, or all executed successfully self._status = JobStatus.DONE return Result.from_dict(result) self._status = JobStatus.ERROR result["success"] = False return Result.from_dict(result)
[docs] def status(self) -> JobStatus: """Query the job's status. Returns: Aggregated job status for all the circuits in this job. """ return self._status
def _partial_qiskit_result_dict( samples: list[list[int]], circuit: QuantumCircuit, *, shots: int, memory: bool ) -> dict[str, Any]: """Build the Qiskit result dict for a single circuit evaluation. Args: samples: measurement outcome of the circuit evaluation. circuit: the evaluated circuit. shots: number of repetitions of the circuit evaluation. memory: whether to fill the classical memory dump field with the measurement results. Returns: Dict, suitable for Qiskit's `Result.from_dict` factory. """ meas_map = _build_memory_mapping(circuit) data: dict[str, Any] = {"counts": _format_counts(samples, meas_map)} if memory: data["memory"] = ["".join(str(x) for x in reversed(states)) for states in samples] return { "shots": shots, "success": True, "status": JobStatus.DONE, "data": data, "header": { "memory_slots": circuit.num_clbits, "creg_sizes": [[reg.name, reg.size] for reg in circuit.cregs], "qreg_sizes": [[reg.name, reg.size] for reg in circuit.qregs], "name": circuit.name, "metadata": circuit.metadata or {}, }, } def _build_memory_mapping(circuit: QuantumCircuit) -> dict[int, set[int]]: """Scan the circuit for measurement instructions and collect qubit to classical bits mappings. Qubits can be mapped to multiple classical bits, possibly in different classical registers. The returned map only maps qubits referenced in a `measure` operation in the passed circuit. Qubits not targeted by a `measure` operation will not appear in the returned result. Parameters: circuit: the `QuantumCircuit` to analyze. Returns: the translation map for all measurement operations in the circuit. Examples: >>> qc = QuantumCircuit(2) >>> qc.measure_all() >>> _build_memory_mapping(qc) {0: {0}, 1: {1}} >>> qc = QuantumCircuit(2, 2) >>> _ = qc.measure([0, 1], [1, 0]) >>> _build_memory_mapping(qc) {0: {1}, 1: {0}} >>> qc = QuantumCircuit(3, 2) >>> _ = qc.measure([0, 1], [0, 1]) >>> _build_memory_mapping(qc) {0: {0}, 1: {1}} >>> qc = QuantumCircuit(4, 6) >>> _ = qc.measure([0, 1, 2, 3], [2, 3, 4, 5]) >>> _build_memory_mapping(qc) {0: {2}, 1: {3}, 2: {4}, 3: {5}} >>> qc = QuantumCircuit(3, 4) >>> qc.measure_all(add_bits=False) >>> _build_memory_mapping(qc) {0: {0}, 1: {1}, 2: {2}} >>> qc = QuantumCircuit(3, 3) >>> _ = qc.x(0) >>> _ = qc.measure([0], [2]) >>> _ = qc.y(1) >>> _ = qc.measure([1], [1]) >>> _ = qc.x(2) >>> _ = qc.measure([2], [0]) >>> _build_memory_mapping(qc) {0: {2}, 1: {1}, 2: {0}} 5 qubits in two registers: >>> from qiskit import QuantumRegister, ClassicalRegister >>> qr0 = QuantumRegister(2) >>> qr1 = QuantumRegister(3) >>> cr = ClassicalRegister(2) >>> qc = QuantumCircuit(qr0, qr1, cr) >>> _ = qc.measure(qr0, cr) >>> _build_memory_mapping(qc) {0: {0}, 1: {1}} Multiple mapping of a qubit: >>> qc = QuantumCircuit(3, 3) >>> _ = qc.measure([0, 1], [0, 1]) >>> _ = qc.measure([0], [2]) >>> _build_memory_mapping(qc) {0: {0, 2}, 1: {1}} """ qu2cl: defaultdict[int, set[int]] = defaultdict(set) for instruction in circuit.data: if instruction.operation.name == "measure": for qubit, clbit in zip(instruction.qubits, instruction.clbits): qu2cl[circuit.find_bit(qubit).index].add(circuit.find_bit(clbit).index) return dict(qu2cl) def _shot_to_int( fluorescence_states: list[int], qubit_to_bit: Optional[dict[int, set[int]]] = None ) -> int: """Format the detected fluorescence states from a single shot as an integer. This follows the Qiskit ordering convention, where bit 0 in the classical register is mapped to bit 0 in the returned integer. The first classical register in the original circuit represents the least-significant bits in the integer representation. An optional translation map from the quantum to the classical register can be applied. If given, only the qubits registered in the translation map are present in the return value, at the index given by the translation map. Parameters: fluorescence_states: detected fluorescence states for this shot qubit_to_bit: optional translation map from quantum register to classical register positions Returns: integral representation of the shot result, with the translation map applied. Examples: Without a translation map, the natural mapping is used (n -> n): >>> _shot_to_int([1]) 1 >>> _shot_to_int([0, 0, 1]) 4 >>> _shot_to_int([0, 1, 1]) 6 Swap qubits 1 and 2 in the classical register: >>> _shot_to_int([1, 0, 1], {0: {0}, 1: {2}, 2: {1}}) 3 If the map is partial, only the mapped qubits are present in the output: >>> _shot_to_int([1, 0, 1], {1: {2}, 2: {1}}) 2 One can translate into a classical register larger than the qubit register. Warning: the classical register is always initialized to 0. >>> _shot_to_int([1], {0: {1}}) 2 >>> _shot_to_int([0, 1, 1], {0: {3}, 1: {4}, 2: {5}}) == (0b110 << 3) True or with a map larger than the qubit space: >>> _shot_to_int([1], {0: {0}, 1: {1}}) 1 Consider the typical example of two quantum registers (the second one contains ancilla qubits) and one classical register: >>> from qiskit import QuantumRegister, ClassicalRegister >>> qr_meas = QuantumRegister(2) >>> qr_ancilla = QuantumRegister(3) >>> cr = ClassicalRegister(2) >>> qc = QuantumCircuit(qr_meas, qr_ancilla, cr) >>> _ = qc.measure(qr_meas, cr) >>> tr_map = _build_memory_mapping(qc) We assume that a single shot gave the result: >>> ancillas = [1, 1, 0] >>> meas = [1, 0] Then the corresponding output is 0b01 (measurement qubits mapped straight to the classical register of length 2): >>> _shot_to_int(meas + ancillas, tr_map) == 0b01 True One can overwrite qr_meas[1] with qr_ancilla[0]: >>> _ = qc.measure(qr_ancilla[0], cr[1]) >>> tr_map = _build_memory_mapping(qc) >>> _shot_to_int(meas + ancillas, tr_map) == 0b11 True """ tr_map = qubit_to_bit or {} if tr_map: # allocate a zero-initialized classical register # TODO: support pre-initialized classical registers clbits = max(max(d) for d in tr_map.values()) + 1 creg = [0] * clbits for src_index, dest_indices in tr_map.items(): # the translation map could map more than just the measured qubits with contextlib.suppress(IndexError): for dest_index in dest_indices: creg[dest_index] = fluorescence_states[src_index] else: creg = fluorescence_states.copy() return int((np.left_shift(1, np.arange(len(creg))) * creg).sum()) def _format_counts( samples: list[list[int]], qubit_to_bit: Optional[dict[int, set[int]]] = None ) -> dict[str, int]: """Format all shots results from a circuit evaluation. The returned dictionary is compatible with Qiskit's `ExperimentResultData` `counts` field. Keys are hexadecimal string representations of the detected states, with the optional `QuantumRegister` to `ClassicalRegister` applied. Values are the occurrences of the keys. Parameters: samples: detected qubit fluorescence states for all shots qubit_to_bit: optional quantum to classical register translation map Returns: collected counts, for `ExperimentResultData`. Examples: >>> _format_counts([[1, 0, 0], [0, 1, 0], [1, 0, 0]]) {'0x1': 2, '0x2': 1} >>> _format_counts([[1, 0, 0], [0, 1, 0], [1, 0, 0]], {0: {2}, 1: {1}, 2: {0}}) {'0x4': 2, '0x2': 1} """ return dict(Counter(hex(_shot_to_int(shot, qubit_to_bit)) for shot in samples))