How-to: run a circuit on a Braket device using verbatim mode
[1]:
from qiskit import QuantumCircuit
from qiskit.visualization import plot_histogram
from qiskit_braket_provider import AWSBraketProvider
Some AWS backends support running circuits using verbatim mode and/or with qubit rewiring disabled. If you are not familiar with those concepts, refer to the documentation; here we’ll restrict ourselves to a short reminder:
verbatim box
is a part of the circuit that will not be compiled by Braket. Hence, it must comprise only gates that are native to the particular device.
qubit rewiring
is a process in which logical qubits in submitted circuits are mapped onto concrete physical qubits on the actual device.
Typically, one wants to design circuits using high-level concepts, without worrying about qubit placement or gate compilation. However, in some cases it might be desirable to have precise control over the circuits being submitted.
We will start this how-to by defining a simple circuit comprising a cnot(0, 1)
gate and running it on the Aspen-M-3 device.
[2]:
# Look up the Rigetti Aspen-M-3 backend through the Braket provider.
provider = AWSBraketProvider()
aspen = provider.get_backend("Aspen-M-3")

# Two-qubit circuit holding a single CNOT (control 0, target 1). `cx` is the
# canonical Qiskit method for CNOT; the `cnot` alias is deprecated and is not
# available in newer Qiskit releases.
circuit = QuantumCircuit(2)
circuit.cx(0, 1)

# No verbatim options are passed, so Braket is free to compile the circuit.
# `run` hands back a job; resolve it with `.result()` before reading counts so
# that `result` really holds a result object.
result = aspen.run(circuit, shots=10).result()
plot_histogram(result.get_counts())
[2]:
Our circuit ran perfectly fine. Since Rigetti devices don’t support the CNOT gate natively, we deduce that the circuit was compiled by Braket and our CNOT gate got decomposed into some equivalent sequence of native gates.
We will now see what happens if we pass the verbatim=True
and disable_qubit_rewiring=True
options.
[3]:
# Intentionally failing call: with verbatim=True the circuit is submitted
# without compilation, and it contains a CNOT, which the device rejects as a
# non-native gate (see the ValidationException in the output below).
aspen.run(circuit, verbatim=True, disable_qubit_rewiring=True)
---------------------------------------------------------------------------
ValidationException Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 aspen.run(circuit, verbatim=True, disable_qubit_rewiring=True)
File ~/Projects/qiskit-braket-provider/qiskit_braket_provider/providers/braket_backend.py:251, in AWSBraketBackend.run(self, run_input, **options)
248 if options.pop("verbatim", False):
249 braket_circuits = wrap_circuits_in_verbatim_box(braket_circuits)
--> 251 batch_task: AwsQuantumTaskBatch = self._device.run_batch(
252 braket_circuits, **options
253 )
254 tasks: List[AwsQuantumTask] = batch_task.tasks
255 job_id = TASK_ID_DIVIDER.join(task.id for task in tasks)
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_device.py:208, in AwsDevice.run_batch(self, task_specifications, s3_destination_folder, shots, max_parallel, max_connections, poll_timeout_seconds, poll_interval_seconds, *aws_quantum_task_args, **aws_quantum_task_kwargs)
165 def run_batch(
166 self,
167 task_specifications: List[Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram]],
(...)
175 **aws_quantum_task_kwargs,
176 ) -> AwsQuantumTaskBatch:
177 """Executes a batch of tasks in parallel
178
179 Args:
(...)
206 `braket.aws.aws_quantum_task_batch.AwsQuantumTaskBatch`
207 """
--> 208 return AwsQuantumTaskBatch(
209 AwsSession.copy_session(self._aws_session, max_connections=max_connections),
210 self._arn,
211 task_specifications,
212 s3_destination_folder
213 or (
214 AwsSession.parse_s3_uri(os.environ.get("AMZN_braket_job_RESULTS_S3_URI"))
215 if "AMZN_braket_job_RESULTS_S3_URI" in os.environ
216 else None
217 )
218 or (self._aws_session.default_bucket(), "tasks"),
219 shots if shots is not None else self._default_shots,
220 max_parallel=max_parallel if max_parallel is not None else self._default_max_parallel,
221 max_workers=max_connections,
222 poll_timeout_seconds=poll_timeout_seconds,
223 poll_interval_seconds=poll_interval_seconds,
224 *aws_quantum_task_args,
225 **aws_quantum_task_kwargs,
226 )
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:83, in AwsQuantumTaskBatch.__init__(self, aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, *aws_quantum_task_args, **aws_quantum_task_kwargs)
42 def __init__(
43 self,
44 aws_session: AwsSession,
(...)
54 **aws_quantum_task_kwargs,
55 ):
56 """Creates a batch of quantum tasks.
57
58 Args:
(...)
81 `braket.aws.aws_quantum_task.AwsQuantumTask.create()`.
82 """
---> 83 self._tasks = AwsQuantumTaskBatch._execute(
84 aws_session,
85 device_arn,
86 task_specifications,
87 s3_destination_folder,
88 shots,
89 max_parallel,
90 max_workers,
91 poll_timeout_seconds,
92 poll_interval_seconds,
93 *aws_quantum_task_args,
94 **aws_quantum_task_kwargs,
95 )
96 self._aws_session = aws_session
97 self._results = None
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:151, in AwsQuantumTaskBatch._execute(aws_session, device_arn, task_specifications, s3_destination_folder, shots, max_parallel, max_workers, poll_timeout_seconds, poll_interval_seconds, *args, **kwargs)
134 with ThreadPoolExecutor(max_workers=max_threads) as executor:
135 task_futures = [
136 executor.submit(
137 AwsQuantumTaskBatch._create_task,
(...)
149 for task in task_specifications
150 ]
--> 151 tasks = [future.result() for future in task_futures]
152 return tasks
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:151, in <listcomp>(.0)
134 with ThreadPoolExecutor(max_workers=max_threads) as executor:
135 task_futures = [
136 executor.submit(
137 AwsQuantumTaskBatch._create_task,
(...)
149 for task in task_specifications
150 ]
--> 151 tasks = [future.result() for future in task_futures]
152 return tasks
File /usr/lib64/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
441 self._condition.wait(timeout)
443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File /usr/lib64/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File /usr/lib64/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task_batch.py:166, in AwsQuantumTaskBatch._create_task(remaining, aws_session, device_arn, task_specification, s3_destination_folder, shots, poll_interval_seconds, *args, **kwargs)
154 @staticmethod
155 def _create_task(
156 remaining,
(...)
164 **kwargs,
165 ):
--> 166 task = AwsQuantumTask.create(
167 aws_session,
168 device_arn,
169 task_specification,
170 s3_destination_folder,
171 shots,
172 poll_interval_seconds=poll_interval_seconds,
173 *args,
174 **kwargs,
175 )
177 remaining.pop()
179 # If the task hits a terminal state before all tasks have been created,
180 # it can be returned immediately
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task.py:147, in AwsQuantumTask.create(aws_session, device_arn, task_specification, s3_destination_folder, shots, device_parameters, disable_qubit_rewiring, tags, *args, **kwargs)
142 if isinstance(task_specification, Circuit) and task_specification.parameters:
143 raise ValueError(
144 f"Cannot execute circuit with unbound parameters: "
145 f"{task_specification.parameters}"
146 )
--> 147 return _create_internal(
148 task_specification,
149 aws_session,
150 create_task_kwargs,
151 device_arn,
152 device_parameters or {},
153 disable_qubit_rewiring,
154 *args,
155 **kwargs,
156 )
File /usr/lib64/python3.9/functools.py:888, in singledispatch.<locals>.wrapper(*args, **kw)
884 if not args:
885 raise TypeError(f'{funcname} requires at least '
886 '1 positional argument')
--> 888 return dispatch(args[0].__class__)(*args, **kw)
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_quantum_task.py:496, in _(circuit, aws_session, create_task_kwargs, device_arn, device_parameters, disable_qubit_rewiring, *args, **kwargs)
489 device_parameters = GateModelSimulatorDeviceParameters(
490 paradigmParameters=paradigm_parameters
491 )
493 create_task_kwargs.update(
494 {"action": circuit.to_ir().json(), "deviceParameters": device_parameters.json()}
495 )
--> 496 task_arn = aws_session.create_quantum_task(**create_task_kwargs)
497 return AwsQuantumTask(task_arn, aws_session, *args, **kwargs)
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/braket/aws/aws_session.py:180, in AwsSession.create_quantum_task(self, **boto3_kwargs)
178 if job_token:
179 boto3_kwargs.update({"jobToken": job_token})
--> 180 response = self.braket_client.create_quantum_task(**boto3_kwargs)
181 return response["quantumTaskArn"]
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/botocore/client.py:508, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs)
504 raise TypeError(
505 f"{py_operation_name}() only accepts keyword arguments."
506 )
507 # The "self" in this scope is referring to the BaseClient.
--> 508 return self._make_api_call(operation_name, kwargs)
File ~/.virtualenvs/pyqbench/lib/python3.9/site-packages/botocore/client.py:915, in BaseClient._make_api_call(self, operation_name, api_params)
913 error_code = parsed_response.get("Error", {}).get("Code")
914 error_class = self.exceptions.from_code(error_code)
--> 915 raise error_class(parsed_response, operation_name)
916 else:
917 return parsed_response
ValidationException: An error occurred (ValidationException) when calling the CreateQuantumTask operation: Backend ARN, arn:aws:braket:us-west-1::device/qpu/rigetti/Aspen-M-3, does not support the operation cnot as a native gate. Supported operations for this Backend ARN are ['rz', 'cz', 'xy', 'rx', 'cphaseshift'].
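As expected, we obtained a ValidationException, because the verbatim circuit contains a CNOT gate, which is not among the device's native operations. As a last step, let us verify that running a circuit with native gates and matching topology is possible.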
[ ]:
# Build a circuit from CZ only; 'cz' is listed among the supported native
# operations in the ValidationException message above.
native_circuit = QuantumCircuit(2)
native_circuit.cz(0, 1)

# Submit the circuit verbatim and with qubit rewiring disabled, then resolve
# the job to its result before plotting the measured counts.
native_job = aspen.run(
    native_circuit,
    verbatim=True,
    disable_qubit_rewiring=True,
)
result = native_job.result()
plot_histogram(result.get_counts())