Source code for qiskit_cold_atom.providers.cold_atom_job
# This code is part of Qiskit.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Job for cold atom backends."""
from typing import Dict, Optional
import time
import json
import requests
from qiskit.providers import BackendV1 as Backend
from qiskit.providers import JobV1 as Job
from qiskit.providers import JobTimeoutError, JobError
from qiskit.providers import JobStatus
from qiskit.result import Result
class ColdAtomJob(Job):
    """Class of jobs returned by cold atom backends.

    The job communicates with the HTTP server of its backend: results are polled
    from ``<backend.url>/get_job_result`` and the status is read from
    ``<backend.url>/get_job_status``. The username and token taken from the
    backend are sent along with every request.
    """

    # Status strings reported by the server that are translated to the JobStatus
    # member of the same name. Anything else is reported as JobStatus.ERROR.
    _KNOWN_STATUSES = (
        "INITIALIZING",
        "QUEUED",
        "VALIDATING",
        "RUNNING",
        "CANCELLED",
        "DONE",
    )

    def __init__(self, backend: Backend, job_id: str):
        """
        Args:
            backend: The backend on which the job was run.
            job_id: The ID of the job.
        """
        super().__init__(backend, job_id)

        # Credentials are copied from the backend so that each request can send them.
        self.token = self._backend.token
        self.user = self._backend.username

    def _wait_for_result(self, timeout: Optional[float] = None, wait: float = 5.0) -> Dict:
        """
        Query the backend repeatedly until the result of the job is available.

        The server is always queried at least once, also when ``timeout`` is zero.

        Args:
            timeout: time in seconds after which the server is no longer queried for the
                result. ``None`` means that the server is queried indefinitely.
            wait: waiting time in seconds between queries for the result of the backend

        Returns:
            result dictionary formatted according to Qiskit schemas.

        Raises:
            JobTimeoutError: If the timeout is reached without receiving the result
            JobError: If the returned status reports an error
        """
        # A monotonic clock keeps the deadline unaffected by system clock adjustments.
        # Comparing against None (instead of truthiness) makes ``timeout=0`` a real timeout.
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            result = requests.get(
                self._backend.url + "/get_job_result",
                params={
                    "job_id": self._job_id,
                    "username": self.user,
                    "token": self.token,
                },
            ).json()

            if result["status"] == "finished":
                return result

            if result["status"] == "error":
                # ``name`` is a method on BackendV1 but a plain attribute on other
                # backend flavours; call it when needed so the message shows the name.
                backend_name = self._backend.name
                if callable(backend_name):
                    backend_name = backend_name()
                raise JobError(f"{backend_name} returned error:\n" + str(result))

            if deadline is None:
                time.sleep(wait)
                continue

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise JobTimeoutError("Timed out waiting for result")

            # Never sleep past the deadline so that the timeout is honoured closely.
            time.sleep(min(wait, remaining))

    # pylint: disable=arguments-differ
    def result(self, timeout: Optional[float] = None, wait: float = 5.0) -> Result:
        """
        Retrieve a qiskit result object from the backend.

        Args:
            timeout: time in seconds after which the server is no longer queried for the
                result. ``None`` means that the server is queried indefinitely.
            wait: waiting time in seconds between queries for the result of the backend

        Returns:
            qiskit.result object that contains the outcome of the job

        Raises:
            JobTimeoutError: If the timeout is reached without receiving the result
            JobError: If the returned status reports an error
        """
        result_dict = self._wait_for_result(timeout, wait=wait)

        return Result.from_dict(result_dict)

    def status(self) -> JobStatus:
        """
        Retrieve the status from the backend.

        Returns:
            status: The ``JobStatus`` member matching the status string reported by the
            server. ``JobStatus.ERROR`` is returned if the server does not answer with
            HTTP status code 200 or reports a status string that is not recognized.
        """
        response = requests.get(
            self._backend.url + "/get_job_status",
            params={
                "job_id": self.job_id(),
                "username": self.user,
                "token": self.token,
            },
        )

        # Check the HTTP status before reading the body: an error response is not
        # guaranteed to carry a JSON payload with a "status" entry.
        if response.status_code != 200:
            return JobStatus.ERROR

        status_string = response.json()["status"]

        if status_string in self._KNOWN_STATUSES:
            return JobStatus[status_string]

        return JobStatus.ERROR

    def error_message(self) -> Optional[str]:
        """
        Retrieve the error message from the backend.

        Returns:
            error: The ``error_message`` entry of the status reply of the server, or
            ``None`` if the reply does not contain such an entry.
        """
        # NOTE(review): unlike ``status``, the job id is sent JSON-encoded under a "json"
        # key here instead of as a plain "job_id" parameter - confirm which form the
        # server expects.
        status_payload = {"job_id": self.job_id()}

        response = requests.get(
            self._backend.url + "/get_job_status",
            params={
                "json": json.dumps(status_payload),
                "username": self.user,
                "token": self.token,
            },
        )

        return response.json().get("error_message")

    def cancel(self):
        """Cancelling a job is not implemented for this job class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def submit(self):
        """Submitting a job is not implemented for this job class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError