# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2018.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
# Copyright 2020 IonQ, Inc. (www.ionq.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""IonQ's Job implementation.
.. NOTE::
IonQ job status names are slightly different than those of the standard
:class:`JobStatus <qiskit.providers.JobStatus>` enum values.
As such, the :meth:`IonQJob.status` method on the IonQJob class attempts to
perform a mapping between these status values for compatibility with
:class:`BaseJob <qiskit.providers.BaseJob>`.
"""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Any, Optional, Callable
import numpy as np
from qiskit import QuantumCircuit
from qiskit.providers import JobV1, jobstatus
from qiskit.providers.exceptions import JobTimeoutError
from .ionq_result import IonQResult as Result
from .helpers import decompress_metadata_string, normalize
from . import constants, exceptions
if TYPE_CHECKING: # pragma: no cover
from . import ionq_backend
from . import ionq_client
def map_output(data, clbits, num_qubits):
"""Map histogram according to measured bits."""
if not clbits:
return {}
mapped_output = {}
def get_bitvalue(bitstring, bit):
if bit is not None and 0 <= bit < len(bitstring):
return bitstring[bit]
return "0"
for value, probability in data.items():
bitstring = bin(int(value))[2:].rjust(num_qubits, "0")[::-1]
outvalue = int("".join(get_bitvalue(bitstring, bit) for bit in clbits)[::-1], 2)
mapped_output[outvalue] = mapped_output.get(outvalue, 0) + probability
return mapped_output
def _build_counts( # pylint: disable=too-many-positional-arguments
data,
num_qubits: int,
clbits: list[int],
shots: int,
use_sampler: bool = False,
sampler_seed: int | None = None,
) -> tuple[dict[str, int], dict[str, float]]:
"""Map IonQ's ``counts`` onto qiskit's ``counts`` model.
.. NOTE:: For simulator jobs, this method builds counts using a randomly
generated sampling of the probabilities returned from the API. Because
this is a random process, rebuilding the results object (by e.g.
restarting the kernel and getting the job again) without providing a
sampler_seed in the run method may result in slightly different counts.
Args:
data (dict): histogram as returned by the API.
num_qubits (int): number of qubits
clbits (List[int]): array of classical bits for measurements
shots (int): number of shots
use_sampler (bool): for counts generation, whether to use
simple shots * probabilities (for qpu) or a sampler (for simulator)
sampler_seed (int): ability to provide a seed for the randomness in the
sampler for repeatable results. passed as
`np.random.RandomState(seed)`. If none, `np.random` is used
Returns:
tuple(dict[str, float], dict[str, float]): A tuple (counts, probabilities),
respectively a dict of qiskit compatible ``counts`` and a dict of
the job's probabilities as a``Counts`` object, mostly relevant for
simulator work.
Raises:
IonQJobError: In the event that ``result`` has missing or invalid job
properties.
"""
# Short circuit when we don't have all the information we need.
if not data:
raise exceptions.IonQJobError("Cannot remap counts without data!")
# Grab the mapped output from response.
output_probs = map_output(data, clbits, num_qubits)
sampled = {}
if use_sampler:
rand = np.random.RandomState(sampler_seed)
outcomes, weights = zip(*output_probs.items())
sample_counts = np.bincount(
rand.choice(len(outcomes), shots, p=normalize(weights)),
minlength=len(outcomes),
)
sampled = dict(zip(outcomes, sample_counts))
# Build counts and probabilities
counts = {}
probabilities = {}
for key_int, prob in output_probs.items():
bitstr = bin(int(key_int))[2:].rjust(
len(clbits) if clbits else num_qubits, "0"
) # e.g. '101'
cnt = sampled.get(key_int, round(prob * shots))
if cnt: # ignore zero bins
counts[bitstr] = int(cnt)
probabilities[bitstr] = float(prob)
return counts, probabilities
[docs]
class IonQJob(JobV1):
"""Representation of a Job that will run on an IonQ backend.
It is not recommended to create Job instances directly; use
:meth:`IonQBackend.run` and :meth:`IonQBackend.retrieve_job` instead.
Attributes:
circuit(:mod:`QuantumCircuit <qiskit.QuantumCircuit>`): A possibly ``None``
Qiskit quantum circuit.
_result(:class:`Result <qiskit.result.Result>`):
The actual Qiskit Result of this job when done.
"""
def __init__(
self,
backend: ionq_backend.IonQBackend,
job_id: Optional[str] = None,
client: Optional[ionq_client.IonQClient] = None,
circuit: Optional[QuantumCircuit] = None,
passed_args: Optional[dict] = None,
): # pylint: disable=too-many-positional-arguments
assert (
job_id is not None or circuit is not None
), "Job must have a job_id or circuit"
super().__init__(
backend=backend, job_id=job_id if job_id else ""
) # TODO improve handling of None job_id
self._client = client or backend.client
self._result = None
self._status = None
self._execution_time = None
self._metadata: dict[str, Any] = {}
if passed_args is not None:
self.extra_query_params = passed_args.pop("extra_query_params", {})
self.extra_metadata = passed_args.pop("extra_metadata", {})
self._passed_args = passed_args
else:
self.extra_query_params = {}
self.extra_metadata = {}
self._passed_args = {"shots": 1024, "sampler_seed": None}
# Support single or list-of-circuits submissions
if circuit is not None:
self.circuit = circuit
self._status = jobstatus.JobStatus.INITIALIZING
else: # retrieve existing job
self.circuit = None
self._status = jobstatus.JobStatus.INITIALIZING
self._job_id = job_id
self.status()
@staticmethod
def _first_of(mapping: dict, *keys, default=None):
"""Return the first present key in `keys` or `default`."""
for k in keys:
if k in mapping and mapping[k] is not None:
return mapping[k]
return default
[docs]
def cancel(self) -> None:
"""Cancel this job."""
assert self._job_id is not None, "Cannot cancel a job without a job_id."
self._client.cancel_job(self._job_id)
[docs]
def submit(self) -> None:
"""Submit a job to the IonQ API.
Raises:
IonQJobError: If this instance's :attr:`qobj` was `None`.
"""
if self.circuit is None:
raise exceptions.IonQJobError(
"Cannot submit a job without a circuit. "
"Please create a job with a circuit and try again."
)
response = self._client.submit_job(job=self)
self._job_id = response["id"]
[docs]
def get_counts(self, circuit: Optional[QuantumCircuit] = None) -> dict:
"""Return the counts for the job.
.. ATTENTION::
Result counts for jobs processed by
:class:`IonQSimulatorBackend <qiskit_ionq.ionq_backend.IonQSimulatorBackend>`
are returned from the API as probabilities, and are converted to counts via
simple statistical sampling that occurs on the cient side.
To obtain the true probabilities, use the get_probabilties() method instead.
Args:
circuit (str or QuantumCircuit or int or None): Optional.
The index of the experiment.
Returns:
dict: A dictionary of counts.
"""
return self.result().get_counts(circuit)
[docs]
def get_probabilities(self, circuit=None): # pylint: disable=unused-argument
"""Return the probabilities (for simulators).
This is effectively a pass-through to
:meth:`get_probabilities <qiskit_ionq.ionq_result.IonQResult.get_probabilities>`
Args:
circuit (str or QuantumCircuit or int or None): Optional.
Returns:
tuple(dict[str, float], dict[str, float]): A tuple counts, probabilities.
"""
return self.result().get_probabilities()
[docs]
def result(
self,
sharpen: bool | None = None,
timeout: float | None = None,
wait: float = 5,
callback: Callable | None = None,
extra_query_params: dict | None = None,
): # pylint: disable=too-many-positional-arguments
"""Retrieve job result data, blocking until the job is complete.
.. NOTE::
:attr:`_result` is populated by :meth:`status`, when the job
status has reached a "final" state.
This method calls the
:meth:`wait_for_final_state <qiskit.providers.BaseJob.wait_for_final_state>`
method to poll for a completed job.
Raises:
IonQJobTimeoutError: If after the default wait period in
:meth:`wait_for_final_state <qiskit.providers.BaseJob.wait_for_final_state>`
elapses and the job has not reached a "final" state.
IonQJobError: If the job has reached a final state but
the job itself was never converted to a
:class:`Result <qiskit.result.Result>`.
IonQJobStateError: If the job was cancelled before this method fetches it.
Returns:
Result: A Qiskit :class:`Result <qiskit.result.Result>` representation of this job.
"""
# Validate args
if sharpen is not None and not isinstance(sharpen, bool):
warnings.warn("Invalid sharpen type")
# Wait for the job to complete.
try:
self.wait_for_final_state(timeout=timeout, wait=wait, callback=callback)
except JobTimeoutError as ex:
raise exceptions.IonQJobTimeoutError(
"Timed out waiting for job to complete."
) from ex
if self._status is jobstatus.JobStatus.CANCELLED:
assert self._job_id is not None
raise exceptions.IonQJobStateError(
f"Cannot retrieve result for canceled job {self._job_id}"
)
if self._status is jobstatus.JobStatus.DONE:
assert self._job_id is not None
response = self._client.get_results(
results_url=self._results_url,
sharpen=sharpen,
extra_query_params=extra_query_params,
)
self._result = self._format_result(response)
return self._result
[docs]
def status(self, detailed: bool = False) -> jobstatus.JobStatus | dict:
"""Retrieve the status of a job.
Args:
detailed (bool): If True, returns a detailed status of children.
Returns:
JobStatus or dict: An enum value from Qiskit's
:class:`JobStatus <qiskit.providers.JobStatus>` if detailed is False.
A dictionary containing the detailed status of the children if detailed is True.
Raises:
IonQJobError: If the IonQ job status was unknown or otherwise
unmappable to a qiskit job status.
IonQJobFailureError: If the job fails
IonQJobStateError: If the job was cancelled
"""
# Return early if we have no job id yet.
if not self._job_id:
return self._status
# Return early if the job is already final.
if self._status in jobstatus.JOB_FINAL_STATES:
return self._children_status() if detailed else self._status
# Otherwise, look up a status enum from the response.
response = self._client.retrieve_job(self._job_id)
api_response_status = response.get("status")
try:
status_enum = constants.APIJobStatus(api_response_status)
mapped_status = constants.JobStatusMap[status_enum.name]
self._status = jobstatus.JobStatus[mapped_status.value]
except (ValueError, KeyError) as ex:
raise exceptions.IonQJobError(
f"Unknown or unmappable job status {api_response_status}"
) from ex
if self._status in jobstatus.JOB_FINAL_STATES:
self._save_metadata(response)
if self._status == jobstatus.JobStatus.DONE:
stats = response.get("stats", {})
self._children = self._first_of(
response, "child_job_ids", "children", default=None
)
# Circuit count: if we have children, prefer that length
if self._children:
self._num_circuits = len(self._children)
else:
self._num_circuits = self._first_of(stats, "circuits", default=1)
self._num_qubits = self._first_of(stats, "qubits", default=0)
_results_url = self._first_of(
response, "results", "results_url", default={}
)
self._results_url = (
_results_url
if isinstance(_results_url, str)
else _results_url.get("probabilities", {}).get("url")
)
# Classical-bit maps per circuit
def _meas_map_from_header(header_dict, fallback_nq):
"""Return meas_mapped list or a default 0-based map."""
mmap = header_dict.get("meas_mapped")
if mmap is None or (
isinstance(mmap, list) and all(b is None for b in mmap)
):
return list(range(header_dict.get("n_qubits", fallback_nq)))
return mmap
# Classical-bit maps for every circuit
header_list = decompress_metadata_string(
response.get("metadata", {}).get("qiskit_header")
)
if not isinstance(header_list, list):
header_list = [header_list]
self._clbits = [
_meas_map_from_header(h, self._num_qubits) for h in header_list
]
# Ensure one map per circuit
if len(self._clbits) == 1 and self._num_circuits > 1:
self._clbits *= self._num_circuits
# Prefer the API-supplied execution time
self._execution_time = (
self._first_of(
response,
"execution_duration_ms",
"execution_time",
default=float("inf"),
)
/ 1000
)
if self._status == jobstatus.JobStatus.ERROR:
failure = response.get("failure") or {}
raise exceptions.IonQJobFailureError(
f"Unable to retrieve result for job {self._job_id}. "
f'Failure from IonQ API "{failure.get("code","")}: '
f'{failure.get("error","")}"'
)
if self._status == jobstatus.JobStatus.CANCELLED:
warnings.warn(
f"Unable to retrieve result for job {self._job_id}. Job was cancelled"
)
# Propagate any warnings returned by the API
if "warning" in response and "messages" in response["warning"]:
for msg in response["warning"]["messages"]:
warnings.warn(msg)
return self._children_status() if detailed else self._status
def _children_status(self):
"""Return a dictionary describing the status of any child jobs.
Raises:
IonQJobError: If the IonQ job status was unknown or otherwise
unmappable to a qiskit job status.
IonQJobFailureError: If the job fails
IonQJobStateError: If the job was cancelled
Returns:
dict: A dictionary containing the detailed status of the children.
"""
response = self._client.retrieve_job(self._job_id)
child_ids = self._first_of(response, "child_job_ids", "children", [])
child_statuses = []
for child_id in child_ids:
resp = self._client.retrieve_job(child_id)
api_status = resp.get("status")
# Map API status to JobStatus enum
try:
status_enum = constants.APIJobStatus(api_status)
status_enum = constants.JobStatusMap[status_enum.name]
qiskit_status = jobstatus.JobStatus[status_enum.value]
except (ValueError, KeyError) as ex:
raise exceptions.IonQJobError(
f"Unknown or unmappable child job status {api_status}"
) from ex
child_statuses.append(qiskit_status)
total = len(child_statuses)
completed = child_statuses.count(jobstatus.JobStatus.DONE)
failed = child_statuses.count(jobstatus.JobStatus.ERROR)
return {
"total": total,
"completed": completed,
"failed": failed,
"percentage_complete": completed / total if total else 0,
"statuses": child_statuses,
}
def _format_result(self, data):
"""Translate IonQ result format into a Qiskit `Result` instance.
Args:
result (dict): A JSON body response from a REST API call.
Returns:
Result: A Qiskit :class:`Result <qiskit.result.Result>` representation of this job.
Raises:
IonQJobFailureError: If the remote job has an error status.
IonQJobStateError: If the job was cancelled before this method fetches it.
"""
backend = self.backend()
backend_name = backend.name
backend_version = backend.backend_version
is_ideal_sim = (
backend_name == "ionq_simulator" and backend.options.noise_model == "ideal"
)
success = self._status == jobstatus.JobStatus.DONE
metadata = self._metadata.get("metadata") or {}
sampler_seed = (
int(metadata.get("sampler_seed", ""))
if metadata.get("sampler_seed", "").isdigit()
else None
)
qiskit_header = decompress_metadata_string(metadata.get("qiskit_header"))
if not isinstance(qiskit_header, list):
qiskit_header = [qiskit_header]
shots = (
int(metadata.get("shots", 1024))
if str(metadata.get("shots", "1024")).isdigit()
else 1024
)
if isinstance(data, dict):
looks_like_multi = all(
isinstance(v, dict) and all(isinstance(p, float) for p in v.values())
for v in data.values()
)
data = list(data.values()) if looks_like_multi else [data]
elif isinstance(data, list):
pass
else:
raise exceptions.IonQJobError("Unexpected result payload type")
# pad headers if API dropped them
while len(qiskit_header) < self._num_circuits:
qiskit_header.append({})
job_result = [
{
"data": {},
"shots": shots,
"header": qiskit_header[i] or {},
"success": success,
}
for i in range(self._num_circuits)
]
if self._status == jobstatus.JobStatus.DONE:
for i in range(self._num_circuits):
(counts, probabilities) = _build_counts(
data[i],
qiskit_header[i].get("n_qubits", self._num_qubits),
self._clbits[i],
shots,
use_sampler=is_ideal_sim,
sampler_seed=sampler_seed,
)
job_result[i]["data"] = {
"counts": counts,
"probabilities": probabilities,
"metadata": qiskit_header[i] or {},
}
# Final Qiskit Result object
return Result.from_dict(
{
"results": job_result,
"job_id": self.job_id(),
"backend_name": backend_name,
"backend_version": backend_version,
"qobj_id": metadata.get("qobj_id"),
"success": success,
"time_taken": self._execution_time,
}
)
def _save_metadata(self, response):
"""Persist metadata from the API response to this instance.
Args:
response (dict): A JSON body response from a REST API call.
"""
self._metadata.update(response)
__all__ = ["IonQJob"]