Source code for qiskit_experiments.framework.experiment_data
# This code is part of Qiskit.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Experiment Data class"""
from __future__ import annotations
import logging
import dataclasses
from typing import Dict, Optional, List, Union, Any, Callable, Tuple, TYPE_CHECKING
from datetime import datetime
from concurrent import futures
from threading import Event
from functools import wraps
from collections import deque
import contextlib
import copy
import uuid
import enum
import time
import io
import sys
import traceback
import numpy as np
from matplotlib import pyplot
from matplotlib.figure import Figure as MatplotlibFigure
from qiskit.result import Result
from qiskit.providers.jobstatus import JobStatus, JOB_FINAL_STATES
from qiskit.exceptions import QiskitError
from qiskit.providers import Job, Backend, Provider
from qiskit_ibm_experiment import IBMExperimentService
from qiskit_ibm_experiment import ExperimentData as ExperimentDataclass
from qiskit_experiments.framework.json import ExperimentEncoder, ExperimentDecoder
from qiskit_experiments.database_service.utils import (
    qiskit_version,
    plot_to_svg_bytes,
    ThreadSafeOrderedDict,
    ThreadSafeList,
)
from qiskit_experiments.framework.analysis_result import AnalysisResult
from qiskit_experiments.framework import BackendData
from qiskit_experiments.database_service.exceptions import (
    ExperimentDataError,
    ExperimentEntryNotFound,
    ExperimentEntryExists,
)

if TYPE_CHECKING:
    # There is a cyclical dependency here, but the name needs to exist for
    # Sphinx on Python 3.9+ to link type hints correctly. The gating on
    # `TYPE_CHECKING` means that the import will never be resolved by an actual
    # interpreter, only static analysis.
    from . import BaseExperiment

LOG = logging.getLogger(__name__)


def do_auto_save(func: Callable):
    """Decorate the input function to auto save data."""

    @wraps(func)
    def _wrapped(self, *args, **kwargs):
        return_val = func(self, *args, **kwargs)
        if self.auto_save:
            self.save_metadata()
        return return_val

    return _wrapped
class FigureData:
    """Wrapper class for figures and figure metadata. The raw figure can be
    accessed with the ``figure`` attribute."""

    def __init__(self, figure, name=None, metadata=None):
        """Creates a new figure data object.

        Args:
            figure: the raw figure itself. Can be SVG or matplotlib.Figure.
            name: Optional, the name of the figure.
            metadata: Optional, any metadata to be stored with the figure.
        """
        self.figure = figure
        self._name = name
        self.metadata = metadata or {}

    # name is read only
    @property
    def name(self) -> str:
        """The name of the figure"""
        return self._name

    @property
    def metadata(self) -> dict:
        """The metadata dictionary stored with the figure"""
        return self._metadata

    @metadata.setter
    def metadata(self, new_metadata: dict):
        """Set the metadata to a new value; must be a dictionary"""
        if not isinstance(new_metadata, dict):
            raise ValueError("figure metadata must be a dictionary")
        self._metadata = new_metadata
    def copy(self, new_name: Optional[str] = None):
        """Creates a copy of the figure data"""
        name = new_name or self.name
        return FigureData(figure=self.figure, name=name, metadata=copy.deepcopy(self.metadata))
    def __json_encode__(self) -> Dict[str, Any]:
        """Return the json representation of the figure data"""
        return {"figure": self.figure, "name": self.name, "metadata": self.metadata}

    @classmethod
    def __json_decode__(cls, args: Dict[str, Any]) -> "FigureData":
        """Initialize a figure data from the json representation"""
        return cls(**args)

    def _repr_png_(self):
        if isinstance(self.figure, MatplotlibFigure):
            b = io.BytesIO()
            self.figure.savefig(b, format="png", bbox_inches="tight")
            png = b.getvalue()
            return png
        else:
            return None

    def _repr_svg_(self):
        if isinstance(self.figure, str):
            return self.figure
        if isinstance(self.figure, bytes):
            return str(self.figure)
        return None
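# A minimal usage sketch for FigureData, kept as comments so it is not part of
# the module itself. The figure below is a hypothetical matplotlib figure:
#
#     fig, ax = pyplot.subplots()
#     ax.plot([0, 1], [0, 1])
#     fig_data = FigureData(figure=fig, name="ramsey_fit.svg", metadata={"qubits": [0]})
#     renamed = fig_data.copy(new_name="ramsey_fit_copy.svg")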
class ExperimentData:
    """Experiment data container class.

    This class handles the following:

    1. Storing the data related to an experiment: raw data, metadata,
       analysis results, and figures
    2. Managing jobs and adding data from jobs automatically
    3. Saving and loading data from the database service

    The field ``db_data`` is a dataclass (``ExperimentDataclass``) containing
    all the data that can be stored in the database and loaded from it, and
    as such is subject to strict conventions.

    Other data fields can be added and used freely, but they won't be saved
    to the database.
    """

    _metadata_version = 1
    _job_executor = futures.ThreadPoolExecutor()

    _json_encoder = ExperimentEncoder
    _json_decoder = ExperimentDecoder

    _metadata_filename = "metadata.json"

    def __init__(
        self,
        experiment: Optional["BaseExperiment"] = None,
        backend: Optional[Backend] = None,
        service: Optional[IBMExperimentService] = None,
        parent_id: Optional[str] = None,
        job_ids: Optional[List[str]] = None,
        child_data: Optional[List[ExperimentData]] = None,
        verbose: Optional[bool] = True,
        db_data: Optional[ExperimentDataclass] = None,
        **kwargs,
    ):
        """Initialize experiment data.

        Args:
            experiment: Experiment object that generated the data.
            backend: Backend the experiment runs on. This overrides the
                backend in the experiment object.
            service: The service that stores the experiment results to the database.
            parent_id: ID of the parent experiment data in the setting of a
                composite experiment.
            job_ids: IDs of jobs submitted for the experiment.
            child_data: List of child experiment data.
            verbose: Whether to print messages.
            db_data: A prepared ExperimentDataclass of the experiment info.
                This overrides other database parameters.
        """
        if experiment is not None:
            backend = backend or experiment.backend
            experiment_type = experiment.experiment_type
        else:
            experiment_type = None
        if job_ids is None:
            job_ids = []

        self._experiment = experiment

        # data stored in the database
        metadata = {}
        if experiment is not None:
            metadata = copy.deepcopy(experiment._metadata())
        source = metadata.pop(
            "_source",
            {
                "class": f"{self.__class__.__module__}.{self.__class__.__name__}",
                "metadata_version": self.__class__._metadata_version,
                "qiskit_version": qiskit_version(),
            },
        )
        metadata["_source"] = source

        experiment_id = kwargs.get("experiment_id", str(uuid.uuid4()))
        if db_data is None:
            self._db_data = ExperimentDataclass(
                experiment_id=experiment_id,
                experiment_type=experiment_type,
                parent_id=parent_id,
                job_ids=job_ids,
                metadata=metadata,
            )
        else:
            self._db_data = db_data

        for key, value in kwargs.items():
            if hasattr(self._db_data, key):
                setattr(self._db_data, key, value)
            else:
                LOG.warning("Key '%s' not stored in the database", key)

        # general data related
        self._backend = None
        if backend is not None:
            self._set_backend(backend, recursive=False)
        self._service = service
        if self._service is None and self.backend is not None:
            self._service = self.get_service_from_backend(self.backend)
        self._auto_save = False
        self._created_in_db = False
        self._extra_data = kwargs
        self.verbose = verbose

        # job handling related
        self._jobs = ThreadSafeOrderedDict(job_ids)
        self._job_futures = ThreadSafeOrderedDict()
        self._analysis_callbacks = ThreadSafeOrderedDict()
        self._analysis_futures = ThreadSafeOrderedDict()
        # Set 2 workers for the analysis executor so there can be 1 actively
        # running future and one waiting "running" future.
        # This is to allow the second future to be cancelled without waiting
        # for the actively running future to finish first.
        self._analysis_executor = futures.ThreadPoolExecutor(max_workers=2)
        self._monitor_executor = futures.ThreadPoolExecutor()

        # data storage
        self._result_data = ThreadSafeList()
        self._figures = ThreadSafeOrderedDict(self._db_data.figure_names)
        self._analysis_results = ThreadSafeOrderedDict()

        self._deleted_figures = deque()
        self._deleted_analysis_results = deque()

        # Child related
        # Add component data and set parent ID to current container
        self._child_data = ThreadSafeOrderedDict()
        if child_data is not None:
            self._set_child_data(child_data)

    # Getters/setters for experiment metadata

    @property
    def experiment(self):
        """Return the experiment for this data.

        Returns:
            BaseExperiment: the experiment object.
        """
        return self._experiment

    @property
    def completion_times(self) -> Dict[str, datetime]:
        """Returns the completion times of the jobs."""
        job_times = {}
        for job_id, job in self._jobs.items():
            if job is not None and "COMPLETED" in job.time_per_step():
                job_times[job_id] = job.time_per_step().get("COMPLETED")
        return job_times

    @property
    def tags(self) -> List[str]:
        """Return tags assigned to this experiment data.

        Returns:
            A list of tags assigned to this experiment data.
        """
        return self._db_data.tags

    @tags.setter
    def tags(self, new_tags: List[str]) -> None:
        """Set tags for this experiment."""
        if not isinstance(new_tags, list):
            raise ExperimentDataError(
                f"The `tags` field of {type(self).__name__} must be a list."
            )
        self._db_data.tags = np.unique(new_tags).tolist()
        if self.auto_save:
            self.save_metadata()

    @property
    def metadata(self) -> Dict:
        """Return experiment metadata.

        Returns:
            Experiment metadata.
        """
        return self._db_data.metadata

    @property
    def creation_datetime(self) -> "datetime":
        """Return the creation datetime of this experiment data.

        Returns:
            The creation datetime of this experiment data.
        """
        return self._db_data.creation_datetime

    @property
    def start_datetime(self) -> "datetime":
        """Return the start datetime of this experiment data.

        Returns:
            The start datetime of this experiment data.
        """
        return self._db_data.start_datetime

    @property
    def updated_datetime(self) -> "datetime":
        """Return the update datetime of this experiment data.

        Returns:
            The update datetime of this experiment data.
        """
        return self._db_data.updated_datetime

    @property
    def end_datetime(self) -> "datetime":
        """Return the end datetime of this experiment data.

        Returns:
            The end datetime of this experiment data.
        """
        return self._db_data.end_datetime

    @property
    def hub(self) -> str:
        """Return the hub of this experiment data.

        Returns:
            The hub of this experiment data.
        """
        return self._db_data.hub

    @property
    def group(self) -> str:
        """Return the group of this experiment data.

        Returns:
            The group of this experiment data.
        """
        return self._db_data.group

    @property
    def project(self) -> str:
        """Return the project of this experiment data.

        Returns:
            The project of this experiment data.
        """
        return self._db_data.project

    @property
    def _provider(self) -> Optional[Provider]:
        """Return the provider.

        Returns:
            Provider used for the experiment, or ``None`` if unknown.
        """
        if self._backend is None:
            return None
        return self._backend.provider()

    @property
    def experiment_id(self) -> str:
        """Return experiment ID.

        Returns:
            Experiment ID.
        """
        return self._db_data.experiment_id

    @property
    def experiment_type(self) -> str:
        """Return experiment type.

        Returns:
            Experiment type.
        """
        return self._db_data.experiment_type

    @experiment_type.setter
    def experiment_type(self, new_type: str) -> None:
        """Sets the experiment type."""
        self._db_data.experiment_type = new_type

    @property
    def parent_id(self) -> str:
        """Return parent experiment ID.

        Returns:
            Parent ID.
"""returnself._db_data.parent_id@parent_id.setterdefparent_id(self,new_id:str)->None:"""Sets the parent id"""self._db_data.parent_id=new_id@propertydefjob_ids(self)->List[str]:"""Return experiment job IDs. Returns: IDs of jobs submitted for this experiment. """returnself._db_data.job_ids@propertydeffigure_names(self)->List[str]:"""Return names of the figures associated with this experiment. Returns: Names of figures associated with this experiment. """returnself._db_data.figure_names@propertydefshare_level(self)->str:"""Return the share level for this experiment Returns: Experiment share level. """returnself._db_data.share_level@share_level.setterdefshare_level(self,new_level:str)->None:"""Set the experiment share level, to this experiment itself and its descendants. Args: new_level: New experiment share level. Valid share levels are provider- specified. For example, IBM Quantum experiment service allows "public", "hub", "group", "project", and "private". """self._db_data.share_level=new_levelfordatainself._child_data.values():original_auto_save=data.auto_savedata.auto_save=Falsedata.share_level=new_leveldata.auto_save=original_auto_saveifself.auto_save:self.save_metadata()@propertydefnotes(self)->str:"""Return experiment notes. Returns: Experiment notes. """returnself._db_data.notes@notes.setterdefnotes(self,new_notes:str)->None:"""Update experiment notes. Args: new_notes: New experiment notes. """self._db_data.notes=new_notesifself.auto_save:self.save_metadata()@propertydefbackend_name(self)->str:"""Return the backend's name"""returnself._db_data.backend@propertydefbackend(self)->Backend:"""Return backend. Returns: Backend. """returnself._backend@backend.setterdefbackend(self,new_backend:Backend)->None:"""Update backend. Args: new_backend: New backend. """self._set_backend(new_backend)ifself.auto_save:self.save_metadata()def_set_backend(self,new_backend:Backend,recursive:bool=True)->None:"""Set backend. Args: new_backend: New backend. recursive: should set the backend for children as well """# defined independently from the setter to enable setting without autosaveself._backend=new_backendself._backend_data=BackendData(new_backend)self._db_data.backend=self._backend_data.nameifself._db_data.backendisNone:self._db_data.backend=str(new_backend)provider=self._backend_data.providerifproviderisnotNone:self._set_hgp_from_provider(provider)ifrecursive:fordatainself.child_data():data._set_backend(new_backend)def_set_hgp_from_provider(self,provider):try:hub=Nonegroup=Noneproject=None# qiskit-ibmq-provider styleifhasattr(provider,"credentials"):creds=provider.credentialshub=creds.hubgroup=creds.groupproject=creds.project# qiskit-ibm-provider styleifhasattr(provider,"_hgps"):hub,group,project=list(self.backend.provider._hgps.keys())[0].split("/")self._db_data.hub=self._db_data.huborhubself._db_data.group=self._db_data.grouporgroupself._db_data.project=self._db_data.projectorprojectexcept(AttributeError,IndexError):returndef_clear_results(self):"""Delete all currently stored analysis results and figures"""# Schedule existing analysis results for deletion next save callforkeyinself._analysis_results.keys():self._deleted_analysis_results.append(key)self._analysis_results=ThreadSafeOrderedDict()# Schedule existing figures for deletion next save callforkeyinself._figures.keys():self._deleted_figures.append(key)self._figures=ThreadSafeOrderedDict()@propertydefservice(self)->Optional[IBMExperimentService]:"""Return the database service. 

        Returns:
            Service that can be used to access this experiment in a database.
        """
        return self._service

    @service.setter
    def service(self, service: IBMExperimentService) -> None:
        """Set the service to be used for storing experiment data.

        Args:
            service: Service to be used.

        Raises:
            ExperimentDataError: If an experiment service is already being used.
        """
        self._set_service(service)

    @property
    def auto_save(self) -> bool:
        """Return current auto-save option.

        Returns:
            Whether changes will be automatically saved.
        """
        return self._auto_save

    @auto_save.setter
    def auto_save(self, save_val: bool) -> None:
        """Set auto save preference.

        Args:
            save_val: Whether to do auto-save.
        """
        if save_val is True and not self._auto_save:
            self.save()
        self._auto_save = save_val
        for res in self._analysis_results.values():
            # Setting the private variable directly to avoid a duplicate save.
            # This can be removed when we start tracking changes.
            res._auto_save = save_val
        for data in self.child_data():
            data.auto_save = save_val

    @property
    def source(self) -> Dict:
        """Return the class name and version."""
        return self._db_data.metadata["_source"]

    # Data addition and deletion
    def add_data(
        self,
        data: Union[Result, List[Result], Job, List[Job], Dict, List[Dict]],
    ) -> None:
        """Add experiment data.

        Args:
            data: Experiment data to add. Several types are accepted for convenience:

                * Result: Add data from this ``Result`` object.
                * List[Result]: Add data from the ``Result`` objects.
                * Dict: Add this data.
                * List[Dict]: Add this list of data.
                * Job: (Deprecated) Add data from the job result.
                * List[Job]: (Deprecated) Add data from the job results.

        Raises:
            TypeError: If the input data type is invalid.
        """
        if any(not future.done() for future in self._analysis_futures.values()):
            LOG.warning(
                "Not all analysis has finished running. Adding new data may "
                "create unexpected analysis results."
            )
        if not isinstance(data, list):
            data = [data]

        # Directly add non-job data
        with self._result_data.lock:
            for datum in data:
                if isinstance(datum, dict):
                    self._result_data.append(datum)
                elif isinstance(datum, Result):
                    self._add_result_data(datum)
                else:
                    raise TypeError(f"Invalid data type {type(datum)}.")
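    # Illustrative sketch (comments only, not part of the module): the dict
    # format accepted by ``add_data``. The keys and values below are made up
    # to show the shape of one circuit's data:
    #
    #     exp_data.add_data({
    #         "counts": {"00": 512, "11": 512},
    #         "metadata": {"xval": 0.1},
    #         "shots": 1024,
    #     })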
    def add_jobs(
        self,
        jobs: Union[Job, List[Job]],
        timeout: Optional[float] = None,
    ) -> None:
        """Add experiment data from jobs.

        Args:
            jobs: The Job or list of Jobs to add result data from.
            timeout: Optional, time in seconds to wait for all jobs to finish
                before cancelling them.

        Raises:
            TypeError: If the input data type is invalid.

        .. note::
            If a timeout is specified the :meth:`cancel_jobs` method will be
            called after timing out to attempt to cancel any unfinished jobs.

            If you want to wait for jobs without cancelling, use the timeout
            kwarg of :meth:`block_for_results` instead.
        """
        if any(not future.done() for future in self._analysis_futures.values()):
            LOG.warning(
                "Not all analysis has finished running. Adding new jobs may "
                "create unexpected analysis results."
            )
        if isinstance(jobs, Job):
            jobs = [jobs]

        # Add futures for extracting finished job data
        timeout_ids = []
        for job in jobs:
            if self.backend is not None:
                backend_name = BackendData(self.backend).name
                job_backend_name = BackendData(job.backend()).name
                if self.backend and backend_name != job_backend_name:
                    LOG.warning(
                        "Adding a job from a backend (%s) that is different "
                        "than the current backend (%s). "
                        "The new backend will be used, but "
                        "service is not changed if one already exists.",
                        job.backend(),
                        self.backend,
                    )
                    self.backend = job.backend()
            jid = job.job_id()
            if jid in self._jobs:
                LOG.warning(
                    "Skipping duplicate job, a job with this ID already exists [Job ID: %s]", jid
                )
            else:
                self.job_ids.append(jid)
                self._jobs[jid] = job
                if jid in self._job_futures:
                    LOG.warning("Job future has already been submitted [Job ID: %s]", jid)
                else:
                    self._add_job_future(job)
                    if timeout is not None:
                        timeout_ids.append(jid)

        # Add future for cancelling jobs that timeout
        if timeout_ids:
            self._job_executor.submit(self._timeout_running_jobs, timeout_ids, timeout)

        if self.auto_save:
            self.save_metadata()
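    # Illustrative sketch (comments only, not part of the module), assuming
    # ``backend`` is a hypothetical Qiskit backend and ``circuits`` a list of
    # circuits:
    #
    #     job = backend.run(circuits)
    #     exp_data.add_jobs(job, timeout=300)  # cancel anything unfinished after 5 min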
    def _timeout_running_jobs(self, job_ids, timeout):
        """Function for cancelling jobs after the timeout length.

        This function should be submitted to an executor to run as a future.

        Args:
            job_ids: the IDs of jobs to wait for.
            timeout: The total time to wait for all jobs before cancelling.
        """
        futs = [self._job_futures[jid] for jid in job_ids]
        waited = futures.wait(futs, timeout=timeout)

        # Try to cancel timed-out jobs
        if waited.not_done:
            LOG.debug("Cancelling running jobs that exceeded add_jobs timeout.")
            done_ids = {fut.result()[0] for fut in waited.done}
            notdone_ids = [jid for jid in job_ids if jid not in done_ids]
            self.cancel_jobs(notdone_ids)

    def _add_job_future(self, job):
        """Submit a new _add_job_data job to the executor"""
        jid = job.job_id()
        if jid in self._job_futures:
            LOG.warning("Job future has already been submitted [Job ID: %s]", jid)
        else:
            self._job_futures[jid] = self._job_executor.submit(self._add_job_data, job)

    def _add_job_data(
        self,
        job: Job,
    ) -> Tuple[str, bool]:
        """Wait for a job to finish and add job result data.

        Args:
            job: the Job to wait for and add data from.

        Returns:
            A tuple (str, bool) of the job ID and whether the job data was added.

        Raises:
            Exception: If an error occurred when adding job data.
        """
        jid = job.job_id()
        try:
            job_result = job.result()
            self._add_result_data(job_result)
            LOG.debug("Job data added [Job ID: %s]", jid)
            return jid, True
        except Exception as ex:  # pylint: disable=broad-except
            # Handle cancelled jobs
            status = job.status()
            if status == JobStatus.CANCELLED:
                LOG.warning("Job was cancelled before completion [Job ID: %s]", jid)
                return jid, False
            if status == JobStatus.ERROR:
                LOG.error(
                    "Job data not added for errored job [Job ID: %s]\nError message: %s",
                    jid,
                    job.error_message(),
                )
                return jid, False
            LOG.warning("Adding data from job failed [Job ID: %s]", job.job_id())
            raise ex
    def add_analysis_callback(self, callback: Callable, **kwargs: Any):
        """Add an analysis callback for running after experiment data jobs are finished.

        This method adds the `callback` function to a queue to be run
        asynchronously after completion of any running jobs, or immediately
        if no jobs are running. If this method is called multiple times the
        callback functions will be executed in the order they were added.

        Args:
            callback: Callback function invoked when the job finishes successfully.
                The callback function will be called as
                ``callback(expdata, **kwargs)`` where `expdata` is this
                ``ExperimentData`` object, and `kwargs` are any additional
                keyword arguments passed to this method.
            **kwargs: Keyword arguments to be passed to the callback function.
        """
        with self._job_futures.lock and self._analysis_futures.lock:
            # Create callback dataclass
            cid = uuid.uuid4().hex
            self._analysis_callbacks[cid] = AnalysisCallback(
                name=callback.__name__,
                callback_id=cid,
            )

            # Futures to wait for
            futs = self._job_futures.values() + self._analysis_futures.values()
            wait_future = self._monitor_executor.submit(
                self._wait_for_futures, futs, name="jobs and analysis"
            )

            # Create a future to monitor event for calls to cancel_analysis
            def _monitor_cancel():
                self._analysis_callbacks[cid].event.wait()
                return False

            cancel_future = self._monitor_executor.submit(_monitor_cancel)

            # Add run analysis future
            self._analysis_futures[cid] = self._analysis_executor.submit(
                self._run_analysis_callback, cid, wait_future, cancel_future, callback, **kwargs
            )
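    # Illustrative sketch (comments only, not part of the module) of a callback
    # matching the documented ``callback(expdata, **kwargs)`` signature; the
    # function name and kwarg are hypothetical:
    #
    #     def report_count(expdata, prefix=""):
    #         print(f"{prefix}{len(expdata.data())} circuits measured")
    #
    #     exp_data.add_analysis_callback(report_count, prefix="run 1: ")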
    def _run_analysis_callback(
        self,
        callback_id: str,
        wait_future: futures.Future,
        cancel_future: futures.Future,
        callback: Callable,
        **kwargs,
    ):
        """Run an analysis callback after the specified futures have finished."""
        if callback_id not in self._analysis_callbacks:
            raise ValueError(f"No analysis callback with id {callback_id}")

        # Monitor jobs and cancellation event to see if the callback should be
        # run or cancelled.
        # Future which returns if either all jobs finish, or the cancel event is set
        waited = futures.wait([wait_future, cancel_future], return_when="FIRST_COMPLETED")
        cancel = not all(fut.result() for fut in waited.done)

        # Ensure monitor event is set so monitor future can terminate
        self._analysis_callbacks[callback_id].event.set()

        # If not ready cancel the callback before running
        if cancel:
            self._analysis_callbacks[callback_id].status = AnalysisStatus.CANCELLED
            LOG.info(
                "Cancelled analysis callback [Experiment ID: %s][Analysis Callback ID: %s]",
                self.experiment_id,
                callback_id,
            )
            return callback_id, False

        # Run callback function
        self._analysis_callbacks[callback_id].status = AnalysisStatus.RUNNING
        try:
            LOG.debug(
                "Running analysis callback '%s' [Experiment ID: %s][Analysis Callback ID: %s]",
                self._analysis_callbacks[callback_id].name,
                self.experiment_id,
                callback_id,
            )
            callback(self, **kwargs)
            self._analysis_callbacks[callback_id].status = AnalysisStatus.DONE
            LOG.debug(
                "Analysis callback finished [Experiment ID: %s][Analysis Callback ID: %s]",
                self.experiment_id,
                callback_id,
            )
            return callback_id, True
        except Exception as ex:  # pylint: disable=broad-except
            self._analysis_callbacks[callback_id].status = AnalysisStatus.ERROR
            tb_text = "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
            error_msg = (
                f"Analysis callback failed [Experiment ID: {self.experiment_id}]"
                f"[Analysis Callback ID: {callback_id}]:\n{tb_text}"
            )
            self._analysis_callbacks[callback_id].error_msg = error_msg
            LOG.warning(error_msg)
            return callback_id, False

    def _add_result_data(self, result: Result) -> None:
        """Add data from a Result object.

        Args:
            result: Result object containing data to be added.
        """
        if result.job_id not in self._jobs:
            self._jobs[result.job_id] = None
            self.job_ids.append(result.job_id)
        with self._result_data.lock:
            # Lock data while adding all result data
            for i, _ in enumerate(result.results):
                data = result.data(i)
                data["job_id"] = result.job_id
                if "counts" in data:
                    # Format to Counts object rather than hex dict
                    data["counts"] = result.get_counts(i)
                expr_result = result.results[i]
                if hasattr(expr_result, "header") and hasattr(expr_result.header, "metadata"):
                    data["metadata"] = expr_result.header.metadata
                data["shots"] = expr_result.shots
                data["meas_level"] = expr_result.meas_level
                if hasattr(expr_result, "meas_return"):
                    data["meas_return"] = expr_result.meas_return
                self._result_data.append(data)

    def _retrieve_data(self):
        """Retrieve job data if experiment data is missing."""
        if self._result_data or not self._backend:
            return

        # Get job results if missing experiment data.
        retrieved_jobs = {}
        for jid, job in self._jobs.items():
            if job is None:
                try:
                    LOG.debug("Retrieving job from backend %s [Job ID: %s]", self._backend, jid)
                    job = self._backend.retrieve_job(jid)
                    retrieved_jobs[jid] = job
                except Exception:  # pylint: disable=broad-except
                    LOG.warning(
                        "Unable to retrieve data from job on backend %s [Job ID: %s]",
                        self._backend,
                        jid,
                    )
        # Add retrieved job objects to stored jobs and extract data
        for jid, job in retrieved_jobs.items():
            self._jobs[jid] = job
            if job.status() in JOB_FINAL_STATES:
                # Add job results synchronously
                self._add_job_data(job)
            else:
                # Add job results asynchronously
                self._add_job_future(job)
    def data(
        self,
        index: Optional[Union[int, slice, str]] = None,
    ) -> Union[Dict, List[Dict]]:
        """Return the experiment data at the specified index.

        Args:
            index: Index of the data to be returned.
                Several types are accepted for convenience:

                * None: Return all experiment data.
                * int: Specific index of the data.
                * slice: A list slice of data indexes.
                * str: ID of the job that produced the data.

        Returns:
            Experiment data.

        Raises:
            TypeError: If the input `index` has an invalid type.
        """
        self._retrieve_data()
        if index is None:
            return self._result_data.copy()
        if isinstance(index, (int, slice)):
            return self._result_data[index]
        if isinstance(index, str):
            return [data for data in self._result_data if data.get("job_id") == index]
        raise TypeError(f"Invalid index type {type(index)}.")
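    # Illustrative sketch (comments only, not part of the module) of the
    # accepted index types; the job ID string is a placeholder:
    #
    #     exp_data.data()             # all data
    #     exp_data.data(0)            # first circuit's data
    #     exp_data.data(slice(0, 2))  # first two entries
    #     exp_data.data("c1b2")       # entries produced by one job ID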
    @do_auto_save
    def add_figures(
        self,
        figures,
        figure_names=None,
        overwrite=False,
        save_figure=None,
    ) -> Union[str, List[str]]:
        """Add the experiment figure.

        Args:
            figures (str or bytes or pyplot.Figure or list): Paths of the figure
                files or figure data.
            figure_names (str or list): Names of the figures. If ``None``, use the
                figure file names, if given, or a generated name. If `figures` is a
                list, then `figure_names` must also be a list of the same length or
                ``None``.
            overwrite (bool): Whether to overwrite the figure if one already exists
                with the same name.
            save_figure (bool): Whether to save the figure in the database. If
                ``None``, the ``auto-save`` attribute is used.

        Returns:
            str or list: Figure names.

        Raises:
            ExperimentEntryExists: If a figure with the same name already exists,
                and `overwrite=True` is not specified.
            ValueError: If an input parameter has an invalid value.
        """
        if figure_names is not None and not isinstance(figure_names, list):
            figure_names = [figure_names]
        if not isinstance(figures, list):
            figures = [figures]
        if figure_names is not None and len(figures) != len(figure_names):
            raise ValueError(
                "The parameter figure_names must be None or a list of "
                "the same size as the parameter figures."
            )

        added_figs = []
        for idx, figure in enumerate(figures):
            if figure_names is None:
                if isinstance(figure, str):
                    fig_name = figure
                else:
                    fig_name = (
                        f"{self.experiment_type}_"
                        f"Fig-{len(self._figures)}_"
                        f"Exp-{self.experiment_id[:8]}.svg"
                    )
            else:
                fig_name = figure_names[idx]
            if not fig_name.endswith(".svg"):
                LOG.info(
                    "File name %s does not have an SVG extension. A '.svg' is added.", fig_name
                )
                fig_name += ".svg"

            existing_figure = fig_name in self._figures
            if existing_figure and not overwrite:
                raise ExperimentEntryExists(
                    f"A figure with the name {fig_name} for this experiment "
                    f"already exists. Specify overwrite=True if you "
                    f"want to overwrite it."
                )

            # figure_data = None
            if isinstance(figure, str):
                with open(figure, "rb") as file:
                    figure = file.read()

            # check whether the figure is already wrapped, meaning it came from a sub-experiment
            if isinstance(figure, FigureData):
                figure_data = figure.copy(new_name=fig_name)
            else:
                figure_metadata = {"qubits": self.metadata.get("physical_qubits")}
                figure_data = FigureData(figure=figure, name=fig_name, metadata=figure_metadata)

            self._figures[fig_name] = figure_data
            self._db_data.figure_names.append(fig_name)

            save = save_figure if save_figure is not None else self.auto_save
            if save and self._service:
                if isinstance(figure, pyplot.Figure):
                    figure = plot_to_svg_bytes(figure)
                self._service.create_or_update_figure(
                    experiment_id=self.experiment_id,
                    figure=figure,
                    figure_name=fig_name,
                    create=not existing_figure,
                )
            added_figs.append(fig_name)

        return added_figs if len(added_figs) != 1 else added_figs[0]
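    # Illustrative sketch (comments only, not part of the module): adding a
    # matplotlib figure under an explicit name; the ".svg" suffix is appended
    # automatically when missing:
    #
    #     fig, ax = pyplot.subplots()
    #     exp_data.add_figures(fig, figure_names="decay_curve", overwrite=True)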
    @do_auto_save
    def delete_figure(
        self,
        figure_key: Union[str, int],
    ) -> str:
        """Delete the experiment figure.

        Args:
            figure_key: Name or index of the figure.

        Returns:
            Figure name.

        Raises:
            ExperimentEntryNotFound: If the figure is not found.
        """
        if isinstance(figure_key, int):
            figure_key = self._figures.keys()[figure_key]
        elif figure_key not in self._figures:
            raise ExperimentEntryNotFound(f"Figure {figure_key} not found.")

        del self._figures[figure_key]
        self._deleted_figures.append(figure_key)

        if self._service and self.auto_save:
            with service_exception_to_warning():
                self.service.delete_figure(
                    experiment_id=self.experiment_id, figure_name=figure_key
                )
            self._deleted_figures.remove(figure_key)

        return figure_key
    def figure(
        self,
        figure_key: Union[str, int],
        file_name: Optional[str] = None,
    ) -> Union[int, FigureData]:
        """Retrieve the specified experiment figure.

        Args:
            figure_key: Name or index of the figure.
            file_name: Name of the local file to save the figure to. If ``None``,
                the content of the figure is returned instead.

        Returns:
            The size of the figure if `file_name` is specified. Otherwise the
            content of the figure as a `FigureData` object.

        Raises:
            ExperimentEntryNotFound: If the figure cannot be found.
        """
        if isinstance(figure_key, int):
            if figure_key < 0 or figure_key >= len(self._figures.keys()):
                raise ExperimentEntryNotFound(f"Figure {figure_key} not found.")
            figure_key = self._figures.keys()[figure_key]

        figure_data = self._figures.get(figure_key, None)
        if figure_data is None and self.service:
            figure = self.service.figure(
                experiment_id=self.experiment_id, figure_name=figure_key
            )
            figure_data = FigureData(figure=figure, name=figure_key)
            self._figures[figure_key] = figure_data

        if figure_data is None:
            raise ExperimentEntryNotFound(f"Figure {figure_key} not found.")

        if file_name:
            with open(file_name, "wb") as output:
                num_bytes = output.write(figure_data.figure)
                return num_bytes
        return figure_data
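    # Illustrative sketch (comments only, not part of the module): fetching a
    # figure either as a FigureData object or written to a local file; the
    # figure name and path are placeholders:
    #
    #     fig_data = exp_data.figure("decay_curve.svg")
    #     n_bytes = exp_data.figure("decay_curve.svg", file_name="/tmp/decay.svg")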
    @do_auto_save
    def add_analysis_results(
        self,
        results: Union[AnalysisResult, List[AnalysisResult]],
    ) -> None:
        """Save the analysis result.

        Args:
            results: Analysis results to be saved.
        """
        if not isinstance(results, list):
            results = [results]

        for result in results:
            self._analysis_results[result.result_id] = result

            with contextlib.suppress(ExperimentDataError):
                result.service = self.service
                result.auto_save = self.auto_save

            if self.auto_save and self._service:
                result.save()
    @do_auto_save
    def delete_analysis_result(
        self,
        result_key: Union[int, str],
    ) -> str:
        """Delete the analysis result.

        Args:
            result_key: ID or index of the analysis result to be deleted.

        Returns:
            Analysis result ID.

        Raises:
            ExperimentEntryNotFound: If the analysis result is not found.
        """
        if isinstance(result_key, int):
            result_key = self._analysis_results.keys()[result_key]
        else:
            # Retrieve from DB if needed.
            result_key = self.analysis_results(result_key, block=False).result_id

        del self._analysis_results[result_key]
        self._deleted_analysis_results.append(result_key)

        if self._service and self.auto_save:
            with service_exception_to_warning():
                self.service.delete_analysis_result(result_id=result_key)
            self._deleted_analysis_results.remove(result_key)

        return result_key
    def _retrieve_analysis_results(self, refresh: bool = False):
        """Retrieve service analysis results.

        Args:
            refresh: Retrieve the latest analysis results from the server, if
                an experiment service is available.
        """
        # Get analysis results from the service if they are missing or a refresh
        # is requested.
        if self.service and (not self._analysis_results or refresh):
            retrieved_results = self.service.analysis_results(
                experiment_id=self.experiment_id, limit=None, json_decoder=self._json_decoder
            )
            for result in retrieved_results:
                result_id = result.result_id
                self._analysis_results[result_id] = AnalysisResult(service=self.service)
                self._analysis_results[result_id].set_data(result)
                self._analysis_results[result_id]._created_in_db = True
    def analysis_results(
        self,
        index: Optional[Union[int, slice, str]] = None,
        refresh: bool = False,
        block: bool = True,
        timeout: Optional[float] = None,
    ) -> Union[AnalysisResult, List[AnalysisResult]]:
        """Return analysis results associated with this experiment.

        Args:
            index: Index of the analysis result to be returned.
                Several types are accepted for convenience:

                * None: Return all analysis results.
                * int: Specific index of the analysis results.
                * slice: A list slice of indexes.
                * str: ID or name of the analysis result.
            refresh: Retrieve the latest analysis results from the server, if
                an experiment service is available.
            block: If True, block for any analysis callbacks to finish running.
            timeout: Max time in seconds to wait for analysis callbacks to
                finish running.

        Returns:
            Analysis results for this experiment.

        Raises:
            TypeError: If the input `index` has an invalid type.
            ExperimentEntryNotFound: If the entry cannot be found.
        """
        if block:
            self._wait_for_futures(
                self._analysis_futures.values(), name="analysis", timeout=timeout
            )
        self._retrieve_analysis_results(refresh=refresh)
        if index is None:
            return self._analysis_results.values()

        def _make_not_found_message(index: Union[int, slice, str]) -> str:
            """Helper to make an error message for an index not found"""
            msg = [f"Analysis result {index} not found."]
            errors = self.errors()
            if errors:
                msg.append(f"Errors: {errors}")
            return "\n".join(msg)

        if isinstance(index, int):
            if index >= len(self._analysis_results.values()):
                raise ExperimentEntryNotFound(_make_not_found_message(index))
            return self._analysis_results.values()[index]
        if isinstance(index, slice):
            results = self._analysis_results.values()[index]
            if not results:
                raise ExperimentEntryNotFound(_make_not_found_message(index))
            return results
        if isinstance(index, str):
            # Check by result ID
            if index in self._analysis_results:
                return self._analysis_results[index]
            # Check by name
            filtered = [
                result for result in self._analysis_results.values() if result.name == index
            ]
            if not filtered:
                raise ExperimentEntryNotFound(_make_not_found_message(index))
            if len(filtered) == 1:
                return filtered[0]
            return filtered

        raise TypeError(f"Invalid index type {type(index)}.")
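    # Illustrative sketch (comments only, not part of the module): the "T1"
    # result name is hypothetical and depends on the experiment that produced
    # the data:
    #
    #     all_results = exp_data.analysis_results()
    #     t1_result = exp_data.analysis_results("T1", timeout=60)
    #     print(t1_result.value)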
    # Save and load from the database
    def save_metadata(self) -> None:
        """Save this experiment's metadata to a database service.

        .. note::
            This method does not save analysis results nor figures.
            Use :meth:`save` for general saving of all experiment data.

            See :meth:`qiskit.providers.experiment.IBMExperimentService.create_experiment`
            for fields that are saved.
        """
        self._save_experiment_metadata()
        for data in self.child_data():
            data.save_metadata()
    def _save_experiment_metadata(self, suppress_errors: bool = True) -> None:
        """Save this experiment's metadata to a database service.

        Args:
            suppress_errors: should the method catch exceptions (True) or
                pass them on, potentially aborting the experiment (False)

        Raises:
            QiskitError: If the save to the database failed

        .. note::
            This method does not save analysis results nor figures.
            Use :meth:`save` for general saving of all experiment data.

            See :meth:`qiskit.providers.experiment.IBMExperimentService.create_experiment`
            for fields that are saved.
        """
        if not self._service:
            LOG.warning(
                "Experiment cannot be saved because no experiment service is available. "
                "An experiment service is available, for example, "
                "when using an IBM Quantum backend."
            )
            return
        try:
            handle_metadata_separately = self._metadata_too_large()
            if handle_metadata_separately:
                metadata = self._db_data.metadata
                self._db_data.metadata = {}

            self.service.create_or_update_experiment(
                self._db_data, json_encoder=self._json_encoder, create=not self._created_in_db
            )
            self._created_in_db = True

            if handle_metadata_separately:
                self.service.file_upload(
                    self._db_data.experiment_id, self._metadata_filename, metadata
                )
                self._db_data.metadata = metadata
        except Exception as ex:  # pylint: disable=broad-except
            # Don't automatically fail the experiment just because its data cannot be saved.
            LOG.error("Unable to save the experiment data: %s", traceback.format_exc())
            if not suppress_errors:
                raise QiskitError(
                    f"Experiment data save failed\nError Message:\n{str(ex)}"
                ) from ex

    def _metadata_too_large(self):
        """Determines whether the metadata should be stored in a separate file"""
        # currently the entire POST JSON request body is limited by default to 100kb
        return sys.getsizeof(self.metadata) > 10000
    def save(self, suppress_errors: bool = True) -> None:
        """Save the experiment data to a database service.

        Args:
            suppress_errors: should the method catch exceptions (True) or
                pass them on, potentially aborting the experiment (False)

        .. note::
            This saves the experiment metadata, all analysis results, and all
            figures. Depending on the number of figures and analysis results
            this operation could take a while.

            To only update a previously saved experiment's metadata (e.g. for
            additional tags or notes) use :meth:`save_metadata`.
        """
        # TODO - track changes
        if not self._service:
            LOG.warning(
                "Experiment cannot be saved because no experiment service is available. "
                "An experiment service is available, for example, "
                "when using an IBM Quantum backend."
            )
            return
        self._save_experiment_metadata(suppress_errors=suppress_errors)
        if not self._created_in_db:
            LOG.warning("Could not save experiment metadata to DB, aborting experiment save")
            return
        for result in self._analysis_results.values():
            result.save(suppress_errors=suppress_errors)
        for result in self._deleted_analysis_results.copy():
            with service_exception_to_warning():
                self._service.delete_analysis_result(result_id=result)
            self._deleted_analysis_results.remove(result)

        with self._figures.lock:
            for name, figure in self._figures.items():
                if figure is None:
                    continue
                # currently only the figure and its name are stored in the database
                if isinstance(figure, FigureData):
                    figure = figure.figure
                    LOG.debug("Figure metadata is currently not saved to the database")
                if isinstance(figure, pyplot.Figure):
                    figure = plot_to_svg_bytes(figure)
                self._service.create_or_update_figure(
                    experiment_id=self.experiment_id, figure=figure, figure_name=name
                )
        for name in self._deleted_figures.copy():
            with service_exception_to_warning():
                self._service.delete_figure(experiment_id=self.experiment_id, figure_name=name)
            self._deleted_figures.remove(name)

        if not self.service.local and self.verbose:
            print(
                "You can view the experiment online at "
                f"https://quantum-computing.ibm.com/experiments/{self.experiment_id}"
            )
        # handle children, but without additional prints
        for data in self._child_data.values():
            original_verbose = data.verbose
            data.verbose = False
            data.save()
            data.verbose = original_verbose
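    # Illustrative sketch (comments only, not part of the module), assuming an
    # IBMExperimentService instance is available as ``service``:
    #
    #     exp_data.service = service
    #     exp_data.save()           # metadata + analysis results + figures
    #     exp_data.save_metadata()  # metadata only, e.g. after updating tags or notes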
    def jobs(self) -> List[Job]:
        """Return a list of jobs for the experiment"""
        return self._jobs.values()
    def cancel_jobs(self, ids: Optional[Union[str, List[str]]] = None) -> bool:
        """Cancel any running jobs.

        Args:
            ids: Job(s) to cancel. If None all non-finished jobs will be cancelled.

        Returns:
            True if the specified jobs were successfully cancelled, otherwise False.
        """
        if isinstance(ids, str):
            ids = [ids]

        with self._jobs.lock:
            all_cancelled = True
            for jid, job in reversed(self._jobs.items()):
                if ids and jid not in ids:
                    # Skip cancelling this job
                    continue
                if job and job.status() not in JOB_FINAL_STATES:
                    try:
                        job.cancel()
                        LOG.warning("Cancelled job [Job ID: %s]", jid)
                    except Exception as err:  # pylint: disable=broad-except
                        all_cancelled = False
                        LOG.warning("Unable to cancel job [Job ID: %s]:\n%s", jid, err)
                        continue

                # Remove done or cancelled job futures
                if jid in self._job_futures:
                    del self._job_futures[jid]

        return all_cancelled
    def cancel_analysis(self, ids: Optional[Union[str, List[str]]] = None) -> bool:
        """Cancel any queued analysis callbacks.

        .. note::
            A currently running analysis callback cannot be cancelled.

        Args:
            ids: Analysis callback(s) to cancel. If None all non-finished
                analysis will be cancelled.

        Returns:
            True if the specified analysis callbacks were successfully
            cancelled, otherwise False.
        """
        if isinstance(ids, str):
            ids = [ids]

        # Lock analysis futures so we can't add more while trying to cancel
        with self._analysis_futures.lock:
            all_cancelled = True
            not_running = []
            for cid, callback in reversed(self._analysis_callbacks.items()):
                if ids and cid not in ids:
                    # Skip cancelling this callback
                    continue

                # Set event to cancel callback
                callback.event.set()

                # Check for a running callback that can't be cancelled
                if callback.status == AnalysisStatus.RUNNING:
                    all_cancelled = False
                    LOG.warning(
                        "Unable to cancel running analysis callback [Experiment ID: %s]"
                        "[Analysis Callback ID: %s]",
                        self.experiment_id,
                        cid,
                    )
                else:
                    not_running.append(cid)

            # Wait for completion of other futures cancelled via event.set
            waited = futures.wait(
                [self._analysis_futures[cid] for cid in not_running], timeout=1
            )

            # Get futures that didn't raise exceptions
            for fut in waited.done:
                if fut.done() and not fut.exception():
                    cid = fut.result()[0]
                    if cid in self._analysis_futures:
                        del self._analysis_futures[cid]

        return all_cancelled
    def cancel(self) -> bool:
        """Attempt to cancel any running jobs and queued analysis callbacks.

        .. note::
            A running analysis callback cannot be cancelled.

        Returns:
            True if all jobs and analysis are successfully cancelled, otherwise False.
        """
        # Cancel analysis first since it is queued on jobs, then cancel jobs.
        # Otherwise there can be a race issue when analysis starts running
        # as soon as jobs are cancelled.
        analysis_cancelled = self.cancel_analysis()
        jobs_cancelled = self.cancel_jobs()
        return analysis_cancelled and jobs_cancelled
    def block_for_results(self, timeout: Optional[float] = None) -> "ExperimentData":
        """Block until all pending jobs and analysis callbacks finish.

        Args:
            timeout: Timeout in seconds for waiting for results.

        Returns:
            The experiment data with finished jobs and post-processing.
        """
        start_time = time.time()
        with self._job_futures.lock and self._analysis_futures.lock:
            # Lock threads to get all current job and analysis futures
            # at the time of the function call and then release the lock
            job_ids = self._job_futures.keys()
            job_futs = self._job_futures.values()
            analysis_ids = self._analysis_futures.keys()
            analysis_futs = self._analysis_futures.values()

        # Wait for futures
        self._wait_for_futures(
            job_futs + analysis_futs, name="jobs and analysis", timeout=timeout
        )

        # Clean up done job futures
        num_jobs = len(job_ids)
        for jid, fut in zip(job_ids, job_futs):
            if (fut.done() and not fut.exception()) or fut.cancelled():
                if jid in self._job_futures:
                    del self._job_futures[jid]
                num_jobs -= 1

        # Clean up done analysis futures
        num_analysis = len(analysis_ids)
        for cid, fut in zip(analysis_ids, analysis_futs):
            if (fut.done() and not fut.exception()) or fut.cancelled():
                if cid in self._analysis_futures:
                    del self._analysis_futures[cid]
                num_analysis -= 1

        # Check if more futures got added while this function was running
        # and block recursively. This could happen if an analysis callback
        # spawns another callback or creates more jobs.
        if len(self._job_futures) > num_jobs or len(self._analysis_futures) > num_analysis:
            time_taken = time.time() - start_time
            if timeout is not None:
                timeout = max(0, timeout - time_taken)
            return self.block_for_results(timeout=timeout)

        return self
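    # Illustrative sketch (comments only, not part of the module): a typical
    # run-and-wait flow. ``exp`` is a hypothetical BaseExperiment instance:
    #
    #     exp_data = exp.run(backend)
    #     exp_data.block_for_results(timeout=600)
    #     print(exp_data.status())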
    def _wait_for_futures(
        self, futs: List[futures.Future], name: str = "futures", timeout: Optional[float] = None
    ) -> bool:
        """Wait for jobs to finish running.

        Args:
            futs: Job or analysis futures to wait for.
            name: Type name of the futures for logger messages.
            timeout: The length of time to wait for all jobs before returning False.

        Returns:
            True if all jobs finished. False if the timeout was reached or any
            jobs were cancelled or raised an exception.
        """
        waited = futures.wait(futs, timeout=timeout)
        value = True

        # Log futures still running after timeout
        if waited.not_done:
            LOG.info(
                "Waiting for %s timed out before completion [Experiment ID: %s].",
                name,
                self.experiment_id,
            )
            value = False

        # Check for futures that were cancelled or errored
        excepts = ""
        for fut in waited.done:
            ex = fut.exception()
            if ex:
                excepts += "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
                value = False
            elif fut.cancelled():
                LOG.debug(
                    "%s was cancelled before completion [Experiment ID: %s]",
                    name,
                    self.experiment_id,
                )
                value = False
            elif not fut.result()[1]:
                # The job/analysis did not succeed, and the failure is reflected in
                # the second value returned by _add_job_data/_run_analysis_callback.
                # See details in Issue #866.
                value = False
        if excepts:
            LOG.error(
                "%s raised exceptions [Experiment ID: %s]:%s", name, self.experiment_id, excepts
            )

        return value
    def status(self) -> ExperimentStatus:
        """Return the experiment status.

        Possible return values for :class:`.ExperimentStatus` are

        * :attr:`~.ExperimentStatus.EMPTY` - experiment data is empty
        * :attr:`~.ExperimentStatus.INITIALIZING` - experiment jobs are being initialized
        * :attr:`~.ExperimentStatus.QUEUED` - experiment jobs are queued
        * :attr:`~.ExperimentStatus.RUNNING` - experiment jobs are actively running
        * :attr:`~.ExperimentStatus.CANCELLED` - experiment jobs or analysis has been cancelled
        * :attr:`~.ExperimentStatus.POST_PROCESSING` - experiment analysis is actively running
        * :attr:`~.ExperimentStatus.DONE` - experiment jobs and analysis have successfully run
        * :attr:`~.ExperimentStatus.ERROR` - experiment jobs or analysis incurred an error

        .. note::
            If an experiment has status :attr:`~.ExperimentStatus.ERROR`
            there may still be pending or running jobs. In these cases it
            may be beneficial to call :meth:`cancel_jobs` to terminate these
            remaining jobs.

        Returns:
            The experiment status.
        """
        if all(
            len(container) == 0
            for container in [
                self._result_data,
                self._jobs,
                self._job_futures,
                self._analysis_callbacks,
                self._analysis_futures,
                self._figures,
                self._analysis_results,
            ]
        ):
            return ExperimentStatus.EMPTY

        # Return the job status if the jobs are not DONE
        try:
            return {
                JobStatus.INITIALIZING: ExperimentStatus.INITIALIZING,
                JobStatus.QUEUED: ExperimentStatus.QUEUED,
                JobStatus.VALIDATING: ExperimentStatus.VALIDATING,
                JobStatus.RUNNING: ExperimentStatus.RUNNING,
                JobStatus.CANCELLED: ExperimentStatus.CANCELLED,
                JobStatus.ERROR: ExperimentStatus.ERROR,
            }[self.job_status()]
        except KeyError:
            pass

        # Return the analysis status if done, cancelled, or error
        try:
            return {
                AnalysisStatus.DONE: ExperimentStatus.DONE,
                AnalysisStatus.CANCELLED: ExperimentStatus.CANCELLED,
                AnalysisStatus.ERROR: ExperimentStatus.ERROR,
            }[self.analysis_status()]
        except KeyError:
            return ExperimentStatus.POST_PROCESSING
    def job_status(self) -> JobStatus:
        """Return the experiment job execution status.

        Possible return values for :class:`qiskit.providers.jobstatus.JobStatus` are

        * ``ERROR`` - if any job incurred an error.
        * ``CANCELLED`` - if any job is cancelled.
        * ``RUNNING`` - if any job is still running.
        * ``QUEUED`` - if any job is queued.
        * ``VALIDATING`` - if any job is being validated.
        * ``INITIALIZING`` - if any job is being initialized.
        * ``DONE`` - if all jobs are finished.

        .. note::
            If an experiment has status ``ERROR`` or ``CANCELLED`` there may
            still be pending or running jobs. In these cases it may be
            beneficial to call :meth:`cancel_jobs` to terminate these
            remaining jobs.

        Returns:
            The job execution status.
        """
        with self._jobs.lock:
            # No jobs present
            if not self._jobs:
                return JobStatus.DONE

            statuses = set()
            for job in self._jobs.values():
                if job:
                    statuses.add(job.status())

        # If any jobs are in a non-DONE state, return that state
        for stat in [
            JobStatus.ERROR,
            JobStatus.CANCELLED,
            JobStatus.RUNNING,
            JobStatus.QUEUED,
            JobStatus.VALIDATING,
            JobStatus.INITIALIZING,
        ]:
            if stat in statuses:
                return stat

        return JobStatus.DONE
    def analysis_status(self) -> AnalysisStatus:
        """Return the data analysis post-processing status.

        Possible return values for :class:`.AnalysisStatus` are

        * :attr:`~.AnalysisStatus.ERROR` - if any analysis callback incurred an error.
        * :attr:`~.AnalysisStatus.CANCELLED` - if any analysis callback is cancelled.
        * :attr:`~.AnalysisStatus.RUNNING` - if any analysis callback is actively running.
        * :attr:`~.AnalysisStatus.QUEUED` - if any analysis callback is queued.
        * :attr:`~.AnalysisStatus.DONE` - if all analysis callbacks have successfully run.

        Returns:
            The analysis status.
        """
        statuses = set()
        for status in self._analysis_callbacks.values():
            statuses.add(status.status)

        for stat in [
            AnalysisStatus.ERROR,
            AnalysisStatus.CANCELLED,
            AnalysisStatus.RUNNING,
            AnalysisStatus.QUEUED,
        ]:
            if stat in statuses:
                return stat

        return AnalysisStatus.DONE
    def job_errors(self) -> str:
        """Return any errors encountered in job execution."""
        errors = []

        # Get any job errors
        for job in self._jobs.values():
            if job and job.status() == JobStatus.ERROR:
                if hasattr(job, "error_message"):
                    error_msg = job.error_message()
                else:
                    error_msg = ""
                errors.append(f"\n[Job ID: {job.job_id()}]: {error_msg}")

        # Get any job future errors
        for jid, fut in self._job_futures.items():
            if fut and fut.done() and fut.exception():
                ex = fut.exception()
                errors.append(
                    f"[Job ID: {jid}]\n"
                    + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
                )
        return "".join(errors)
    def analysis_errors(self) -> str:
        """Return any errors encountered during analysis callbacks."""
        errors = []

        # Get any callback errors
        for cid, callback in self._analysis_callbacks.items():
            if callback.status == AnalysisStatus.ERROR:
                errors.append(f"\n[Analysis Callback ID: {cid}]: {callback.error_msg}")

        return "".join(errors)
    def errors(self) -> str:
        """Return errors encountered during job and analysis execution.

        .. note::
            To display only job or analysis errors use the
            :meth:`job_errors` or :meth:`analysis_errors` methods.

        Returns:
            Experiment errors.
        """
        return self.job_errors() + self.analysis_errors()
    # Children handling
    def add_child_data(self, experiment_data: ExperimentData):
        """Add child experiment data to the current experiment data"""
        experiment_data.parent_id = self.experiment_id
        self._child_data[experiment_data.experiment_id] = experiment_data
        self.metadata["child_data_ids"] = self._child_data.keys()
    def child_data(
        self, index: Optional[Union[int, slice, str]] = None
    ) -> Union[ExperimentData, List[ExperimentData]]:
        """Return child experiment data.

        Args:
            index: Index of the child experiment data to be returned.
                Several types are accepted for convenience:

                * None: Return all child data.
                * int: Specific index of the child data.
                * slice: A list slice of indexes.
                * str: experiment ID of the child data.

        Returns:
            The requested single or list of child experiment data.

        Raises:
            QiskitError: If the index or ID of the child experiment data
                cannot be found.
        """
        if index is None:
            return self._child_data.values()
        if isinstance(index, (int, slice)):
            return self._child_data.values()[index]
        if isinstance(index, str):
            return self._child_data[index]
        raise QiskitError(f"Invalid index type {type(index)}.")
    @classmethod
    def load(cls, experiment_id: str, service: IBMExperimentService) -> "ExperimentData":
        """Load a saved experiment data from a database service.

        Args:
            experiment_id: Experiment ID.
            service: the database service.

        Returns:
            The loaded experiment data.
        """
        data = service.experiment(experiment_id, json_decoder=cls._json_decoder)
        if service.experiment_has_file(experiment_id, cls._metadata_filename):
            metadata = service.file_download(experiment_id, cls._metadata_filename)
            data.metadata.update(metadata)
        expdata = cls(service=service, db_data=data)

        # Retrieve data and analysis results.
        # Maybe this isn't necessary, but the repr of the class should be
        # updated to show the correct number of results, including remote ones.
        expdata._retrieve_data()
        expdata._retrieve_analysis_results()

        # mark it as existing in the DB
        expdata._created_in_db = True

        child_data_ids = expdata.metadata.pop("child_data_ids", [])
        child_data = [ExperimentData.load(child_id, service) for child_id in child_data_ids]
        expdata._set_child_data(child_data)

        return expdata
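    # Illustrative sketch (comments only, not part of the module); the token
    # and experiment ID are placeholders. The constructor arguments mirror the
    # ``IBMExperimentService(token=..., url=...)`` call used elsewhere in this
    # module:
    #
    #     service = IBMExperimentService(token="MY_TOKEN", url="https://auth.quantum-computing.ibm.com/api")
    #     exp_data = ExperimentData.load("EXPERIMENT_ID", service)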
    def copy(self, copy_results: bool = True) -> "ExperimentData":
        """Make a copy of the experiment data with a new experiment ID.

        Args:
            copy_results: If True copy the analysis results and figures
                into the returned container, along with the
                experiment data and metadata. If False only copy
                the experiment data and metadata.

        Returns:
            A copy of the experiment data object with the same data
            but different IDs.

        .. note::
            If analysis results and figures are copied they will also have
            new result IDs and figure names generated for the copies.

            This method can not be called from an analysis callback. It waits
            for analysis callbacks to complete before copying analysis results.
        """
        new_instance = ExperimentData(
            backend=self.backend,
            service=self.service,
            parent_id=self.parent_id,
            job_ids=self.job_ids,
            child_data=list(self._child_data.values()),
            verbose=self.verbose,
        )
        new_instance._db_data = self._db_data.copy()
        # different id for the copied experiment
        new_instance._db_data.experiment_id = str(uuid.uuid4())
        if self.experiment is None:
            new_instance._experiment = None
        else:
            new_instance._experiment = self.experiment.copy()

        LOG.debug(
            "Copying experiment data [Experiment ID: %s]: %s",
            self.experiment_id,
            new_instance.experiment_id,
        )

        # Copy basic properties and metadata
        new_instance._jobs = self._jobs.copy_object()
        new_instance._auto_save = self._auto_save
        new_instance._extra_data = self._extra_data

        # Copy circuit result data and jobs
        with self._result_data.lock:  # Hold the lock so no new data can be added.
            new_instance._result_data = self._result_data.copy_object()
            for jid, fut in self._job_futures.items():
                if not fut.done():
                    new_instance._add_job_future(new_instance._jobs[jid])

        # If not copying results, return the object
        if not copy_results:
            return new_instance

        # Copy results and figures.
        # This requires analysis callbacks to finish.
        self._wait_for_futures(self._analysis_futures.values(), name="analysis")
        with self._analysis_results.lock:
            new_instance._analysis_results = ThreadSafeOrderedDict()
            new_instance.add_analysis_results(
                [result.copy() for result in self.analysis_results()]
            )
        with self._figures.lock:
            new_instance._figures = ThreadSafeOrderedDict()
            new_instance.add_figures(self._figures.values())

        # Recursively copy child data
        child_data = [data.copy(copy_results=copy_results) for data in self.child_data()]
        new_instance._set_child_data(child_data)
        return new_instance
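    # Illustrative sketch (comments only, not part of the module): a lightweight
    # copy that shares the raw data but re-runs analysis from scratch;
    # ``my_analysis`` is a hypothetical callback:
    #
    #     fresh = exp_data.copy(copy_results=False)
    #     fresh.add_analysis_callback(my_analysis)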
    def _set_child_data(self, child_data: List[ExperimentData]):
        """Set child experiment data for the current experiment."""
        self._child_data = ThreadSafeOrderedDict()
        for data in child_data:
            self.add_child_data(data)
        self._db_data.metadata["child_data_ids"] = self._child_data.keys()

    def _set_service(self, service: IBMExperimentService, replace: bool = None) -> None:
        """Set the service to be used for storing experiment data,
        for this experiment itself and its descendants.

        Args:
            service: Service to be used.
            replace: Whether an existing service should be replaced. If not,
                and a current service exists, an exception is raised.

        Raises:
            ExperimentDataError: If an experiment service is already being used
                and `replace==False`.
        """
        if self._service and not replace:
            raise ExperimentDataError("An experiment service is already being used.")
        self._service = service
        for result in self._analysis_results.values():
            result.service = service
        with contextlib.suppress(Exception):
            self.auto_save = self._service.options.get("auto_save", False)
        for data in self.child_data():
            data._set_service(service)
    def add_tags_recursive(self, tags2add: List[str]) -> None:
        """Add tags to this experiment itself and its descendants.

        Args:
            tags2add: The tags that will be added to the existing tags.
        """
        self.tags += tags2add
        for data in self._child_data.values():
            data.add_tags_recursive(tags2add)
    def remove_tags_recursive(self, tags2remove: List[str]) -> None:
        """Remove tags from this experiment itself and its descendants.

        Args:
            tags2remove: The tags that will be removed from the existing tags.
        """
        self.tags = [x for x in self.tags if x not in tags2remove]
        for data in self._child_data.values():
            data.remove_tags_recursive(tags2remove)
    # representation and serialization

    def __repr__(self):
        out = f"{type(self).__name__}({self.experiment_type}"
        out += f", {self.experiment_id}"
        if self.parent_id:
            out += f", parent_id={self.parent_id}"
        if self.tags:
            out += f", tags={self.tags}"
        if self.job_ids:
            out += f", job_ids={self.job_ids}"
        if self.share_level:
            out += f", share_level={self.share_level}"
        if self.metadata:
            out += f", metadata=<{len(self.metadata)} items>"
        if self.figure_names:
            out += f", figure_names={self.figure_names}"
        if self.notes:
            out += f", notes={self.notes}"
        if self._extra_data:
            for key, val in self._extra_data.items():
                out += f", {key}={repr(val)}"
        out += ")"
        return out

    def __getattr__(self, name: str) -> Any:
        try:
            return self._extra_data[name]
        except KeyError:
            # pylint: disable=raise-missing-from
            raise AttributeError(f"Attribute {name} is not defined")

    def _safe_serialize_jobs(self):
        """Return a serializable object for stored jobs"""
        # Since Job objects are not serializable this removes
        # them from the jobs dict and returns {job_id: None}
        # that can be used to retrieve jobs from a service after loading
        jobs = ThreadSafeOrderedDict()
        with self._jobs.lock:
            for jid in self._jobs.keys():
                jobs[jid] = None
        return jobs

    def _safe_serialize_figures(self):
        """Return a serializable object for stored figures"""
        # Convert any MPL figures into SVG images before serializing
        figures = ThreadSafeOrderedDict()
        with self._figures.lock:
            for name, figure in self._figures.items():
                if isinstance(figure, pyplot.Figure):
                    figures[name] = plot_to_svg_bytes(figure)
                else:
                    figures[name] = figure
        return figures

    def __json_encode__(self):
        if any(not fut.done() for fut in self._job_futures.values()):
            raise QiskitError(
                "Not all experiment jobs have finished. Jobs must be "
                "cancelled or done to serialize experiment data."
            )
        if any(not fut.done() for fut in self._analysis_futures.values()):
            raise QiskitError(
                "Not all experiment analysis has finished. Analysis must be "
                "cancelled or done to serialize experiment data."
            )
        json_value = {
            "_db_data": self._db_data,
            "_analysis_results": self._analysis_results,
            "_analysis_callbacks": self._analysis_callbacks,
            "_deleted_figures": self._deleted_figures,
            "_deleted_analysis_results": self._deleted_analysis_results,
            "_result_data": self._result_data,
            "_extra_data": self._extra_data,
            "_created_in_db": self._created_in_db,
            "_figures": self._safe_serialize_figures(),  # Convert figures to SVG
            "_jobs": self._safe_serialize_jobs(),  # Handle non-serializable objects
            "_experiment": self._experiment,
            "_child_data": self._child_data,
        }
        # The attribute self._service is in charge of the connection and
        # communication with the experiment db. It has no meaning in the JSON
        # format, so there is no need to serialize it.
        for att in ["_service", "_backend"]:
            json_value[att] = None
            value = getattr(self, att)
            if value is not None:
                LOG.info("%s cannot be JSON serialized", str(type(value)))
        return json_value

    @classmethod
    def __json_decode__(cls, value):
        ret = cls()
        for att, att_val in value.items():
            setattr(ret, att, att_val)
        return ret

    def __getstate__(self):
        if any(not fut.done() for fut in self._job_futures.values()):
            LOG.warning(
                "Not all job futures have finished."
                " Data from running futures will not be serialized."
            )
        if any(not fut.done() for fut in self._analysis_futures.values()):
            LOG.warning(
                "Not all analysis callbacks have finished."
                " Results from running callbacks will not be serialized."
            )
        state = self.__dict__.copy()

        # Remove non-pickleable attributes
        for key in ["_job_futures", "_analysis_futures", "_analysis_executor", "_monitor_executor"]:
            del state[key]

        # Convert figures to SVG
        state["_figures"] = self._safe_serialize_figures()

        # Handle partially pickleable attributes
        state["_jobs"] = self._safe_serialize_jobs()

        return state
    @staticmethod
    def get_service_from_backend(backend):
        """Initializes the service from the backend data"""
        db_url = "https://auth.quantum-computing.ibm.com/api"
        try:
            provider = backend._provider
            # qiskit-ibmq-provider style
            if hasattr(provider, "credentials"):
                token = provider.credentials.token
            # qiskit-ibm-provider style
            if hasattr(provider, "_account"):
                token = provider._account.token
            service = IBMExperimentService(token=token, url=db_url)
            return service
        except Exception:  # pylint: disable=broad-except
            return None
@contextlib.contextmanager
def service_exception_to_warning():
    """Convert an exception raised by the experiment service to a warning."""
    try:
        yield
    except Exception:  # pylint: disable=broad-except
        LOG.warning("Experiment service operation failed: %s", traceback.format_exc())
class ExperimentStatus(enum.Enum):
    """Class for experiment status enumerated type."""

    EMPTY = "experiment data is empty"
    INITIALIZING = "experiment jobs are being initialized"
    VALIDATING = "experiment jobs are validating"
    QUEUED = "experiment jobs are queued"
    RUNNING = "experiment jobs are actively running"
    CANCELLED = "experiment jobs or analysis has been cancelled"
    POST_PROCESSING = "experiment analysis is actively running"
    DONE = "experiment jobs and analysis have successfully run"
    ERROR = "experiment jobs or analysis incurred an error"

    def __json_encode__(self):
        return self.name

    @classmethod
    def __json_decode__(cls, value):
        return cls.__members__[value]  # pylint: disable=unsubscriptable-object
class AnalysisStatus(enum.Enum):
    """Class for analysis callback status enumerated type."""

    QUEUED = "analysis callback is queued"
    RUNNING = "analysis callback is actively running"
    CANCELLED = "analysis callback has been cancelled"
    DONE = "analysis callback has successfully run"
    ERROR = "analysis callback incurred an error"

    def __json_encode__(self):
        return self.name

    @classmethod
    def __json_decode__(cls, value):
        return cls.__members__[value]  # pylint: disable=unsubscriptable-object
@dataclasses.dataclass
class AnalysisCallback:
    """Dataclass for analysis callback status"""

    name: str = ""
    callback_id: str = ""
    status: AnalysisStatus = AnalysisStatus.QUEUED
    error_msg: Optional[str] = None
    event: Event = dataclasses.field(default_factory=Event)

    def __getstate__(self):
        # We need to remove the Event object from state when pickling
        # since events are not pickleable
        state = self.__dict__
        state["event"] = None
        return state

    def __json_encode__(self):
        return self.__getstate__()