Source code for bqskit.qis.graph

"""This module implements the CouplingGraph class."""
from __future__ import annotations

import copy
import itertools as it
import logging
from random import shuffle
from typing import Any
from typing import cast
from typing import Collection
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

import numpy as np

if TYPE_CHECKING:
    from typing_extensions import TypeGuard

from bqskit.ir.location import CircuitLocation
from bqskit.ir.location import CircuitLocationLike
from bqskit.utils.typing import is_integer
from bqskit.utils.typing import is_iterable

_logger = logging.getLogger(__name__)


[docs] class CouplingGraph(Collection[Tuple[int, int]]): """A graph representing connections in a qudit topology."""
[docs] def __init__( self, graph: Iterable[tuple[int, int]], num_qudits: int | None = None, ) -> None: if isinstance(graph, CouplingGraph): self.num_qudits: int = graph.num_qudits self._edges: set[tuple[int, int]] = graph._edges self._adj: list[set[int]] = graph._adj return if not CouplingGraph.is_valid_coupling_graph(graph): raise TypeError('Invalid coupling graph.') self._edges = {g if g[0] <= g[1] else (g[1], g[0]) for g in graph} calced_num_qudits = 0 for q1, q2 in self._edges: calced_num_qudits = max(calced_num_qudits, max(q1, q2)) calced_num_qudits += 1 if num_qudits is None: self.num_qudits = calced_num_qudits elif calced_num_qudits > num_qudits: raise ValueError('Edges between invalid qudits.') else: self.num_qudits = num_qudits self._adj = [set() for _ in range(self.num_qudits)] for q1, q2 in self._edges: self._adj[q1].add(q2) self._adj[q2].add(q1) self._mat = [ [np.inf for _ in range(self.num_qudits)] for _ in range(self.num_qudits) ] for q1, q2 in self._edges: self._mat[q1][q2] = 1 self._mat[q2][q1] = 1
[docs] def is_fully_connected(self) -> bool: """Return true if the graph is fully connected.""" frontier: set[int] = {0} qudits_seen: set[int] = set() while len(frontier) > 0: expanded_qudits = set() for qudit in frontier: expanded_qudits.add(qudit) expanded_qudits.update(self._adj[qudit]) frontier = expanded_qudits - qudits_seen qudits_seen.update(expanded_qudits) if len(qudits_seen) == self.num_qudits: return True return False
[docs] def get_neighbors_of(self, qudit: int) -> list[int]: """Return the qudits adjacent to `qudit`.""" return list(self._adj[qudit])
def __contains__(self, __o: object) -> bool: return self._edges.__contains__(__o) def __eq__(self, __o: object) -> bool: if not isinstance(__o, CouplingGraph): return False if self.num_qudits != __o.num_qudits: return False if self._mat != __o._mat: return False return True def __iter__(self) -> Iterator[tuple[int, int]]: return self._edges.__iter__() def __hash__(self) -> int: return hash((self.num_qudits, tuple(self._edges))) def __len__(self) -> int: return self._edges.__len__() def __str__(self) -> str: return 'CouplingGraph(' + self._edges.__str__() + ')' def __repr__(self) -> str: return self._edges.__repr__()
[docs] def get_qudit_degrees(self) -> list[int]: return [len(l) for l in self._adj]
[docs] def all_pairs_shortest_path(self) -> list[list[int]]: """ Calculate all pairs shortest path matrix using Floyd-Warshall. Returns: D (list[list[int]]): D[i][j] is the length of the shortest path from i to j. """ D = copy.deepcopy(self._mat) for k in range(self.num_qudits): for i in range(self.num_qudits): for j in range(self.num_qudits): D[i][j] = min(D[i][j], D[i][k] + D[k][j]) return cast(List[List[int]], D)
[docs] def get_shortest_path_tree(self, source: int) -> list[tuple[int, ...]]: """Return shortest path from `source` to every node in `self`.""" # Dijkstra's algorithm to build shortest-path tree unvisited_qudits = set(range(self.num_qudits)) distances = {i: np.inf for i in range(self.num_qudits)} paths: list[tuple[int, ...]] = [tuple() for i in range(self.num_qudits)] distances[source] = 0 paths[source] = (source,) while len(unvisited_qudits) > 0: # Pick next unvisited qudit with shortest distance unvisited_distances = [ (x, y) for x, y in distances.items() if x in unvisited_qudits ] unvisited_distances.sort(key=lambda x: x[1]) current_qudit = unvisited_distances[0][0] if distances[current_qudit] == np.inf: raise RuntimeError(f'No path found to qudit: {current_qudit}.') neighbors = self.get_neighbors_of(current_qudit) unvisted_neighbors = unvisited_qudits.intersection(neighbors) for other_qudit in unvisted_neighbors: if distances[current_qudit] + 1 < distances[other_qudit]: distances[other_qudit] = distances[current_qudit] + 1 paths[other_qudit] = paths[current_qudit] + (other_qudit,) unvisited_qudits.remove(current_qudit) return [paths[i] for i in range(self.num_qudits)]
[docs] def get_subgraph( self, location: CircuitLocationLike, renumbering: dict[int, int] | None = None, ) -> CouplingGraph: """Returns the sub-coupling-graph with qudits in `location`.""" if not CircuitLocation.is_location(location, self.num_qudits): raise TypeError('Invalid location.') location = CircuitLocation(location) if renumbering is None: renumbering = {q: i for i, q in enumerate(location)} subgraph = [] location_set = {loc for loc in location} for q_i in location: for q_i_neighbor in location_set.intersection(self._adj[q_i]): subgraph.append((renumbering[q_i], renumbering[q_i_neighbor])) return CouplingGraph(subgraph, len(location))
[docs] def get_subgraphs_of_size(self, size: int) -> list[CircuitLocation]: """ Find all sets of indices that form connected subgraphs on their own. Each location describes a valid spot for a `size`-sized gate, so the number of qudit_indices in each location is `size`. A location is only included if each pair of qudits is directly connected or connected through other qudits in the location. Args: size (int): The size of each location in the final list. Returns: list[CircuitLocation]: The locations compliant with the machine. Raises: ValueError: If `size` is nonpositive or too large. Notes: Does a breadth first search on all pairs of qudits, keeps paths that have length equal to size. Note that the coupling map is assumed to be undirected. """ if not is_integer(size): raise TypeError( f'Expected integer for block_size, got {type(size)}', ) if size > self.num_qudits: raise ValueError( 'The block_size is too large; ' f'graph size is {self.num_qudits}, got {size}.', ) if size <= 0: raise ValueError(f'Nonpositive size; got {size}.') locations: set[CircuitLocation] = set() for qudit in range(self.num_qudits): # Get every valid set containing `qudit` with size == size self._location_search(locations, set(), qudit, size) return list(locations)
def _location_search( self, locations: set[CircuitLocation], path: set[int], vertex: int, limit: int, ) -> None: """ Add paths with length equal to limit to the `locations` set. Args: locations (set[CircuitLocation]): A list that contains all paths found so far of length equal to `limit`. path (set[int]): The qudit vertices currently included in the path. vertex (int): The vertex in the graph currently being examined. limit (int): The desired length of paths in the `locations` list. """ if vertex in path: return curr_path = path.copy() curr_path.add(vertex) if len(curr_path) == limit: locations.add(CircuitLocation(list(curr_path))) return frontier: set[int] = { qudit for node in curr_path for qudit in self._adj[node] if qudit not in curr_path } for neighbor in frontier: self._location_search(locations, curr_path, neighbor, limit)
[docs] @staticmethod def is_valid_coupling_graph( coupling_graph: Any, num_qudits: int | None = None, ) -> TypeGuard[CouplingGraphLike]: """ Return true if the coupling graph is valid. Args: coupling_graph (Any): The coupling graph to check. num_qudits (int | None): The total number of qudits. All qudits should be less than this. If None, don't check. Returns: bool: Valid or not """ if not is_iterable(coupling_graph): _logger.debug('Coupling graph is not iterable.') return False if len(list(coupling_graph)) == 0: return True if not all(isinstance(pair, tuple) for pair in coupling_graph): _logger.debug('Coupling graph is not a sequence of tuples.') return False if not all([len(pair) == 2 for pair in coupling_graph]): _logger.debug('Coupling graph is not a sequence of pairs.') return False if num_qudits is not None: if not (is_integer(num_qudits) and num_qudits > 0): _logger.debug('Invalid num_qudits in coupling graph check.') return False if not all( qudit < num_qudits for pair in coupling_graph for qudit in pair ): _logger.debug('Coupling graph has invalid qudits.') return False if not all([ len(pair) == len(set(pair)) for pair in coupling_graph ]): _logger.debug('Coupling graph has an invalid pair.') return False return True
[docs] @staticmethod def all_to_all(num_qudits: int) -> CouplingGraph: """Return a coupling graph with all qudits connected.""" return CouplingGraph(set(it.combinations(range(num_qudits), 2)))
[docs] @staticmethod def linear(num_qudits: int) -> CouplingGraph: """Return a coupling graph with nearest-neighbor connectivity.""" return CouplingGraph([(x, x + 1) for x in range(num_qudits - 1)])
[docs] @staticmethod def ring(num_qudits: int) -> CouplingGraph: """Return a coupling graph with ring connectivity.""" ext = [(0, num_qudits - 1)] return CouplingGraph([(x, x + 1) for x in range(num_qudits - 1)] + ext)
[docs] @staticmethod def star(num_qudits: int) -> CouplingGraph: """Return a coupling graph with one qudit connected to all else.""" return CouplingGraph([(0, x) for x in range(1, num_qudits)])
[docs] @staticmethod def grid(num_rows: int, num_cols: int) -> CouplingGraph: """Return a coupling graph with a grid of qubits.""" num_qudits = num_rows * num_cols edges = set() for i in range(num_qudits): if i % num_cols != num_cols - 1: edges.add((i, i + 1)) if i < (num_rows - 1) * num_cols: edges.add((i, i + num_cols)) return CouplingGraph(edges)
[docs] def maximal_matching( self, edges_to_ignore: list[tuple[int, int]] = [], randomize: bool = False, ) -> list[tuple[int, int]]: """ Generate a random graph matching for the coupling graph. Edges in the `ignored_edges` list will not be included. Arguments: edges_to_ignore (list[tuple[int]]): Edges not included in the matching. (Default: []) randomize (bool): Shuffle edges to create random matchings. (Default: False) Returns: matching (list[tuple[int]]): A maximal list of edges that share no common verticies. """ matching: set[tuple[int, int]] = set() vertices: set[int] = set() edge_list = [ (u, v) for (u, v) in self._edges if (u, v) not in edges_to_ignore and (v, u) not in edges_to_ignore ] if randomize: shuffle(edge_list) for edge in edge_list: u, v = edge if u not in vertices and v not in vertices and u != v: matching.add(edge) vertices.update(edge) return list(matching)
[docs] def is_fully_connected_without(self, qudit: int) -> bool: """Return true if the graph is fully connected without `qudit`.""" frontier = {0} if qudit != 0 else {1} qudits_seen = {0} if qudit != 0 else {1} while len(frontier) > 0: expanded_qudits = set() for q in frontier: if q != qudit: neighbors = set(self.get_neighbors_of(q)) - {qudit} expanded_qudits.update(neighbors) frontier = expanded_qudits - qudits_seen qudits_seen.update(expanded_qudits) if len(qudits_seen) == self.num_qudits - 1: return True return False
[docs] def get_rooted_minimum_span(self, root: int) -> list[tuple[int, int]]: """ Connect `root` to every other node in the graph. Args: root (int): The qudit to start from. Return: (list[tuple[int, int]]): A list of minimal edges that connects 'root' to every other qudit. """ # Construct Minimum Spanning Tree using breadth-first search mst = [] seen = {root} frontier = [root] while len(frontier) > 0 and len(seen) < self.num_qudits: qudit = frontier.pop(0) neighbors = set(self.get_neighbors_of(qudit)) unseen_neighbors = neighbors - seen frontier.extend(unseen_neighbors) seen.update(neighbors) for neighbor in unseen_neighbors: mst.append((qudit, neighbor)) mst = CouplingGraph(mst) # Traverse MST depth-first to produce pairs of interacting qudits interactions: list[tuple[int, int]] = [] frontier: list[tuple[int, tuple[int, int] | None]] = [(root, None)] while len(frontier) > 0: qudit, interaction = frontier.pop(0) if interaction is not None: interactions.append(interaction) for neighbor in mst.get_neighbors_of(qudit): if interaction is None or neighbor != interaction[0]: frontier.insert(0, (neighbor, (qudit, neighbor))) return interactions
[docs] def get_induced_subgraph( self, location: CircuitLocationLike, ) -> list[tuple[int, int]]: """ Return the edges induced by the vertices specified in `location`. Arguments: location (CircuitLocationLike): A list of qubits or vertices in the CouplingGraph. The edges connecting each of these vertices together will be extracted. Returns: (list[tuple[int,int]]): The list of edges connecting vertices in `location`. """ if not isinstance(location, CircuitLocation): location = CircuitLocation(location) if len(location) < 2: raise ValueError('Invalid location size.') subgraph = [] for q1, q2 in it.combinations(location, 2): if (q1, q2) in self._edges or (q2, q1) in self._edges: subgraph.append((min([q1, q2]), max([q1, q2]))) return subgraph
[docs] def relabel_subgraph( graph: CouplingGraphLike, relabeling: dict[int, int] | None = None, ) -> CouplingGraph: """ Renumber the vertices in `graph` according to the optionally provided `relabeling` dictionary, or relabel so that the vertices are in renumbered in least to greatest order and in the set {0,...,|V|-1}. Arguments: graph (CouplingGraphLike): A collection of edges representing a graph. relabeling (dict[int,int]|None): An optional dictionary specifying current labels as keys and desired labels as values. (Default: None) Returns: (CouplingGraph): The relabeled CouplingGraph. """ if relabeling is None: vertices = set() for q1, q2 in graph: vertices.add(q1) vertices.add(q2) renum = {q: i for i, q in enumerate(vertices)} else: renum = relabeling edges = [ (min([renum[q1], renum[q2]]), max([renum[q1], renum[q2]])) for q1, q2 in graph ] return CouplingGraph(edges)
[docs] def is_embedded_in(self, graph: CouplingGraph) -> bool: """ Check if this CouplingGraph is embedded within `graph`. Arguments: graph (CouplingGraph): A graph that may have a subgraph isomorphic to self. Returns: (bool): True if this graph is isomorphic to a subgraph of `graph`, and False if not. Note: The algorithm used is naive and relies on the fact that `graph` and `self` are similar in size and sparse. Todo: Implement the V2 algorithm for subgraph isomorphism. """ if type(graph) is not CouplingGraph: raise ValueError('Provided `graph` is not of type CouplingGraph.') # A larger graph cannot be embedded in a smaller graph if self.num_qudits > graph.num_qudits: return False degs = {q: len(self._adj[q]) for q in range(self.num_qudits)} other_degs = {q: len(graph._adj[q]) for q in range(graph.num_qudits)} # Candidates are vertices in the other graph that have degree >= a # given vertex in self. candidate_labels: dict[int, list[int]] = { q: [] for q in range(self.num_qudits) } for q1 in range(self.num_qudits): for q2 in range(graph.num_qudits): if degs[q1] <= other_degs[q2]: candidate_labels[q1].append(q2) # Each vertex must have at least one candidate vertex if any([len(cands) == 0 for cands in candidate_labels.values()]): return False # Check if the current renumbering works for renumbering in it.permutations( range(graph.num_qudits), self.num_qudits, ): renum = {q: renumbering[q] for q in range(self.num_qudits)} if all([ (min([renum[u], renum[v]]), max([renum[u], renum[v]])) in graph for u, v in self._edges ]): return True return False
CouplingGraphLike = Union[Iterable[Tuple[int, int]], CouplingGraph]