How-to: Run verbatim circuits¶
[1]:
from qiskit import QuantumCircuit
from qiskit.visualization import plot_histogram
from qiskit_braket_provider import BraketProvider
Some AWS backends support running circuits using verbatim mode and/or with disabled qubit rewiring. If you are not familiar with those concepts, refer to the documentation, here we’ll constrain ourselves to a short reminder:
verbatim boxis a part of the circuit that will not be compiled by Braket. Hence, it needs to comprise only gates that are native to particular device.qubit rewiringis a process in which logical qubits in submitted circuits are mapped into concrete physical qubits on actual device.
Typically, one wants to design circuits using high-level concepts, without worrying about qubit placement or gate compilation. However, in some cases it might be desirable to have a precise control of the circuits being submitted.
We will start this how-to by defining a simple circuit comprising cnot(0, 1) gate and running it on Aspem-M-3 device.
[2]:
provider = BraketProvider()
backend = provider.get_backend("Ankaa-3")
circuit = QuantumCircuit(2)
circuit.cx(0, 1)
result = backend.run(circuit, shots=10)
plot_histogram(result.result().get_counts())
[2]:
Our circuit run perfectly fine. Since Rigetti devices don’t support CNOT gate natively, we deduce that the circuit was compiled by Braket and our CNOT gate got decomposed into some equivalent sequence of gates.
We will now see what happens if we pass verbatim=True and disable_qubit_rewiring=True options.
[3]:
backend.run(circuit, verbatim=True, disable_qubit_rewiring=True)
---------------------------------------------------------------------------
ValidationException Traceback (most recent call last)
Cell In[3], line 1
----> 1 backend.run(circuit, verbatim=True, disable_qubit_rewiring=True)
File ~/Documents/GitHub/qiskit-braket-provider/qiskit_braket_provider/providers/braket_backend.py:411, in BraketAwsBackend.run(self, run_input, shots, verbatim, native, optimization_level, callback, num_processes, pass_manager, **options)
390 target, basis_gates = self._target_and_basis_gates(native, pass_manager)
391 braket_circuits = (
392 to_braket(circuits, verbatim=True, qubit_labels=self._qubit_labels)
393 if verbatim
(...)
406 )
407 )
408 return (
409 self._run_program_set(braket_circuits, shots, **options)
410 if self._supports_program_sets and shots != 0 and len(braket_circuits) > 1
--> 411 else self._run_batch(braket_circuits, shots, **options)
412 )
File ~/Documents/GitHub/qiskit-braket-provider/qiskit_braket_provider/providers/braket_backend.py:425, in BraketAwsBackend._run_batch(self, braket_circuits, shots, **options)
424 def _run_batch(self, braket_circuits: list[Circuit], shots: int, **options):
--> 425 batch_task = self._device.run_batch(braket_circuits, shots=shots, **options)
426 tasks: list[AwsQuantumTask] = batch_task.tasks
427 task_id = _TASK_ID_DIVIDER.join(task.id for task in tasks)
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_device.py:286, in AwsDevice.run_batch(self, task_specifications, s3_destination_folder, shots, max_parallel, max_connections, poll_timeout_seconds, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *aws_quantum_task_args, **aws_quantum_task_kwargs)
281 if self._noise_model:
282 task_specifications = [
283 self._apply_noise_model_to_circuit(task_specification)
284 for task_specification in task_specifications
285 ]
--> 286 return AwsQuantumTaskBatch(
287 AwsSession.copy_session(self._aws_session, max_connections=max_connections),
288 self._arn,
289 task_specifications,
290 s3_destination_folder
291 or (
292 AwsSession.parse_s3_uri(os.environ.get("AMZN_BRAKET_TASK_RESULTS_S3_URI"))
293 if "AMZN_BRAKET_TASK_RESULTS_S3_URI" in os.environ
294 else None
295 )
296 or (self._aws_session.default_bucket(), "tasks"),
297 shots if shots is not None else self._default_shots(),
298 max_parallel=max_parallel if max_parallel is not None else self._default_max_parallel,
299 max_workers=max_connections,
300 poll_timeout_seconds=poll_timeout_seconds,
301 poll_interval_seconds=poll_interval_seconds or self._poll_interval_seconds,
302 inputs=inputs,
303 gate_definitions=gate_definitions,
304 reservation_arn=reservation_arn,
305 *aws_quantum_task_args,
306 **aws_quantum_task_kwargs,
307 )
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:107, in AwsQuantumTaskBatch.__init__(self, aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *aws_quantum_task_args, **aws_quantum_task_kwargs)
50 def __init__(
51 self,
52 aws_session: AwsSession,
(...)
69 **aws_quantum_task_kwargs: Any,
70 ):
71 """Creates a batch of quantum tasks.
72
73 Args:
(...)
105 **aws_quantum_task_kwargs (Any): Arbitrary kwargs for `QuantumTask`.,
106 """ # noqa: E501
--> 107 self._tasks = AwsQuantumTaskBatch._execute(
108 aws_session,
109 device_arn,
110 task_specifications,
111 s3_destination_folder,
112 shots,
113 max_parallel,
114 max_workers,
115 poll_timeout_seconds,
116 poll_interval_seconds,
117 inputs,
118 gate_definitions,
119 reservation_arn,
120 *aws_quantum_task_args,
121 **aws_quantum_task_kwargs,
122 )
123 self._aws_session = aws_session
124 self._results = None
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:255, in AwsQuantumTaskBatch._execute(aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *args, **kwargs)
252 remaining.clear()
254 raise
--> 255 return [future.result() for future in task_futures]
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:255, in <listcomp>(.0)
252 remaining.clear()
254 raise
--> 255 return [future.result() for future in task_futures]
File ~/anaconda3/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
449 raise CancelledError()
450 elif self._state == FINISHED:
--> 451 return self.__get_result()
453 self._condition.wait(timeout)
455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File ~/anaconda3/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
401 if self._exception:
402 try:
--> 403 raise self._exception
404 finally:
405 # Break a reference cycle with the exception in self._exception
406 self = None
File ~/anaconda3/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task_batch.py:272, in AwsQuantumTaskBatch._create_task(remaining, aws_session, device_arn, task_specification, s3_destination_folder, shots, poll_interval_seconds, inputs, gate_definitions, reservation_arn, *args, **kwargs)
257 @staticmethod
258 def _create_task(
259 remaining: list[int],
(...)
270 **kwargs,
271 ) -> AwsQuantumTask:
--> 272 task = AwsQuantumTask.create(
273 aws_session,
274 device_arn,
275 task_specification,
276 s3_destination_folder,
277 shots,
278 poll_interval_seconds=poll_interval_seconds,
279 inputs=inputs,
280 gate_definitions=gate_definitions,
281 reservation_arn=reservation_arn,
282 *args,
283 **kwargs,
284 )
286 remaining.pop()
288 # If the quantum task hits a terminal state before all quantum tasks have been created,
289 # it can be returned immediately
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task.py:209, in AwsQuantumTask.create(aws_session, device_arn, task_specification, s3_destination_folder, shots, device_parameters, disable_qubit_rewiring, tags, inputs, gate_definitions, quiet, reservation_arn, *args, **kwargs)
204 if unbounded_parameters := param_names - set(inputs.keys()):
205 raise ValueError(
206 f"Cannot execute circuit with unbound parameters: {unbounded_parameters}"
207 )
--> 209 return _create_internal(
210 task_specification,
211 aws_session,
212 create_task_kwargs,
213 device_arn,
214 device_parameters or {},
215 disable_qubit_rewiring,
216 inputs,
217 gate_definitions,
218 quiet=quiet,
219 *args,
220 **kwargs,
221 )
File ~/anaconda3/lib/python3.10/functools.py:889, in singledispatch.<locals>.wrapper(*args, **kw)
885 if not args:
886 raise TypeError(f'{funcname} requires at least '
887 '1 positional argument')
--> 889 return dispatch(args[0].__class__)(*args, **kw)
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_quantum_task.py:775, in _(circuit, aws_session, create_task_kwargs, device_arn, device_parameters, disable_qubit_rewiring, inputs, gate_definitions, *args, **kwargs)
766 openqasm_program = OpenQASMProgram(
767 source=openqasm_program.source,
768 inputs=inputs_copy,
769 )
771 create_task_kwargs |= {
772 "action": openqasm_program.json(),
773 "deviceParameters": final_device_parameters.json(exclude_none=True),
774 }
--> 775 task_arn = aws_session.create_quantum_task(**create_task_kwargs)
776 return AwsQuantumTask(task_arn, aws_session, *args, **kwargs)
File ~/Documents/GitHub/amazon-braket-sdk-python/src/braket/aws/aws_session.py:273, in AwsSession.create_quantum_task(self, **boto3_kwargs)
271 if job_token:
272 boto3_kwargs["jobToken"] = job_token
--> 273 response = self.braket_client.create_quantum_task(**boto3_kwargs)
274 broadcast_event(
275 _TaskCreationEvent(
276 arn=response["quantumTaskArn"],
(...)
280 )
281 )
282 return response["quantumTaskArn"]
File ~/anaconda3/lib/python3.10/site-packages/botocore/client.py:602, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs)
598 raise TypeError(
599 f"{py_operation_name}() only accepts keyword arguments."
600 )
601 # The "self" in this scope is referring to the BaseClient.
--> 602 return self._make_api_call(operation_name, kwargs)
File ~/anaconda3/lib/python3.10/site-packages/botocore/context.py:123, in with_current_context.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
121 if hook:
122 hook()
--> 123 return func(*args, **kwargs)
File ~/anaconda3/lib/python3.10/site-packages/botocore/client.py:1078, in BaseClient._make_api_call(self, operation_name, api_params)
1074 error_code = request_context.get(
1075 'error_code_override'
1076 ) or error_info.get("Code")
1077 error_class = self.exceptions.from_code(error_code)
-> 1078 raise error_class(parsed_response, operation_name)
1079 else:
1080 return parsed_response
ValidationException: An error occurred (ValidationException) when calling the CreateQuantumTask operation: [line 5] In verbatim, native gates are required. cnot is not a device native gate.
As expected, we obtained a ValidationException. As a last step, let us verify that running circuit with native gates and matching topology is possible.
[4]:
native_circuit = QuantumCircuit(2)
native_circuit.iswap(0, 1)
result = backend.run(native_circuit, verbatim=True, disable_qubit_rewiring=True).result()
plot_histogram(result.get_counts())
[4]: