How-to: Run verbatim circuits

[1]:
from qiskit import QuantumCircuit
from qiskit.visualization import plot_histogram

from qiskit_braket_provider import BraketProvider

Some AWS backends support running circuits in verbatim mode and/or with qubit rewiring disabled. If you are not familiar with these concepts, refer to the documentation; here we’ll limit ourselves to a short reminder:

  • A verbatim box is a part of the circuit that will not be compiled by Braket. Hence, it must comprise only gates that are native to the particular device.

  • Qubit rewiring is the process in which logical qubits in submitted circuits are mapped onto concrete physical qubits on the actual device.

Typically, one wants to design circuits using high-level concepts, without worrying about qubit placement or gate compilation. However, in some cases it might be desirable to have precise control over the circuits being submitted.

We will start this how-to by defining a simple circuit comprising a single cnot(0, 1) gate and running it on the Ankaa-3 device.

[2]:
provider = BraketProvider()
backend = provider.get_backend("Ankaa-3")

circuit = QuantumCircuit(2)
circuit.cx(0, 1)

result = backend.run(circuit, shots=10)

plot_histogram(result.result().get_counts())
[2]:
../_images/how_tos_4_how_to_verbatim_circuits_3_0.png

Our circuit ran perfectly fine. Since Rigetti devices don’t support the CNOT gate natively, we deduce that the circuit was compiled by Braket and our CNOT gate was decomposed into an equivalent sequence of native gates.

We will now see what happens if we pass the verbatim=True and disable_qubit_rewiring=True options.

[3]:
backend.run(circuit, verbatim=True, disable_qubit_rewiring=True)
---------------------------------------------------------------------------
ValidationException                       Traceback (most recent call last)
Cell In[3], line 1
----> 1 backend.run(circuit, verbatim=True, disable_qubit_rewiring=True)

File ~/Documents/GitHub/qiskit-braket-provider/qiskit_braket_provider/providers/braket_backend.py:411, in BraketAwsBackend.run(self, run_input, shots, verbatim, native, optimization_level, callback, num_processes, pass_manager, **options)
    390 target, basis_gates = self._target_and_basis_gates(native, pass_manager)
    391 braket_circuits = (
    392     to_braket(circuits, verbatim=True, qubit_labels=self._qubit_labels)
    393     if verbatim
   (...)
    406     )
    407 )
    408 return (
    409     self._run_program_set(braket_circuits, shots, **options)
    410     if self._supports_program_sets and shots != 0 and len(braket_circuits) > 1
--> 411     else self._run_batch(braket_circuits, shots, **options)
    412 )

File ~/Documents/GitHub/qiskit-braket-provider/qiskit_braket_provider/providers/braket_backend.py:425, in BraketAwsBackend._run_batch(self, braket_circuits, shots, **options)
    424 def _run_batch(self, braket_circuits: list[Circuit], shots: int, **options):
--> 425     batch_task = self._device.run_batch(braket_circuits, shots=shots, **options)
    426     tasks: list[AwsQuantumTask] = batch_task.tasks
    427     task_id = _TASK_ID_DIVIDER.join(task.id for task in tasks)

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_device.py:286, in AwsDevice.run_batch(self, task_specifications, s3_destination_folder, shots, max_parallel, max_connections, poll_timeout_seconds, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *aws_quantum_task_args, **aws_quantum_task_kwargs)
    281 if self._noise_model:
    282     task_specifications = [
    283         self._apply_noise_model_to_circuit(task_specification)
    284         for task_specification in task_specifications
    285     ]
--> 286 return AwsQuantumTaskBatch(
    287     AwsSession.copy_session(self._aws_session, max_connections=max_connections),
    288     self._arn,
    289     task_specifications,
    290     s3_destination_folder
    291     or (
    292         AwsSession.parse_s3_uri(os.environ.get("AMZN_BRAKET_TASK_RESULTS_S3_URI"))
    293         if "AMZN_BRAKET_TASK_RESULTS_S3_URI" in os.environ
    294         else None
    295     )
    296     or (self._aws_session.default_bucket(), "tasks"),
    297     shots if shots is not None else self._default_shots(),
    298     max_parallel=max_parallel if max_parallel is not None else self._default_max_parallel,
    299     max_workers=max_connections,
    300     poll_timeout_seconds=poll_timeout_seconds,
    301     poll_interval_seconds=poll_interval_seconds or self._poll_interval_seconds,
    302     inputs=inputs,
    303     gate_definitions=gate_definitions,
    304     reservation_arn=reservation_arn,
    305     *aws_quantum_task_args,
    306     **aws_quantum_task_kwargs,
    307 )

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:107, in AwsQuantumTaskBatch.__init__(self, aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *aws_quantum_task_args, **aws_quantum_task_kwargs)
     50 def __init__(
     51     self,
     52     aws_session: AwsSession,
   (...)
     69     **aws_quantum_task_kwargs: Any,
     70 ):
     71     """Creates a batch of quantum tasks.
     72
     73     Args:
   (...)
    105         **aws_quantum_task_kwargs (Any): Arbitrary kwargs for `QuantumTask`.,
    106     """  # noqa: E501
