How-to: run a circuit on a Braket device using verbatim mode

[1]:
from qiskit import QuantumCircuit
from qiskit.visualization import plot_histogram
from qiskit_braket_provider import AWSBraketProvider

Some AWS backends support running circuits using verbatim mode and/or with disabled qubit rewiring. If you are not familiar with those concepts, refer to the documentation; here we’ll limit ourselves to a short reminder:

  • verbatim box is a part of the circuit that will not be compiled by Braket. Hence, it must comprise only gates that are native to the particular device.

  • qubit rewiring is a process in which logical qubits in submitted circuits are mapped onto concrete physical qubits on the actual device.

Typically, one wants to design circuits using high-level concepts, without worrying about qubit placement or gate compilation. However, in some cases it might be desirable to have precise control over the circuits being submitted.

We will start this how-to by defining a simple circuit comprising a cnot(0, 1) gate and running it on the Aspen-M-3 device.

[2]:
provider = AWSBraketProvider()
aspen = provider.get_backend("Aspen-M-3")

circuit = QuantumCircuit(2)
circuit.cnot(0, 1)

result = aspen.run(circuit, shots=10)

plot_histogram(result.result().get_counts())
[2]:
../_images/how_tos_4_how_to_verbatim_circuits_3_0.png

Our circuit ran perfectly fine. Since Rigetti devices don’t support the CNOT gate natively, we deduce that the circuit was compiled by Braket and our CNOT gate was decomposed into an equivalent sequence of native gates.

We will now see what happens if we pass the verbatim=True and disable_qubit_rewiring=True options.

[3]:
aspen.run(circuit, verbatim=True, disable_qubit_rewiring=True)
---------------------------------------------------------------------------
ValidationException                       Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 aspen.run(circuit, verbatim=True, disable_qubit_rewiring=True)

File ~/Projects/qiskit-braket-provider/qiskit_braket_provider/providers/braket_backend.py:251, in AWSBraketBackend.run(self, run_input, **options)
    248 if options.pop("verbatim", False):
    249     braket_circuits = wrap_circuits_in_verbatim_box(braket_circuits)
--> 251 batch_task: AwsQuantumTaskBatch = self._device.run_batch(
    252     braket_circuits, **options
    253 )
    254 tasks: List[AwsQuantumTask] = batch_task.tasks
    255 job_id = TASK_ID_DIVIDER.join(task.id for task in tasks)

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_device.py:208, in AwsDevice.run_batch(self, task_specifications, s3_destination_folder, shots, max_parallel, max_connections, poll_timeout_seconds, poll_interval_seconds, *aws_quantum_task_args, **aws_quantum_task_kwargs)
    165 def run_batch(
    166     self,
    167     task_specifications: List[Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram]],
   (...)
    175     **aws_quantum_task_kwargs,
    176 ) -> AwsQuantumTaskBatch:
    177     """Executes a batch of tasks in parallel
    178
    179     Args:
   (...)
    206         `braket.aws.aws_quantum_task_batch.AwsQuantumTaskBatch`
    207     """
--> 208     return AwsQuantumTaskBatch(
    209         AwsSession.copy_session(self._aws_session, max_connections=max_connections),
    210         self._arn,
    211         task_specifications,
    212         s3_destination_folder
    213         or (
    214             AwsSession.parse_s3_uri(os.environ.get("AMZN_braket_job_RESULTS_S3_URI"))
    215             if "AMZN_braket_job_RESULTS_S3_URI" in os.environ
    216             else None
    217         )
    218         or (self._aws_session.default_bucket(), "tasks"),
    219         shots if shots is not None else self._default_shots,
    220         max_parallel=max_parallel if max_parallel is not None else self._default_max_parallel,
    221         max_workers=max_connections,
    222         poll_timeout_seconds=poll_timeout_seconds,
    223         poll_interval_seconds=poll_interval_seconds,
    224         *aws_quantum_task_args,
    225         **aws_quantum_task_kwargs,
    226     )

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:83, in AwsQuantumTaskBatch.__init__(self, aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, *aws_quantum_task_args, **aws_quantum_task_kwargs)
     42 def __init__(
     43     self,
     44     aws_session: AwsSession,
   (...)
     54     **aws_quantum_task_kwargs,
     55 ):
     56     """Creates a batch of quantum tasks.
     57
     58     Args:
   (...)
     81             `braket.aws.aws_quantum_task.AwsQuantumTask.create()`.
     82     """
