Source code for qiskit_qec.circuits.repetition_code

# -*- coding: utf-8 -*-

# This code is part of Qiskit.
#
# (C) Copyright IBM 2019.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

# pylint: disable=invalid-name

"""Generates circuits based on repetition codes."""
from copy import deepcopy
from typing import List, Optional, Tuple

import numpy as np
import rustworkx as rx
import networkx as nx

from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile

from qiskit_qec.circuits.code_circuit import CodeCircuit
from qiskit_qec.utils import DecodingGraphEdge, DecodingGraphNode
from qiskit_qec.utils.decoding_graph_attributes import _nodes2cpp
from qiskit_qec.circuits._c_circuits import _c_check_nodes, _c_is_cluster_neutral


def _separate_string(string):
    separated_string = []
    for syndrome_type_string in string.split("  "):
        separated_string.append(syndrome_type_string.split(" "))
    return separated_string


[docs] class RepetitionCodeCircuit(CodeCircuit): """RepetitionCodeCircuit class.""" def __init__( self, d: int, T: int, xbasis: bool = False, resets: bool = False, delay: Optional[int] = None, barriers: bool = False, ): """ Creates the circuits corresponding to a logical 0 and 1 encoded using a repetition code. Implementation of a distance d repetition code, implemented over T syndrome measurement rounds. Args: d (int): Number of code qubits (and hence repetitions) used. T (int): Number of rounds of ancilla-assisted syndrome measurement. xbasis (bool): Whether to use the X basis to use for encoding (Z basis used by default). resets (bool): Whether to include a reset gate after mid-circuit measurements. delay (float): Time (in dt) to delay after mid-circuit measurements (and delay). barriers (bool): Whether to include barriers between different sections of the code. Additional information: No measurements are added to the circuit if `T=0`. Otherwise `T` rounds are added, followed by measurement of the code qubits (corresponding to a logical measurement and final syndrome measurement round). """ super().__init__() self.n = d self.d = d self.T = 0 self.code_qubit = QuantumRegister(d, "code_qubit") self.link_qubit = QuantumRegister((d - 1), "link_qubit") self.qubit_registers = {"code_qubit", "link_qubit"} self.link_bits = [] self.code_bit = ClassicalRegister(d, "code_bit") self.circuit = {} for log in ["0", "1"]: self.circuit[log] = QuantumCircuit(self.link_qubit, self.code_qubit, name=log) self._xbasis = xbasis self._resets = resets self._barriers = barriers self._preparation() delay = delay or 0 for _ in range(T - 1): self.syndrome_measurement(delay=delay) if T != 0: self.syndrome_measurement(final=True) self.readout() self.gauge_ops = [[j, j + 1] for j in range(self.d - 1)] self.measured_logical = [[0], [self.d - 1]] self.basis = "x" self.resets = resets self.delay = delay self.base = "0"
[docs] def get_circuit_list(self) -> List[QuantumCircuit]: """Returns circuit list. circuit_list: self.circuit as a list, with circuit_list[0] = circuit['0'] circuit_list[1] = circuit['1'] """ circuit_list = [self.circuit[log] for log in ["0", "1"]] return circuit_list
[docs] def x(self, logs=("0", "1"), barrier=False): """Applies a logical x to the circuits for the given logical values. Args: logs (list or tuple): List or tuple of logical values expressed as strings. barrier (bool): Boolean denoting whether to include a barrier at the start. """ barrier = barrier or self._barriers for log in logs: if barrier and (log == "1" or self._xbasis): self.circuit[log].barrier() if self._xbasis: self.circuit[log].z(self.code_qubit) else: self.circuit[log].x(self.code_qubit)
def _preparation(self): """Prepares logical bit states by applying an x to the circuit that will encode a 1. """ for log in ["0", "1"]: if self._xbasis: self.circuit[log].h(self.code_qubit) self.x(["1"])
[docs] def syndrome_measurement(self, final: bool = False, barrier: bool = False, delay: int = 0): """Application of a syndrome measurement round. Args: final (bool): Whether to disregard the reset (if applicable) due to this being the final syndrome measurement round. barrier (bool): Boolean denoting whether to include a barrier at the start. delay (float): Time (in dt) to delay after mid-circuit measurements (and delay). """ barrier = barrier or self._barriers self.link_bits.append(ClassicalRegister((self.d - 1), "round_" + str(self.T) + "_link_bit")) for log in ["0", "1"]: self.circuit[log].add_register(self.link_bits[-1]) # entangling gates if barrier: self.circuit[log].barrier() if self._xbasis: self.circuit[log].h(self.link_qubit) for j in range(self.d - 1): if self._xbasis: self.circuit[log].cx(self.link_qubit[j], self.code_qubit[j]) else: self.circuit[log].cx(self.code_qubit[j], self.link_qubit[j]) for j in range(self.d - 1): if self._xbasis: self.circuit[log].cx(self.link_qubit[j], self.code_qubit[j + 1]) else: self.circuit[log].cx(self.code_qubit[j + 1], self.link_qubit[j]) if self._xbasis: self.circuit[log].h(self.link_qubit) # measurement if barrier: self.circuit[log].barrier() for j in range(self.d - 1): self.circuit[log].measure(self.link_qubit[j], self.link_bits[self.T][j]) # resets if self._resets and not final: if barrier: self.circuit[log].barrier() for j in range(self.d - 1): self.circuit[log].reset(self.link_qubit[j]) # delay if delay > 0 and not final: if barrier: self.circuit[log].barrier() for j in range(self.d - 1): self.circuit[log].delay(delay, self.link_qubit[j]) self.T += 1
[docs] def readout(self): """ Readout of all code qubits, which corresponds to a logical measurement as well as allowing for a measurement of the syndrome to be inferred. """ for log in ["0", "1"]: if self._xbasis: self.circuit[log].h(self.code_qubit) self.circuit[log].add_register(self.code_bit) self.circuit[log].measure(self.code_qubit, self.code_bit)
def _process_string(self, string): # logical readout taken from measured_log = string[0] + " " + string[self.d - 1] if self._resets: syndrome = string[self.d :] else: # if there are no resets, results are cumulative and need to be separated cumsyn_list = string[self.d :].split(" ") syndrome_list = [] for tt, cum_syn in enumerate(cumsyn_list[0:-1]): syn = "" for j in range(len(cum_syn)): syn += str(int(cumsyn_list[tt][j] != cumsyn_list[tt + 1][j])) syndrome_list.append(syn) syndrome_list.append(cumsyn_list[-1]) syndrome = " ".join(syndrome_list) # final syndrome deduced from final code qubit readout full_syndrome = "" for j in range(self.d - 1): full_syndrome += "0" * (string[j] == string[j + 1]) + "1" * (string[j] != string[j + 1]) # results from all other syndrome measurements then added full_syndrome = full_syndrome + syndrome # changes between one syndrome and the next then calculated syndrome_list = full_syndrome.split(" ") syndrome_changes = "" for t in range(self.T + 1): for j in range(self.d - 1): if t == 0: change = syndrome_list[-1][j] != "0" else: change = syndrome_list[-t - 1][j] != syndrome_list[-t][j] syndrome_changes += "0" * (not change) + "1" * change syndrome_changes += " " # the space separated string of syndrome changes then gets a # double space separated logical value on the end new_string = measured_log + " " + syndrome_changes[:-1] return new_string
[docs] def string2nodes(self, string, **kwargs): """Convert output string from circuits into a set of nodes. Args: string (string): Results string to convert. kwargs (dict): Additional keyword arguments. logical (str): Logical value whose results are used ('0' as default). all_logicals (bool): Whether to include logical nodes irrespective of value. (False as default). Returns: dict: List of nodes corresponding to to the non-trivial elements in the string. Additional information: Strings are read right to left, but lists* are read left to right. So, we have some ugly indexing code whenever we're dealing with both strings and lists. """ # set kwargs all_logicals = kwargs.get("all_logicals") logical = kwargs.get("logical") if logical is None: logical = "0" string = self._process_string(string) # [ <logical>, <syn>, <syn>,...] separated_string = _separate_string(string) nodes = [] # logical/boundary nodes boundary = separated_string[0] # [<last_elem>, <init_elem>] for bqec_index, belement in enumerate(boundary[::-1]): if all_logicals or belement != logical: bqubits = self.measured_logical[bqec_index] bnode = DecodingGraphNode( is_logical=True, is_boundary=True, qubits=bqubits, index=bqec_index ) nodes.append(bnode) # bulk nodes for syn_type in range(1, len(separated_string)): for syn_round in range(len(separated_string[syn_type])): elements = separated_string[syn_type][syn_round] for qec_index, element in enumerate(elements[::-1]): if element == "1": qubits = self.gauge_ops[qec_index] node = DecodingGraphNode(time=syn_round, qubits=qubits, index=qec_index) nodes.append(node) return nodes
[docs] def string2raw_logicals(self, string): """ Extracts raw logicals from output string. Args: string (string): Results string from which to extract logicals Returns: list: Raw values for logical operators that correspond to nodes. """ return [string.split(" ", maxsplit=1)[0][-1]]
[docs] def check_nodes(self, nodes, ignore_extras=False, minimal=False): """ Determines whether a given set of nodes are neutral. If so, also determines any additional logical readout qubits that would be flipped by the errors creating such a cluster and how many errors would be required to make the cluster. Args: nodes (list): List of nodes, of the type produced by `string2nodes`. ignore_extras (bool): If `True`, undeeded boundary nodes are ignored. minimal (bool): Whether output should only reflect the minimal error case. Returns: neutral (bool): Whether the nodes independently correspond to a valid set of errors. flipped_logical_nodes (list): List of qubits nodes for logical operators that are flipped by the errors, that were not included in the original nodes. num_errors (int): Minimum number of errors required to create nodes. """ # see which qubits for logical zs are given and collect bulk nodes given_logicals = [] for node in nodes: if node.is_logical: given_logicals += node.qubits given_logicals = set(given_logicals) # bicolour code qubits according to the domain walls walls = [] for node in nodes: if not node.is_logical: walls.append(node.qubits[1]) walls.sort() c = 0 colors = "" for j in range(self.d): if walls: if walls[0] == j: c = (c + 1) % 2 walls.remove(j) colors += str(c) colors = colors[::-1] # determine which were in the minority error_c_min = str(int(colors.count("1") < self.d / 2)) # and majority error_c_max = str((int(error_c_min) + 1) % 2) # list the colours with the max error one first # (unless we do min only) error_cs = [] if not minimal: error_cs.append(error_c_max) error_cs.append(error_c_min) # see what happens for both colours # if neutral for maximal, it's neutral # otherwise, it is whatever it is for the minimal for error_c in error_cs: num_errors = colors.count(error_c) # determine the corresponding flipped logicals flipped_logicals = [] for j in [0, self.d - 1]: if colors[-1 - j] == error_c: flipped_logicals.append(j) flipped_logicals = set(flipped_logicals) # if unneeded logical zs are given, cluster is not neutral # (unless this is ignored) if (not ignore_extras) and given_logicals.difference(flipped_logicals): neutral = False # otherwise, report only needed logicals that aren't given else: neutral = True flipped_logicals = flipped_logicals.difference(given_logicals) flipped_logical_nodes = [] for flipped_logical in flipped_logicals: qubits = [flipped_logical] elem = self.measured_logical.index(qubits) node = DecodingGraphNode( is_logical=True, is_boundary=True, qubits=qubits, index=elem ) flipped_logical_nodes.append(node) if neutral and not flipped_logical_nodes: break return neutral, flipped_logical_nodes, num_errors
[docs] def is_cluster_neutral(self, nodes): """ Determines whether or not the cluster is neutral, meaning that one or more errors could have caused the set of nodes (syndrome changes) passed to the method. Args: nodes (list of nodes) """ return not bool(len(nodes) % 2)
[docs] def partition_outcomes( self, round_schedule: str, outcome: List[int] ) -> Tuple[List[List[int]], List[List[int]], List[int]]: """Extract measurement outcomes.""" # split into gauge and final outcomes outcome = "".join([str(c) for c in outcome]) outcome = outcome.split(" ") gs = outcome[0:-1] gauge_outcomes = [[int(c) for c in r] for r in gs] finals = outcome[-1] # if circuit did not use resets, construct standard output if not self.resets: for i, layer in enumerate(gauge_outcomes): for j, gauge_op in enumerate(layer): if i > 0: gauge_outcomes[i][j] = (gauge_op + gauge_outcomes[i - 1][j]) % 2 # assign outcomes to the correct gauge ops if round_schedule == "z": x_gauge_outcomes = [] z_gauge_outcomes = gauge_outcomes else: x_gauge_outcomes = gauge_outcomes z_gauge_outcomes = [] final_outcomes = [int(c) for c in finals] return x_gauge_outcomes, z_gauge_outcomes, final_outcomes
def add_edge(graph, pair, edge=None): """ Adds an edge correspoding to the given pair of nodes to the given graph, adding also the nodes themselves if not already present. """ ns = [] for node in pair: if node not in graph.nodes(): ns.append(graph.add_node(node)) else: ns.append(graph.nodes().index(node)) graph.add_edge(ns[0], ns[1], edge) return ns
[docs] class ArcCircuit(CodeCircuit): """Anisotropic repetition code class.""" METHOD_SPITZ: str = "spitz" METHOD_NAIVE: str = "naive" AVAILABLE_METHODS = {METHOD_SPITZ, METHOD_NAIVE} def __init__( self, links: list, T: int, basis: str = "xy", logical: str = "0", resets: bool = True, delay: Optional[int] = None, barriers: bool = True, color: Optional[dict] = None, max_dist: int = 2, schedule: Optional[list] = None, run_202: bool = True, rounds_per_202: int = 9, conditional_reset: bool = False, ): """ Creates circuits corresponding to an anisotropic repetition code implemented over T syndrome measurement rounds, with the syndrome measurements provided. Args: links (list): List of tuples (c0, a, c1), where c0 and c1 are the two code qubits in each syndrome measurement, and a is the auxiliary qubit used. T (int): Number of rounds of syndrome measurement. basis (string): Pair of `'x'`, `'y'` and `'z'`, specifying the pair of local bases to be used. logical (string): Logical value to store (`'0'` or `'1'`). resets (bool): Whether to include a reset gate after mid-circuit measurements. ff (bool): Whether to correct the effects of [[2,0,2]] sequences via feed forward. delay (float): Time (in dt) to delay after mid-circuit measurements (and delay). barriers (bool): Whether to include barriers between different sections of the code. color (dict): Dictionary with code qubits as keys and 0 or 1 for each value, to specify a predetermined bicoloring. If not provided, a bicoloring is found on initialization. max_dist (int): Maximum edge distance used when determining the bicoloring of code qubits. schedule (list): Specifies order in which entangling gates are applied in each syndrome measurement round. Each element is a list of lists [c, a] for entangling gates to be applied simultaneously. run_202 (bool): Whether to run [[2,0,2]] sequences. This will be overwritten if T is not high enough (at least rounds_per_202xlen(links)). rounds_per_202 (int): Number of rounds that are part of the 202, including the typical link measurements at the beginning and end. At least 9 are required to get an event dedicated to conjugate errors. conditional_reset: Whether to apply conditional resets (an x conditioned on the result of the previous measurement), rather than a reset gate. """ super().__init__() self.links = links self.basis = basis self.logical = logical self._barriers = barriers self._max_dist = max_dist self.delay = delay or 0 self.conditional_reset = conditional_reset # calculate coloring and schedule, etc if color is None: self._coloring() else: self.color = color if schedule is None: self._scheduling() else: self.schedule = schedule self._get_cycles() self._preparation() # determine the placement of [2,0,2] rounds self.rounds_per_202 = rounds_per_202 if run_202: self.links_202 = [] for link in self.links: logical_overlap = {link[0], link[2]}.intersection(set(self.z_logicals)) if not logical_overlap: self.links_202.append(link) num_links = len(self.links_202) if num_links > 0: self.rounds_per_link = int(np.floor(T / num_links)) self.metabuffer = np.ceil((T - num_links * self.rounds_per_link) / 2) self.roundbuffer = np.ceil((self.rounds_per_link - self.rounds_per_202) / 2) if self.roundbuffer > 0: self.roundbuffer -= 1 self.run_202 = self.rounds_per_link >= self.rounds_per_202 else: self.run_202 = False else: self.run_202 = False self.resets = resets or self.run_202 if not self.run_202: self.rounds_per_link = np.inf # create the circuit self.base = basis self.T = 0 for _ in range(T - 1): self._syndrome_measurement() if T != 0: self._syndrome_measurement(final=True) self._readout() self._cpp_link_graph, self._cpp_link_neighbors = self._links2cpp() def _get_link_graph(self, max_dist=1): graph = rx.PyGraph() for link in self.links: add_edge(graph, (link[0], link[2]), {"distance": 1, "link qubit": link[1]}) distance = rx.distance_matrix(graph) edges = graph.edge_list() for n0, node0 in enumerate(graph.nodes()): for n1, node1 in enumerate(graph.nodes()): if n0 < n1: if (n0, n1) not in edges: dist = distance[n0, n1] if dist < max_dist: add_edge(graph, (node0, node1), {"distance": dist}) return graph def _get_cycles(self): """ For each edge in the link graph (expressed in terms of the pair of qubits), the set of qubits around adjacent cycles is found. """ self.link_graph = self._get_link_graph() self.degree = {} for n, q in enumerate(self.link_graph.nodes()): self.degree[q] = self.link_graph.degree(n) degrees = list(self.degree.values()) self._linear = degrees.count(1) == 2 and degrees.count(2) == len(degrees) - 2 lg_edges = set(self.link_graph.edge_list()) lg_nodes = self.link_graph.nodes() ng = nx.Graph() for n0, n1 in self.link_graph.edge_list(): ng.add_edge(n0, n1) # express the cycles in terms of the ns of the link graph self.cycles = nx.minimum_cycle_basis(ng) # and for each pair of data qubits, list the cycles it is a part of self.cycle_dict = {(lg_nodes[edge[0]], lg_nodes[edge[1]]): set() for edge in lg_edges} for c, cycle in enumerate(self.cycles): for n0 in cycle: for n1 in cycle: for edge in [(n0, n1), (n1, n0)]: if edge in lg_edges: self.cycle_dict[lg_nodes[edge[0]], lg_nodes[edge[1]]].add(c) def _coloring(self): """ Creates a graph with a weight=1 edge for each link, and additional edges up to `max_weight`. Then performs a matching of edges in this graph. The code qubits of each pair are then bicolored. All unmatched code qubits are alternately bicolored. """ graph = self._get_link_graph(self._max_dist) matching = rx.max_weight_matching( graph, max_cardinality=True, weight_fn=lambda edge: -int(edge["distance"]) ) self.color = {} unmatched = list(graph.node_indices()) nodes = graph.nodes() for pair in matching: for j, n in enumerate(pair): self.color[nodes[n]] = j unmatched.remove(n) for j, n in enumerate(unmatched): # color opposite to a colored neighbor neighbors = graph.neighbors(n) if neighbors: for nn in neighbors: if nodes[nn] in self.color: self.color[nodes[n]] = (self.color[nodes[nn]] + 1) % 2 else: # otherwise color arbitrarily self.color[nodes[n]] = j % 2 def _get_coupling_graph(self, aux=None): """ Returns a graph for pairs of nodes on which entangling gates are applied in the code. """ if aux is None: aux = [link[1] for link in self.links] graph = rx.PyGraph() for link in self.links: if link[1] in aux: add_edge(graph, (link[0], link[1]), {}) add_edge(graph, (link[1], link[2]), {}) # we use max degree of the nodes as the edge weight, to delay bottlenecks for e, (n0, n1) in enumerate(graph.edge_list()): graph.edges()[e]["weight"] = max(graph.degree(n0), graph.degree(n1)) return graph def _scheduling(self): """ Determines the order in which entangling gates should be applied in each round. """ link_dict = {link[1]: link for link in self.links} aux = set(link_dict.keys()) def weight_fn(edge): return -int(edge["weight"]) schedule = [] while aux: # construct coupling graph for as yet unpaired auxiliaries (i.e. link qubits) graph = self._get_coupling_graph(aux) # find a min weight matching, and then another that exlcudes the pairs from the first matching = [rx.max_weight_matching(graph, max_cardinality=True, weight_fn=weight_fn)] cut_graph = deepcopy(graph) for n0, n1 in matching[0]: cut_graph.remove_edge(n0, n1) matching.append( rx.max_weight_matching(cut_graph, max_cardinality=True, weight_fn=weight_fn) ) # rewrite the matchings to use nodes instead of indices, and to always place # the auxilliary second nodes = [graph.nodes(), cut_graph.nodes()] for j in range(2): matching[j] = list(matching[j]) for p, pair in enumerate(matching[j]): node_pair = [None, None] for n in pair: node = nodes[j][n] node_pair[node in aux] = node matching[j][p] = node_pair # determine which links are covered by the conjuction of these matchings matched_aux = [set(), set()] for j in range(2): for pair in matching[j]: matched_aux[j].add(pair[1]) completed = matched_aux[0].intersection(matched_aux[1]) # add these matched pairs to the schedule for j in range(2): schedule.append([pair for pair in matching[j] if pair[1] in completed]) # update the list of auxilliaries for links yet to be paired aux = aux.difference(completed) self.schedule = schedule def _rotate(self, basis, c, regqubit, inverse): """ Rotates the given qubit to (or from) the basis specified by the color and the pair of bases used for the code. """ if not inverse: if basis[c] in ["x", "y"]: self.circuit[basis].h(regqubit) if basis[c] == "y": self.circuit[basis].s(regqubit) else: if basis[c] == "y": self.circuit[basis].sdg(regqubit) if basis[c] in ["x", "y"]: self.circuit[basis].h(regqubit) def _basis_change(self, basis, inverse=False): """ Rotates all code qubits to (or from) the bases required for the code. """ for qubit, q in self.code_index.items(): c = self.color[qubit] self._rotate(basis, c, self.code_qubit[q], inverse) def _preparation(self): """ Creates the circuits and their registers. """ # get a list of all code qubits (qubits[0]) and link qubits (qubits[1]) qubits = [[], []] for link in self.links: for code_qubit in [link[0], link[2]]: if code_qubit not in qubits[0]: qubits[0].append(code_qubit) qubits[1].append(link[1]) self.qubits = [qubits[j] for j in range(2)] self.num_qubits = [len(qubits[j]) for j in range(2)] self.d = self.num_qubits[0] # define the quantum egisters self.code_qubit = QuantumRegister(self.num_qubits[0], "code_qubit") self.link_qubit = QuantumRegister(self.num_qubits[1], "link_qubit") self.qubit_registers = {"code_qubit", "link_qubit"} # for looking up where each qubit lives on the quantum registers self.code_index = {qubit: q for q, qubit in enumerate(list(qubits[0]))} self.link_index = {qubit: q for q, qubit in enumerate(list(qubits[1]))} # set up the classical registers self.link_bits = [] self.code_bit = ClassicalRegister(len(qubits[0]), "code_bit") # create the circuits and initialize the code qubits self.circuit = {} for basis in list({self.basis, self.basis[::-1]}): self.circuit[basis] = QuantumCircuit(self.link_qubit, self.code_qubit, name=basis) if self.logical == "1": self.circuit[basis].x(self.code_qubit) self._basis_change(basis) # use degree 1 code qubits for logical z readouts (and boundary) graph = self._get_coupling_graph() self._leaves = False z_logicals = [] self.boundary = [] for n, node in enumerate(graph.nodes()): if graph.degree(n) == 1: z_logicals.append(node) self.boundary.append(node) self._leaves = True # if there are none, just use the first (not boundary) if not z_logicals: z_logicals = [min(self.code_index.keys())] self.z_logicals = z_logicals def _get_202(self, t): """ Returns the position within a 202 sequence for the current measurement round: * `False` means not part of a 202 sequence; * Even taus use the standard coloring; * Odd taus use the flipped coloring. Also returns the link qubits for the link for which the 202 sequence is run and its neigbours. """ # null values in case no 202 done during this round tau, qubit_l_202, qubit_l_nghbrs = None, None, [[], []] if self.run_202 and int(t / self.rounds_per_link) < len(self.links_202): # determine the link qubit for which the 202 sequence is run link = self.links_202[int(t / self.rounds_per_link)] # set the 202 link qubit_l_202 = link[1] # determine where we are in the sequence tau = int(t % self.rounds_per_link - self.roundbuffer) if t < 0 or tau not in range(self.rounds_per_202): tau = False # determine the neighbouring link qubits that are suppressed graph = self._get_link_graph(0) nodes = graph.nodes() edges = graph.edge_list() ns = [nodes.index(link[j]) for j in [0, 2]] qubit_l_nghbrs = [] for n in ns: neighbors = list(graph.incident_edges(n)) neighbors.remove(list(edges).index(tuple(ns))) qubit_l_nghbrs.append( [graph.get_edge_data_by_index(ngbhr)["link qubit"] for ngbhr in neighbors] ) return tau, qubit_l_202, qubit_l_nghbrs def _syndrome_measurement(self, final: bool = False): """ Adds a syndrome measurement round. Args: final (bool): Whether or not this is the final round. """ self.link_bits.append( ClassicalRegister(self.num_qubits[1], "round_" + str(self.T) + "_link_bit") ) tau, qubit_l_202, qubit_l_nghbrs = self._get_202(self.T) links_to_measure = set() links_to_reset = set() for basis, qc in self.circuit.items(): if self._barriers: # pre round barrier qc.barrier() for pairs in self.schedule: for qubit_c, qubit_l in pairs: q_c = self.code_index[qubit_c] q_l = self.link_index[qubit_l] neighbor = qubit_l in qubit_l_nghbrs[0] + qubit_l_nghbrs[1] if not (tau in range(1, self.rounds_per_202 - 1) and neighbor): c = self.color[qubit_c] if qubit_l == qubit_l_202: c = (c + tau) % 2 self._rotate(basis, c, self.code_qubit[q_c], True) qc.cx(self.code_qubit[q_c], self.link_qubit[q_l]) self._rotate(basis, c, self.code_qubit[q_c], False) links_to_measure.add(q_l) if not (not isinstance(tau, bool) and tau == 0 and neighbor): links_to_reset.add(q_l) # measurement and resets for basis, qc in self.circuit.items(): # measurement if final: # already prep code qubits for readout self._basis_change(basis, inverse=True) if self._barriers: # post-round, pre-measurement barrier qc.barrier() qc.add_register(self.link_bits[self.T]) for q_l in links_to_measure: qc.measure(self.link_qubit[q_l], self.link_bits[self.T][q_l]) # resets if self.resets and not final: for q_l in links_to_reset: if self.conditional_reset: qc.x(self.link_qubit[q_l]).c_if(self.link_bits[self.T][q_l], 1) else: qc.reset(self.link_qubit[q_l]) # correct if self.run_202: if tau == (self.rounds_per_202 - 1): target_link = self.links[self.link_index[qubit_l_202]] # for neighbouring links on both sides of the 202 link for j in range(2): if qubit_l_nghbrs[j]: # get the first listed neighbouring link control_link = self.links[self.link_index[qubit_l_nghbrs[j][0]]] # find the qubit on which it overlaps with the 202 qubit_t = list(set(target_link).intersection(set(control_link)))[0] # and the qubit whose result controls the feedforward qubit_c = control_link[1] # get their indices q_t = self.code_index[qubit_t] q_c = self.link_index[qubit_c] # and the colour of the targeted qubit c = self.color[qubit_t] self._rotate(basis, c, self.code_qubit[q_t], True) qc.x(self.code_qubit[q_t]).c_if(self.link_bits[self.T][q_c], 1) self._rotate(basis, c, self.code_qubit[q_t], False) # delay if self.delay > 0 and not final: if self._barriers: # post-reset, pre-delay barrier qc.barrier() qc.delay(self.delay, self.link_qubit) self.T += 1 def _readout(self): """ Readout of all code qubits, which corresponds to a logical measurement as well as allowing for a measurement of the syndrome to be inferred. """ for basis, qc in self.circuit.items(): if self.T == 0: self._basis_change(basis, inverse=True) if self._barriers: qc.barrier() # otherwise, code qubits are already prepped qc.add_register(self.code_bit) qc.measure(self.code_qubit, self.code_bit) def _process_string(self, string): # logical readout taken from assigned qubits measured_log = "" for qubit in self.z_logicals: j = self.code_index[qubit] measured_log += string[self.num_qubits[0] - j - 1] + " " if self.resets: syndrome = string[self.num_qubits[0] :] else: # if there are no resets, results are cumulative and need to be separated cumsyn_list = string[self.num_qubits[0] :].split(" ") syndrome_list = [] for tt, cum_syn in enumerate(cumsyn_list[0:-1]): syn = "" for j in range(len(cum_syn)): syn += str(int(cumsyn_list[tt][j] != cumsyn_list[tt + 1][j])) syndrome_list.append(syn) syndrome_list.append(cumsyn_list[-1]) syndrome = " ".join(syndrome_list) # final syndrome deduced from final code qubit readout full_syndrome = "" for link in self.links: q = [self.num_qubits[0] - 1 - self.code_index[link[j]] for j in [0, -1]] full_syndrome += "0" * (string[q[0]] == string[q[1]]) + "1" * ( string[q[0]] != string[q[1]] ) full_syndrome = full_syndrome[::-1] # results from all other syndrome measurements then added full_syndrome = full_syndrome + syndrome # changes between appropriate results are then calculated syndrome_list = full_syndrome.split(" ") syndrome_changes = "" last_neighbors = [] just_finished = False for t in range(self.T + 1): tau, qubit_l_202, qubit_l_nghbrs = self._get_202(t) all_neighbors = qubit_l_nghbrs[0] + qubit_l_nghbrs[1] for j in range(self.num_qubits[1]): dt = None q_l = self.num_qubits[1] - 1 - j qubit_l = self.links[q_l][1] # the first results are themselves the changes change = None if t == 0: change = syndrome_list[-1][j] != "0" # if the link was involved in a just finished 202... elif just_finished: # skip back self.rounds_per_202 for a neighbouring link if qubit_l in last_neighbors: dt = self.rounds_per_202 # and just 1 for all others (as normal) else: dt = 1 # otherwise, everything not during a 202 just compares # results with the previous round (as normal) elif tau not in range(1, self.rounds_per_202): dt = 1 # and all others depend on the placement of the link # within the current 202 else: # if this link is the 202 link if qubit_l == qubit_l_202: if tau == 1: change = False else: dt = 2 # if this link neighbours the 202 link elif qubit_l in all_neighbors: change = False # if the link is not near the 202 link (or there are no 202s) else: dt = 1 # for those where we now have a dt, calculate the change if dt: change = syndrome_list[-t - 1][j] != syndrome_list[-t - 1 + dt][j] syndrome_changes += "0" * (not change) + "1" * change syndrome_changes += " " last_neighbors = all_neighbors.copy() just_finished = tau == (self.rounds_per_202 - 1) # the space separated string of syndrome changes then gets a # double space separated logical value on the end new_string = measured_log + " " + syndrome_changes[:-1] return new_string
[docs] def string2raw_logicals(self, string): """ Extracts raw logicals from output string. Args: string (string): Results string from which to extract logicals Returns: list: Raw values for logical operators that correspond to nodes. """ return _separate_string(self._process_string(string))[0]
[docs] def string2nodes(self, string, **kwargs) -> List[DecodingGraphNode]: """Convert output string from circuits into a set of nodes. Args: string (string): Results string to convert. kwargs (dict): Additional keyword arguments. See below. kwargs: all_logicals (bool): Whether to include logical nodes irrespective of value. (False as default). Returns: dict: List of nodes corresponding to to the non-trivial elements in the string. """ all_logicals = kwargs.get("all_logicals") string = self._process_string(string) separated_string = _separate_string(string) nodes = [] for syn_type, _ in enumerate(separated_string): for syn_round in range(len(separated_string[syn_type])): elements = separated_string[syn_type][syn_round] for elem_num, element in enumerate(elements): if (syn_type == 0 and (all_logicals or element != self.logical)) or ( syn_type != 0 and element == "1" ): is_logical = syn_type == 0 if is_logical: elem_num = syn_round syn_round = 0 code_qubits = [self.z_logicals[elem_num]] link_qubit = None else: link = self.links[-elem_num - 1] code_qubits = [link[0], link[2]] link_qubit = link[1] tau, _, _ = self._get_202(syn_round) if not tau: tau = 0 node = DecodingGraphNode( is_logical=is_logical, is_boundary=(is_logical and self._leaves), time=syn_round if not is_logical else None, qubits=code_qubits, index=elem_num, ) node.properties["conjugate"] = ((tau % 2) == 1) and tau > 1 node.properties["link qubit"] = link_qubit nodes.append(node) return nodes
[docs] @staticmethod def flatten_nodes(nodes: List[DecodingGraphNode]): """ Removes time information from a set of nodes, and consolidates those on the same position at different times. Also removes nodes corresponding to the conjugate error from [[2,0,2]]s. Args: nodes (list): List of nodes, of the type produced by `string2nodes`, to be flattened. Returns: flat_nodes (list): List of flattened nodes. """ # strip out conjugate nodes non_conj_nodes = [] for node in nodes: if not node.properties["conjugate"]: non_conj_nodes.append(node) nodes = non_conj_nodes # remove time info nodes_per_link = {} for node in nodes: link_qubit = node.properties["link qubit"] if link_qubit in nodes_per_link: nodes_per_link[link_qubit] += 1 else: nodes_per_link[link_qubit] = 1 flat_nodes = [] for node in nodes: if node.is_logical or node.is_boundary: flat_nodes.append(node) elif nodes_per_link[node.properties["link qubit"]] % 2: flat_node = deepcopy(node) flat_node.time = None if flat_node not in flat_nodes: flat_nodes.append(flat_node) return flat_nodes
def _links2cpp(self): """ Convert data about the link graph to the form required by C++ functions. """ nodes = self.link_graph.nodes() link_graph = [] for edge in self.link_graph.edge_list(): link_graph.append((nodes[edge[0]], nodes[edge[1]])) link_neighbors = {} for n, node in enumerate(self.link_graph.nodes()): link_neighbors[node] = [] for j in self.link_graph.neighbors(n): link_neighbors[node].append(nodes[j]) return link_graph, link_neighbors def _extras2cpp(self): """ Returns logical and boundary nodes as tuples. First value is the qubit, second is 1 for logical only, 2 for boundary only and 3 for both. """ extras = {} for q in self.z_logicals: extras[q] = 1 + 2 * (q in self.boundary) for q in self.boundary: if q not in self.z_logicals: extras[q] = 2 return extras
[docs] def check_nodes(self, nodes, ignore_extras=False, minimal=False): """ Determines whether a given set of nodes are neutral. If so, also determines any additional logical readout qubits that would be flipped by the errors creating such a cluster and how many errors would be required to make the cluster. Args: nodes (list): List of nodes, of the type produced by `string2nodes`. ignore_extras (bool): If `True`, undeeded boundary and logical nodes are ignored. minimal (bool): Whether output should only reflect the minimal error case. Returns: neutral (bool): Whether the nodes independently correspond to a valid set of errors. flipped_logical_nodes (list): List of qubits nodes for logical operators that are flipped by the errors, that were not included in the original nodes. num_errors (int): Minimum number of errors required to create nodes. """ nodes = _nodes2cpp(nodes) cpp_output = _c_check_nodes( nodes, ignore_extras, minimal, self.cycle_dict, self._cpp_link_graph, self._cpp_link_neighbors, self._extras2cpp(), ) neutral = bool(cpp_output[0]) num_errors = cpp_output[1] flipped_extra_nodes = [] for flipped_extra in cpp_output[2::]: is_logical = flipped_extra in self.z_logicals is_boundary = flipped_extra in self.boundary if is_logical: index = self.z_logicals.index(flipped_extra) else: index = self.boundary.index(flipped_extra) node = DecodingGraphNode( is_logical=is_logical, is_boundary=is_boundary, qubits=[flipped_extra], index=index, ) flipped_extra_nodes.append(node) return neutral, flipped_extra_nodes, num_errors
[docs] def is_cluster_neutral(self, nodes: dict): """ Determines whether or not the cluster is neutral, meaning that one or more errors could have caused the set of nodes (syndrome changes) passed to the method. Args: nodes: dictionary in the form of the return value of string2nodes """ nodes = _nodes2cpp(nodes) return _c_is_cluster_neutral( nodes, False, False, self.cycle_dict, self._cpp_link_graph, self._cpp_link_neighbors, self._extras2cpp(), self._linear, )
[docs] def transpile(self, backend, scheduling_method="alap"): """ Args: backend (qiskit.providers.ibmq.IBMQBackend): Backend to transpile and schedule the circuits for. The numbering of the qubits in this backend should correspond to the numbering used in `self.links`. scheduling_method (str): Name of scheduling pass. Arguemnt passed to `qiskit.transpile`. Returns: transpiled_circuit: As `self.circuit`, but with the circuits scheduled and remapped to the device connectivity. """ bases = list(self.circuit.keys()) circuits = [self.circuit[basis] for basis in bases] initial_layout = [] for qreg in circuits[0].qregs: qreg_index = int("link" in qreg.name) initial_layout += [ self.qubits[qreg_index][q] for q in range(self.num_qubits[qreg_index]) ] # transpile to backend circuits = transpile( circuits, backend, initial_layout=initial_layout, scheduling_method=scheduling_method ) return {basis: circuits[j] for j, basis in enumerate(bases)}
def _make_syndrome_graph(self): # get the list of nodes string = ( "1" * len(self.code_qubit) + " " + ("0" * len(self.links) + " ") * (self.T - 1) + "1" * len(self.links) ) nodes: List[DecodingGraphNode] = [] for node in self.string2nodes(string, all_logicals=True): if not node.is_logical: for t in range(self.T + 1): new_node = deepcopy(node) new_node.time = t if new_node not in nodes: nodes.append(new_node) else: node.time = None nodes.append(node) # find pairs that should be connected edges: List[Tuple[int, int]] = [] for n0, node0 in enumerate(nodes): for n1, node1 in enumerate(nodes): if n0 < n1: # just record all possible edges for now dt = abs((node1.time or 0) - (node0.time or 0)) adj = set(node0.qubits).intersection(set(node1.qubits)) if adj: if (node0.is_logical ^ node1.is_logical) or dt <= 1: edges.append((n0, n1)) elif not self.resets: if node0.qubits == node1.qubits and dt == 2: edges.append((n0, n1)) # put it all in a graph S = rx.PyGraph(multigraph=False) for node in nodes: S.add_node(node) for n0, n1 in edges: source = nodes[n0] target = nodes[n1] qubits = [] if not (source.is_logical and target.is_logical): qubits = list(set(source.qubits).intersection(target.qubits)) if source.time != target.time and len(qubits) > 1: qubits = [] edge = DecodingGraphEdge(qubits=qubits, weight=1) S.add_edge(n0, n1, edge) # remove invalid edges self.get_error_coords(None, S, remove_invalid_edges=True) # just record edges as hyperedges for now (should be improved later) hyperedges = [] for e, n0n1 in enumerate(S.edge_list()): hyperedges.append({n0n1: S.edges()[e]}) return S, hyperedges
[docs] def get_error_coords( self, counts, decoding_graph, method="spitz", remove_invalid_edges=False, return_samples=False, ): """ Uses the `get_error_probs` method of the given decoding graph to generate probabilities of single error events from given counts. The location and time of each error is also calculated. Args: counts (dict): Counts dictionary of the results to be analyzed. decoding_graph (DecodingGraph): Decoding graph object constructed from this code. method (string): Method to used for calculation. Supported methods are 'spitz' (default) and 'naive'. remove_invalid_edges (string): Whether to delete edges from the graph if they are found to be invalid. return_samples (bool): Whether to also return the number of samples used to calculated each probability. Returns: dict: Keys are the coordinates (qubit, start_time, end_time) for specific error events. Time refers to measurement rounds. Values are a dictionary whose keys are the edges that detected the event, and whose keys are the calculated probabilities. Additional information: Time calculation does not take into account get lengths. It assumes that the subrounds within the schedule and the measurement all take the same time. Time is in units of rounds. """ # though the documented use case requires a decoding graph and a counts dict, there is also an # undocumented internal use case, where just the bare graph is provided and no counts. This is # to find and delete invalid edges if isinstance(decoding_graph, rx.PyGraph): graph = decoding_graph else: graph = decoding_graph.graph nodes = graph.nodes() if counts: if return_samples: error_probs, samples = decoding_graph.get_error_probs( counts, method=method, return_samples=True ) else: error_probs = decoding_graph.get_error_probs(counts, method=method) else: error_probs = {} for n0, n1 in graph.edge_list(): if nodes[n0].is_logical: edge = (n1, n1) elif nodes[n1].is_logical: edge = (n0, n0) else: edge = (n0, n1) error_probs[edge] = np.nan if hasattr(self, "z_logicals"): z_logicals = set(self.z_logicals) elif hasattr(self, "z_logical"): z_logicals = {self.z_logical} else: print("No qubits for z logicals found. Proceeding without.") z_logicals = set() round_length = len(self.schedule) + 1 error_coords = {} sample_coords = {} for (n0, n1), prob in error_probs.items(): node0 = nodes[n0] node1 = nodes[n1] if n0 != n1: qubits = graph.get_edge_data(n0, n1).qubits if qubits: # error on a code qubit between rounds, or during a round assert (node0.time == node1.time and node0.qubits != node1.qubits) or ( node0.time != node1.time and node0.qubits != node1.qubits ) qubit = qubits[0] # error between rounds if node0.time == node1.time: dts = [] for node in [node0, node1]: pair = [qubit, node.properties["link qubit"]] for dt, pairs in enumerate(self.schedule): if pair in pairs or tuple(pair) in pairs: dts.append(dt) time = [max(0, node0.time - 1 + (max(dts) + 1) / round_length)] time.append(min(self.T, node0.time + min(dts) / round_length)) # error during a round else: # put nodes in descending time order if node0.time < node1.time: node_pair = [node1, node0] else: node_pair = [node0, node1] # see when in the schedule each node measures the qubit dts = [] for node in node_pair: pair = [qubit, node.properties["link qubit"]] for dt, pairs in enumerate(self.schedule): if pair in pairs or tuple(pair) in pairs: dts.append(dt) # use to define fractional time if dts[0] < dts[1]: time = [node_pair[1].time + (dts[0] + 1) / round_length] time.append(node_pair[1].time + dts[1] / round_length) else: # impossible cases get no valid time time = [] if remove_invalid_edges: graph.remove_edge(n0, n1) else: # measurement error assert node0.time != node1.time and node0.qubits == node1.qubits qubit = node0.properties["link qubit"] t0 = min(node0.time, node1.time) if abs(node0.time - node1.time) == 1: if self.resets: time = [t0, t0 + 1] else: time = [t0, t0 + (round_length - 1) / round_length] else: time = [t0 + (round_length - 1) / round_length, t0 + 1] else: # detected only by one stabilizer boundary_qubits = list(set(node0.qubits).intersection(z_logicals)) # for the case of boundary stabilizers if boundary_qubits: qubit = boundary_qubits[0] pair = [qubit, node0.properties["link qubit"]] for dt, pairs in enumerate(self.schedule): if pair in pairs or tuple(pair) in pairs: time = [max(0, node0.time - 1 + (dt + 1) / round_length)] time.append(min(self.T, node0.time + dt / round_length)) else: qubit = tuple(node0.qubits + [node0.properties["link qubit"]]) time = [node0.time, node0.time + (round_length - 1) / round_length] if time: # only record if not nan if (qubit, time[0], time[1]) not in error_coords: error_coords[qubit, time[0], time[1]] = {} sample_coords[qubit, time[0], time[1]] = {} error_coords[qubit, time[0], time[1]][n0, n1] = prob if return_samples: sample_coords[qubit, time[0], time[1]][n0, n1] = samples[n0, n1] if return_samples: return error_coords, sample_coords else: return error_coords
[docs] def clean_code(self, string): """ Given an output string of the code, obvious code qubit errors are identified and their effects are removed. Args: string (str): Output string of the code. Returns: string (str): Modifed output string of the code. """ # get the parities for the rounds and turn them into lists of integers # (also turn them the right way around) parities = [] for rstring in string.split(" ")[1:]: parities.append([int(p) for p in rstring][::-1]) parities = parities[::-1] # calculate the final parities from the final readout and add them on final = string.split(" ")[0] final_parities = [0] * self.num_qubits[1] for c0, a, c1 in self.links: final_parities[-self.link_index[a] - 1] = ( int(final[-self.code_index[c0] - 1]) + int(final[-self.code_index[c1] - 1]) ) % 2 parities.append(final_parities[::-1]) flips = {c: 0 for c in self.code_index} for rparities in parities: # see how many links around each code qubit detect a flip link_count = {c: 0 for c in self.code_index} for c0, a, c1 in self.links: # we'll need to determine whether the as yet uncorrected parity # checks from this round should be flipped, based on results # from previous rounds flip = (flips[c0] + flips[c1]) % 2 b = self.link_index[a] for c in [c0, c1]: link_count[c] += (rparities[b] + flip) % 2 # if it's all of them, assume a flip for c in link_count: if link_count[c] == self.degree[c]: flips[c] = (flips[c] + 1) % 2 # modify the parities to remove the effect for c0, a, c1 in self.links: flip = (flips[c0] + flips[c1]) % 2 b = self.link_index[a] rparities[b] = (rparities[b] + flip) % 2 # turn the results back into a string new_string = "" for rparities in parities[:-1][::-1]: new_string += " " + "".join([str(p) for p in rparities][::-1]) final_string = [int(p) for p in string.split(" ", maxsplit=1)[0]] for c, flip in flips.items(): b = self.code_index[c] final_string[-b - 1] = (final_string[-b - 1] + flip) % 2 final_string = "".join([str(p) for p in final_string]) return final_string + new_string