--> 107     self._tasks = AwsQuantumTaskBatch._execute(
    108         aws_session,
    109         device_arn,
    110         task_specifications,
    111         s3_destination_folder,
    112         shots,
    113         max_parallel,
    114         max_workers,
    115         poll_timeout_seconds,
    116         poll_interval_seconds,
    117         inputs,
    118         gate_definitions,
    119         reservation_arn,
    120         *aws_quantum_task_args,
    121         **aws_quantum_task_kwargs,
    122     )
    123     self._aws_session = aws_session
    124     self._results = None

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:255, in AwsQuantumTaskBatch._execute(aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *args, **kwargs)
    252     remaining.clear()
    254     raise
--> 255 return [future.result() for future in task_futures]

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:255, in <listcomp>(.0)
    252     remaining.clear()
    254     raise
--> 255 return [future.result() for future in task_futures]

File ~/anaconda3/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/anaconda3/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/anaconda3/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:272, in AwsQuantumTaskBatch._create_task(remaining, aws_session, device_arn, task_specification, s3_destination_folder, shots, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *args, **kwargs)
    257 @staticmethod
    258 def _create_task(
    259     remaining: list[int],
   (...)
    270     **kwargs,
    271 ) -> AwsQuantumTask:
--> 272     task = AwsQuantumTask.create(
    273         aws_session,
    274         device_arn,
    275         task_specification,
    276         s3_destination_folder,
    277         shots,
    278         poll_interval_seconds=poll_interval_seconds,
    279         inputs=inputs,
    280         gate_definitions=gate_definitions,
    281         reservation_arn=reservation_arn,
    282         *args,
    283         **kwargs,
    284     )
    286     remaining.pop()
    288     # If the quantum task hits a terminal state before all quantum tasks have been created,
    289     # it can be returned immediately

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task.py:209, in AwsQuantumTask.create(aws_session, device_arn, task_specification, s3_destination_folder, shots, device_parameters, disable_qubit_rewiring, tags, inputs, gate_definitions, quiet, reservation_arn, *args, **kwargs)
    204     if unbounded_parameters := param_names - set(inputs.keys()):
    205         raise ValueError(
    206             f"Cannot execute circuit with unbound parameters: {unbounded_parameters}"
    207         )
--> 209 return _create_internal(
    210     task_specification,
    211     aws_session,
    212     create_task_kwargs,
    213     device_arn,
    214     device_parameters or {},
    215     disable_qubit_rewiring,
    216     inputs,
    217     gate_definitions,
    218     quiet=quiet,
    219     *args,
    220     **kwargs,
    221 )

File ~/anaconda3/lib/python3.10/functools.py:889, in singledispatch.<locals>.wrapper(*args, **kw)
    885 if not args:
    886     raise TypeError(f'{funcname} requires at least '
    887                     '1 positional argument')
--> 889 return dispatch(args[0].__class__)(*args, **kw)

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task.py:775, in _(circuit, aws_session, create_task_kwargs, device_arn, device_parameters, disable_qubit_rewiring, inputs, gate_definitions, *args, **kwargs)
    766     openqasm_program = OpenQASMProgram(
    767         source=openqasm_program.source,
    768         inputs=inputs_copy,
    769     )
    771 create_task_kwargs |= {
    772     "action": openqasm_program.json(),
    773     "deviceParameters": final_device_parameters.json(exclude_none=True),
    774 }
--> 775 task_arn = aws_session.create_quantum_task(**create_task_kwargs)
    776 return AwsQuantumTask(task_arn, aws_session, *args, **kwargs)

File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_session.py:273, in AwsSession.create_quantum_task(self, **boto3_kwargs)
    271 if job_token:
    272     boto3_kwargs["jobToken"] = job_token
--> 273 response = self.braket_client.create_quantum_task(**boto3_kwargs)
    274 broadcast_event(
    275     _TaskCreationEvent(
    276         arn=response["quantumTaskArn"],
   (...)
    280     )
    281 )
    282 return response["quantumTaskArn"]

File ~/anaconda3/lib/python3.10/site-packages/botocore/client.py:602, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs)
    598     raise TypeError(
    599         f"{py_operation_name}() only accepts keyword arguments."
    600     )
    601 # The "self" in this scope is referring to the BaseClient.
--> 602 return self._make_api_call(operation_name, kwargs)

File ~/anaconda3/lib/python3.10/site-packages/botocore/context.py:123, in with_current_context.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    121 if hook:
    122     hook()
--> 123 return func(*args, **kwargs)

File ~/anaconda3/lib/python3.10/site-packages/botocore/client.py:1078, in BaseClient._make_api_call(self, operation_name, api_params)
   1074     error_code = request_context.get(
   1075         'error_code_override'
   1076     ) or error_info.get("Code")
   1077     error_class = self.exceptions.from_code(error_code)
-> 1078     raise error_class(parsed_response, operation_name)
   1079 else:
   1080     return parsed_response

ValidationException: An error occurred (ValidationException) when calling the CreateQuantumTask operation: [line 5] In verbatim, native gates are required. cnot is not a device native gate.

As expected, we obtained a ValidationException: cnot is not a native gate of the device, so it is not allowed in a verbatim circuit. As a last step, let us verify that running a circuit with native gates and matching topology is possible.

[4]:
native_circuit = QuantumCircuit(2)
native_circuit.iswap(0, 1)

result = backend.run(native_circuit, verbatim=True, disable_qubit_rewiring=True).result()
plot_histogram(result.get_counts())
[4]:
../_images/how_tos_4_how_to_verbatim_circuits_7_0.png