---> 83     self._tasks = AwsQuantumTaskBatch._execute(
     84         aws_session,
     85         device_arn,
     86         task_specifications,
     87         s3_destination_folder,
     88         shots,
     89         max_parallel,
     90         max_workers,
     91         poll_timeout_seconds,
     92         poll_interval_seconds,
     93         *aws_quantum_task_args,
     94         **aws_quantum_task_kwargs,
     95     )
     96     self._aws_session = aws_session
     97     self._results = None

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:151, in AwsQuantumTaskBatch._execute(aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, *args, **kwargs)
    134 with ThreadPoolExecutor(max_workers=max_threads) as executor:
    135     task_futures = [
    136         executor.submit(
    137             AwsQuantumTaskBatch._create_task,
   (...)
    149         for task in task_specifications
    150     ]
--> 151 tasks = [future.result() for future in task_futures]
    152 return tasks

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:151, in <listcomp>(.0)
    134 with ThreadPoolExecutor(max_workers=max_threads) as executor:
    135     task_futures = [
    136         executor.submit(
    137             AwsQuantumTaskBatch._create_task,
   (...)
    149         for task in task_specifications
    150     ]
--> 151 tasks = [future.result() for future in task_futures]
    152 return tasks

File /usr/lib64/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    437     raise CancelledError()
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    441 self._condition.wait(timeout)
    443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib64/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File /usr/lib64/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:166, in AwsQuantumTaskBatch._create_task(remaining, aws_session, device_arn, task_specification, s3_destination_folder, shots, poll_interval_seconds, *args, **kwargs)
    154 @staticmethod
    155 def _create_task(
    156     remaining,
   (...)
    164     **kwargs,
    165 ):
--> 166     task = AwsQuantumTask.create(
    167         aws_session,
    168         device_arn,
    169         task_specification,
    170         s3_destination_folder,
    171         shots,
    172         poll_interval_seconds=poll_interval_seconds,
    173         *args,
    174         **kwargs,
    175     )
    177     remaining.pop()
    179     # If the task hits a terminal state before all tasks have been created,
    180     # it can be returned immediately

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task.py:147, in AwsQuantumTask.create(aws_session, device_arn, task_specification, s3_destination_folder, shots, device_parameters, disable_qubit_rewiring, tags, *args, **kwargs)
    142 if isinstance(task_specification, Circuit) and task_specification.parameters:
    143     raise ValueError(
    144         f"Cannot execute circuit with unbound parameters: "
    145         f"{task_specification.parameters}"
    146     )
--> 147 return _create_internal(
    148     task_specification,
    149     aws_session,
    150     create_task_kwargs,
    151     device_arn,
    152     device_parameters or {},
    153     disable_qubit_rewiring,
    154     *args,
    155     **kwargs,
    156 )

File /usr/lib64/python3.9/functools.py:888, in singledispatch.<locals>.wrapper(*args, **kw)
    884 if not args:
    885     raise TypeError(f'{funcname} requires at least '
    886                     '1 positional argument')
--> 888 return dispatch(args[0].__class__)(*args, **kw)

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task.py:496, in _(circuit, aws_session, create_task_kwargs, device_arn, device_parameters, disable_qubit_rewiring, *args, **kwargs)
    489     device_parameters = GateModelSimulatorDeviceParameters(
    490         paradigmParameters=paradigm_parameters
    491     )
    493 create_task_kwargs.update(
    494     {"action": circuit.to_ir().json(), "deviceParameters": device_parameters.json()}
    495 )
--> 496 task_arn = aws_session.create_quantum_task(**create_task_kwargs)
    497 return AwsQuantumTask(task_arn, aws_session, *args, **kwargs)

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_session.py:180, in AwsSession.create_quantum_task(self, **boto3_kwargs)
    178 if job_token:
    179     boto3_kwargs.update({"jobToken": job_token})
--> 180 response = self.braket_client.create_quantum_task(**boto3_kwargs)
    181 return response["quantumTaskArn"]

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/botocore/client.py:508, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs)
    504     raise TypeError(
    505         f"{py_operation_name}() only accepts keyword arguments."
    506     )
    507 # The "self" in this scope is referring to the BaseClient.
--> 508 return self._make_api_call(operation_name, kwargs)

File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/botocore/client.py:915, in BaseClient._make_api_call(self, operation_name, api_params)
    913     error_code = parsed_response.get("Error", {}).get("Code")
    914     error_class = self.exceptions.from_code(error_code)
--> 915     raise error_class(parsed_response, operation_name)
    916 else:
    917     return parsed_response

ValidationException: An error occurred (ValidationException) when calling the CreateQuantumTask operation: Backend ARN, arn:aws:braket:us-west-1::device/qpu/rigetti/Aspen-M-3, does not support the operation cnot as a native gate. Supported operations for this Backend ARN are ['rz', 'cz', 'xy', 'rx', 'cphaseshift'].

As expected, we obtained a ValidationException, because cnot is not among the native gates supported by the device. As a last step, let us verify that running a circuit with native gates and matching topology is possible.

[ ]:
native_circuit = QuantumCircuit(2)
native_circuit.cz(0, 1)

result = aspen.run(native_circuit, verbatim=True, disable_qubit_rewiring=True).result()
plot_histogram(result.get_counts())