Source code for bqskit.passes.synthesis.qpredict

"""This module implements the QPredictDecompositionPass class."""
from __future__ import annotations

import logging
from typing import Any
from typing import Sequence

import numpy as np

from bqskit.compiler.passdata import PassData
from bqskit.ir.circuit import Circuit
from bqskit.ir.gates.parameterized.unitary import VariableUnitaryGate
from bqskit.ir.location import CircuitLocation
from bqskit.ir.opt.cost import CostFunctionGenerator
from bqskit.ir.opt.cost import HilbertSchmidtResidualsGenerator
from bqskit.passes.synthesis.synthesis import SynthesisPass
from bqskit.qis.state.state import StateVector
from bqskit.qis.state.system import StateSystem
from bqskit.qis.unitary.unitarymatrix import UnitaryMatrix
from bqskit.utils.math import pauli_expansion
from bqskit.utils.math import unitary_log_no_i
from bqskit.utils.typing import is_integer
from bqskit.utils.typing import is_real_number

_logger = logging.getLogger(__name__)


[docs] class QPredictDecompositionPass(SynthesisPass): """ The QPredictDecompositionPass class. Breaks down the input unitary into blocks. """
[docs] def __init__( self, block_size_start: int = 2, block_size_limit: int | None = None, fail_limit: int = 3, success_threshold: float = 1e-8, progress_threshold_r: float = 5e-2, progress_threshold_a: float = 1e-8, cost: CostFunctionGenerator = HilbertSchmidtResidualsGenerator(), max_depth: int | None = None, instantiate_options: dict[str, Any] = {}, ) -> None: """ QPredictDecompositionPass Constructor. Args: block_size_start (int): The smallest block size to append each step. (Default: 2) block_size_limit (int | None): The largest block size to append each step. If left as None, the unitary being synthesized provides the limit. (Default: None) fail_limit (int): The amount of tries to make progress before increasing block size. (Default: 2) success_threshold (float): The distance threshold that determines successful termintation. Measured in cost described by the cost function. (Default: 1e-6) progress_threshold (float): The distance necessary to improve for the synthesis algorithm to complete a layer and move on. Lowering this will led to synthesis going deeper quicker, and raising it will force to algorithm to spend more time on each layer. Caution, changing this too much might break the synthesis algorithm. (Default: 5e-3) cost (CostFunction | None): The cost function that determines distance during synthesis. The goal of this synthesis pass is to implement circuits for the given unitaries that have a cost less than the `success_threshold`. (Default: HSDistance()) max_depth (int): The maximum number of gates to append without success before termination. If left as None it will default to unlimited. (Default: None) instantiate_options (dict[str: Any]): Options passed directly to circuit.instantiate when instantiating circuit templates. (Default: {}) Raises: ValueError: If `block_size_start` is nonpositive. ValueError: If `block_size_limit` is less than `block_size_start`. ValueError: If `fail_limit` is nonpositive. ValueError: If `max_depth` is nonpositive. """ if not is_integer(block_size_start): raise TypeError( 'Expected block_size_start to be an integer, got %s' % type(block_size_start), ) if block_size_start <= 0: raise ValueError( 'Expected block_size_start to be positive, got %d.' % block_size_start, ) if block_size_limit is not None and not is_integer(block_size_limit): raise TypeError( 'Expected block_size_limit to be an integer, got %s' % type(block_size_limit), ) if block_size_limit is not None and block_size_limit < block_size_start: raise ValueError( 'Expected block_size_limit to be larger than block_size_start,' 'got %d.' % block_size_limit, ) if not is_integer(fail_limit): raise TypeError( 'Expected fail_limit to be an integer, got %s' % type(fail_limit), ) if fail_limit <= 0: raise ValueError( 'Expected fail_limit to be positive, got %d.' % fail_limit, ) if not is_real_number(success_threshold): raise TypeError( 'Expected real number for success_threshold' ', got %s' % type(success_threshold), ) if not is_real_number(progress_threshold_r): raise TypeError( 'Expected real number for progress_threshold_r' ', got %s' % type(progress_threshold_r), ) if not is_real_number(progress_threshold_a): raise TypeError( 'Expected real number for progress_threshold_a' ', got %s' % type(progress_threshold_a), ) if not isinstance(cost, CostFunctionGenerator): raise TypeError( 'Expected cost to be a CostFunctionGenerator, got %s' % type(cost), ) if max_depth is not None and not is_integer(max_depth): raise TypeError( 'Expected max_depth to be an integer, got %s' % type(max_depth), ) if max_depth is not None and max_depth <= 0: raise ValueError( 'Expected max_depth to be positive, got %d.' % int(max_depth), ) if not isinstance(instantiate_options, dict): raise TypeError( 'Expected dictionary for instantiate_options, got %s.' % type(instantiate_options), ) self.block_size_start = block_size_start self.block_size_limit = block_size_limit self.fail_limit = fail_limit self.success_threshold = success_threshold self.progress_threshold_r = progress_threshold_r self.progress_threshold_a = progress_threshold_a self.cost = cost self.max_depth = max_depth self.instantiate_options = { 'min_iters': 25, 'diff_tol_r': 1e-4, } self.instantiate_options.update(instantiate_options)
[docs] async def synthesize( self, utry: UnitaryMatrix | StateVector | StateSystem, data: PassData, ) -> Circuit: """Synthesize `utry`, see :class:`SynthesisPass` for more.""" if isinstance(utry, (StateVector, StateSystem)): raise RuntimeError('Unable to synthesize a state with qpredict.') instantiate_options = self.instantiate_options.copy() if data.seed is not None: instantiate_options['seed'] = data.seed # 0. Skip any unitaries too small for the configured block. if self.block_size_start >= utry.num_qudits: _logger.warning( 'Skipping synthesis: block size is too large.', ) return Circuit.from_unitary(utry) # 1. Create empty circuit with same size and radixes as `utry`. circuit = Circuit(utry.num_qudits, utry.radixes) # 2. Calculate block sizes block_size_end = utry.num_qudits - 1 if self.block_size_limit is not None: block_size_end = self.block_size_limit # 3. Calculate relevant coupling_graphs model = self.get_model(utry, data) locations = [ model.get_locations(i) for i in range(self.block_size_start, block_size_end + 1) ] # 3. Bottom-up synthesis: build circuit up one gate at a time layer = 1 last_cost = 1.0 last_loc = None while True: remainder = utry.dagger @ circuit.get_unitary() sorted_locations = self.analyze_remainder(remainder, locations) for loc in sorted_locations: # Never predict the previous location if loc == last_loc: continue _logger.info(f'Trying next predicted location {loc}.') circuit.append_gate(VariableUnitaryGate(len(loc)), loc) circuit.instantiate( utry, **instantiate_options, # type: ignore ) cost = self.cost.calc_cost(circuit, utry) _logger.info(f'Instantiated; layers: {layer}, cost: {cost:e}.') if cost < self.success_threshold: _logger.info(f'Circuit found with cost: {cost:e}.') _logger.info('Successful synthesis.') return circuit progress_threshold = self.progress_threshold_a progress_threshold += self.progress_threshold_r * np.abs(cost) if last_cost - cost >= progress_threshold: _logger.info('Progress has been made, depth increasing.') last_loc = loc last_cost = cost layer += 1 break _logger.info('Progress has not been made.') circuit.pop((-1, loc[0]))
[docs] def analyze_remainder( self, R: UnitaryMatrix, locations: Sequence[Sequence[CircuitLocation]], ) -> list[CircuitLocation]: """ Perform remainder analysis on `R` to sort `locations`. Args: R (UnitaryMatrix): The remainder to analyze. locations (Sequence[Sequence[CircuitLocation]]): List of locations grouped by block size. Returns: (list[CircuitLocation]): Sorted list of locations for next block based on remainder analysis. """ _logger.info('Performing remainder analysis.') pauli_coefs = pauli_expansion(unitary_log_no_i(R)) # type: ignore locations_by_index: dict[int, set[CircuitLocation]] = {} for location_group in locations: for loc in location_group: for qudit_index in loc: if qudit_index not in locations_by_index: locations_by_index[qudit_index] = set() locations_by_index[qudit_index].add(loc) location_scores_by_size: dict[int, dict[CircuitLocation, float]] = {} for location_group in locations: for loc in location_group: if len(loc) not in location_scores_by_size: location_scores_by_size[len(loc)] = {} location_scores_by_size[len(loc)][loc] = 0.0 for coef_idx, coef in enumerate(pauli_coefs): for qudit_index in self.decode_qubits(coef_idx): for loc in locations_by_index[qudit_index]: location_scores_by_size[len(loc)][loc] += np.abs(coef) sorted_locations_by_size: dict[int, list[CircuitLocation]] = { size: sorted( list(location_scores.keys()), key=lambda x: location_scores[x], reverse=True, ) for size, location_scores in location_scores_by_size.items() } sorted_locations_length: dict[int, int] = { size: len(sorted_locations) for size, sorted_locations in sorted_locations_by_size.items() } sorted_locations_by_sorted_size = sorted( sorted_locations_by_size.items(), key=lambda x: x[0], ) sorted_locations: list[CircuitLocation] = [] for size, locations in sorted_locations_by_sorted_size: for i in range(self.fail_limit): if i < sorted_locations_length[size]: sorted_locations.append(locations[i]) sorted_locations.extend(sorted_locations_by_sorted_size[-1][1]) _logger.debug('Found order of locations:') _logger.debug(sorted_locations) return sorted_locations
[docs] def decode_qubits(self, pauli_coef_index: int) -> list[int]: """Decode `pauli_coef_index` into qubit indices.""" qubit_indices = [] qudit_index = 0 while pauli_coef_index > 0: if ((pauli_coef_index & 3) >> 1) | (pauli_coef_index & 1): qubit_indices.append(qudit_index) pauli_coef_index >>= 2 qudit_index += 1 return qubit_indices