# Source code for qtree.quant

"""Quantum circuit realization for quantum decision trees.
"""
import numpy as np
from itertools import product
from qiskit import QuantumCircuit
from qiskit.compiler import transpile
import logging

# Fallback logger for this module; QTreeCircuitManager uses it when no logger is supplied.
# Its level is set to WARNING, so debug/info records are dropped by this logger by default.
default_logger = logging.getLogger(__name__)
default_logger.setLevel(logging.WARNING)


class QTreeCircuitManager(object):
    """Quantum circuit manager for quantum decision trees.

    Parameters
    ----------
    swap_func : callable or None
        Function (``qiskit.Circuit, q1, q2 -> None``) that defines how a swap
        gate between two qubits with indices ``q1`` and ``q2`` is realized.
        If set to None, ``QTreeCircuitManager.gateop_swap_default`` is used.
        Defaults to ``None``.
    mcswap_func : callable or None
        Function (``qiskit.Circuit, q1, q2, qc_list, mct_func -> None``) that
        defines how a multi-controlled swap gate between two qubits with
        indices ``q1`` and ``q2`` controlled by the qubits with indices
        ``qc_list`` is realized. Also see ``mct_func`` below. If set to None,
        ``QTreeCircuitManager.gateop_mcswap_u`` is used. Defaults to ``None``.
    mct_func : callable or None
        Function (``qiskit.Circuit, q1, q2, qc_list -> None``) that defines
        how a multi-control Toffoli gate with target ``q2`` controlled by
        ``q1`` and the qubits with indices ``qc_list`` is realized. If set to
        None, ``QTreeCircuitManager.gateop_mct_default`` is used. Defaults to
        ``None``.
    logger : logging.Logger or None
        Logger for all output purposes. If ``None``, the default logger is
        used. Defaults to ``None``.
    """

    def __init__(self, swap_func=None, mcswap_func=None, mct_func=None, logger=None):
        # Keep the user-supplied values untouched as public attributes ...
        self.swap_func = swap_func  # gateop_swap_default, gateop_swap_cnot
        self.mcswap_func = mcswap_func  # gateop_mcswap_u, gateop_mcswap_xmctx
        self.mct_func = mct_func  # gateop_mct_default
        self.logger = logger

        # ... and resolve the callables that are actually used internally.
        if self.swap_func is None:
            self._swap_func = QTreeCircuitManager.gateop_swap_default
        else:
            self._swap_func = self.swap_func
        if self.mcswap_func is None:
            self._mcswap_func = QTreeCircuitManager.gateop_mcswap_u
        else:
            self._mcswap_func = mcswap_func
        if self.mct_func is None:
            self._mct_func = QTreeCircuitManager.gateop_mct_default
        else:
            self._mct_func = mct_func

        self.lg = self.logger if self.logger is not None else default_logger

    @staticmethod
    def gateop_swap_default(circ, q1, q2):
        """SWAP function: use default swap representation.

        Used as argument for ``swap_func`` in ``__init__``. See there for more
        details.
        """
        assert q1 != q2
        circ.swap(q1, q2)

    @staticmethod
    def gateop_swap_cnot(circ, q1, q2):
        """SWAP function: use cnot swap representation.

        Used as argument for ``swap_func`` in ``__init__``. See there for more
        details.
        """
        assert q1 != q2
        # Swap as three CNOTs. ``cx`` is the canonical name; the ``cnot``
        # alias is no longer available in recent Qiskit releases.
        circ.cx(q2, q1)
        circ.cx(q1, q2)
        circ.cx(q2, q1)

    @staticmethod
    def gateop_mcswap_u(circ, q1, q2, qc_list, mct_func):
        """MCSWAP function: use custom unitary representation.

        Used as argument for ``mcswap_func`` in ``__init__``. See there for
        more details. ``mct_func`` is accepted for signature compatibility
        only and is not used.
        """
        assert len(qc_list) > 0 and q1 != q2 and q1 not in qc_list and q2 not in qc_list
        affected_qubits = list(qc_list) + [q1, q2]
        n_affected_qubits = len(affected_qubits)
        n_controls = n_affected_qubits - 2
        # Start from the identity and exchange the two basis states in which
        # all controls are 1 and exactly one of (q1, q2) is 1. The first
        # entry of ``affected_qubits`` is the least significant bit.
        u = np.eye(2 ** n_affected_qubits)
        idx1 = 2 ** n_controls * 2 - 1  # controls = 1..1, q1 = 1, q2 = 0
        idx2 = 2 ** n_controls * 3 - 1  # controls = 1..1, q1 = 0, q2 = 1
        u[idx1, idx1] = 0
        u[idx2, idx2] = 0
        u[idx1, idx2] = 1
        u[idx2, idx1] = 1
        circ.unitary(u, affected_qubits)

    @staticmethod
    def gateop_mcswap_xmctx(circ, q1, q2, qc_list, mct_func):
        """MCSWAP function: use cnot mcswap representation.

        Used as argument for ``mcswap_func`` in ``__init__``. See there for
        more details.
        """
        assert len(qc_list) > 0 and q1 != q2 and q1 not in qc_list and q2 not in qc_list
        # Controlled swap as CNOT - multi-controlled Toffoli - CNOT.
        circ.cx(q2, q1)
        mct_func(circ, q1, q2, qc_list)
        circ.cx(q2, q1)

    @staticmethod
    def gateop_mct_default(circ, q1, q2, qc_list):
        """MCT function: use default mct representation.

        Used as argument for ``mct_func`` in ``__init__``. See there for more
        details.
        """
        assert len(qc_list) > 0 and q1 != q2 and q1 not in qc_list and q2 not in qc_list
        # controls, target; ``mcx`` is the canonical name of the former ``mct``.
        circ.mcx(list(qc_list) + [q1], q2)

    def _get_qubit_map(self, active_qubits_x, active_qubits_y):
        """Map every active qubit index to its position in the reduced circuit."""
        qubit_map = {}
        for active_qubit in active_qubits_x + active_qubits_y:
            qubit_map[active_qubit] = len(qubit_map)
        return qubit_map

    def _find_active_qubits(self, G, Nx, Ny):
        """Determine the qubits touched by the gates in ``G`` (neglecting state preparation).

        Returns
        -------
        tuple of list
            Sorted active x qubit indices and the y qubit indices
            ``Nx, ..., Nx + Ny - 1``.
        """
        active = set()
        for depth, g in enumerate(G):
            primary_qubit = depth
            for secondary_qubit in g:
                if primary_qubit == secondary_qubit:
                    # identity: only the primary qubit is registered
                    active.add(primary_qubit)
                else:
                    # controls 0..depth-1, the primary and the secondary qubit
                    active.update(range(depth + 1))
                    active.add(secondary_qubit)
        active_qubits_x = sorted(active)
        active_qubits_y = [Nx + n for n in range(Ny)]
        return active_qubits_x, active_qubits_y

    def _swap_data(self, X, X_swap_dict):
        """Return a copy of ``X`` with the column pairs given in ``X_swap_dict`` exchanged."""
        X = X.copy()
        if X_swap_dict:
            for key, value in X_swap_dict.items():
                x_k = X[:, key].copy()
                x_v = X[:, value].copy()
                X[:, value] = x_k
                X[:, key] = x_v
        return X

    def _ensure_data_probabilities(self, probabilities_dict, N, c=0):
        """Build a dict holding every ``N``-bit string, pre-filled with ``c``.

        Entries of ``probabilities_dict`` (if given) are written under their
        reversed key to account for the qiskit bit ordering.
        """
        data_probabilities = {"".join(bits): c for bits in product('01', repeat=N)}
        if probabilities_dict is not None:
            for key, value in probabilities_dict.items():
                data_probabilities[key[::-1]] = value  # correct qiskit ordering
        return data_probabilities

    def _to_data_probabilities(self, x):
        """Compute the relative frequency of every bit pattern among the rows of ``x``."""
        N = x.shape[1]
        data_probabilities = self._ensure_data_probabilities(None, N)
        for bits in product('01', repeat=N):
            values = [bit == '1' for bit in bits]
            mask = np.logical_and.reduce([x[:, index] == value for index, value in enumerate(values)])
            p = x[mask].shape[0] / x.shape[0]
            key = "".join(bits)[::-1]  # correct qiskit ordering
            data_probabilities[key] = p
        return data_probabilities

    def _to_psi(self, data_probabilities, N):
        """Convert probabilities to a normalized amplitude vector of size ``2 ** N``.

        The dict values are consumed in iteration order. With
        ``data_probabilities=None`` a uniform superposition is returned.
        """
        if data_probabilities is not None:
            psi = np.zeros(2 ** N, dtype=np.double)
            for psi_index, p in enumerate(data_probabilities.values()):
                psi[psi_index] = np.sqrt(p)
        else:
            psi = np.ones(2 ** N, dtype=np.double)
        psi_norm = np.sqrt(np.sum(psi * np.conj(psi)))
        psi /= psi_norm
        return psi

    def _load_data(self, X, y, active_qubits_x=None, active_qubits_y=None, X_swap_dict=None):
        """Turn the data set ``(X, y)`` into an initial state vector.

        Parameters
        ----------
        X, y : numpy.ndarray
            Two-dimensional arrays (samples in rows).
        active_qubits_x, active_qubits_y : list or None
            Column selection for ``X`` and ``y`` (``y`` indices are offset by
            the number of columns of ``X``). ``None`` keeps all columns, an
            empty list drops the respective part.
        X_swap_dict : dict or None
            Column swaps applied to ``X`` before the selection.

        Returns
        -------
        numpy.ndarray or None
            Amplitude vector, or ``None`` if neither ``X`` nor ``y`` is used.
        """
        Nx = X.shape[1]

        if active_qubits_x is None:
            X = self._swap_data(X, X_swap_dict)
        elif len(active_qubits_x) > 0:
            X = self._swap_data(X, X_swap_dict)[:, np.array(active_qubits_x)]
        else:
            X = None

        if active_qubits_y is None:
            y = y.copy()
        elif len(active_qubits_y) > 0:  # was erroneously testing active_qubits_x
            y = y[:, np.array(active_qubits_y) - Nx]
        else:
            y = None

        parts = [part for part in (X, y) if part is not None]
        if not parts:
            return None
        Xy = np.concatenate(parts, axis=1)
        data_probabilities = self._to_data_probabilities(Xy)
        return self._to_psi(data_probabilities, Xy.shape[1])

    def _apply_gate(self, circ, depth, g, qubit_map, include_id, block_form):
        """Append the gates of tree layer ``depth`` (described by ``g``) to ``circ``."""
        id_key = "C"  # controlled swap
        nt_key = "X"  # not - controlled swap - not
        n_affected_qubits = depth + 2
        primary_qubit = depth
        # One control pattern per gate of this layer; the single empty pattern
        # at depth 0 denotes an uncontrolled swap.
        op_code_list = ["".join(key) for key in product(nt_key + id_key, repeat=depth)]

        if block_form:
            # collect the layer in a separate circuit and append it as one instruction
            active_circ = QuantumCircuit(circ.num_qubits, 0)
        else:
            active_circ = circ

        for gate_idx, secondary_qubit in enumerate(g):
            q1 = qubit_map[primary_qubit]  # map
            q2 = qubit_map[secondary_qubit]  # map
            op_code = op_code_list[gate_idx]
            if len(op_code) == 0:  # uncontrolled swap
                qc_list = []
            else:  # multi-controlled swap
                qc_list = [qubit_map[qubit] for qubit in range(n_affected_qubits - 2)]  # map
            # Controls flagged with nt_key are wrapped in X gates before and after.
            negated = [qc_list[i] for i, c in enumerate(op_code) if c == nt_key]
            for qubit in negated:
                active_circ.x(qubit)
            self._mcswap(active_circ, q1, q2, qc_list, include_id)
            for qubit in negated:
                active_circ.x(qubit)

        if block_form:
            instr = active_circ.to_instruction()
            instr.name = f'<{depth}|{g}>'
            circ.append(instr, range(circ.num_qubits), [])

    def _mcswap(self, circ, q1, q2, qc_list, include_id):
        """Dispatch to identity, plain swap or (multi-)controlled swap."""
        if q1 == q2:  # identity
            if include_id:
                for q in qc_list + [q1]:
                    circ.id(q)
        elif len(qc_list) == 0:  # uncontrolled swap
            self._swap_func(circ, q1, q2)
        else:  # single-/multi-controlled swap
            assert q1 not in qc_list and q2 not in qc_list
            # Sort a copy: the caller still indexes its own list positionally.
            self._mcswap_func(circ, q1, q2, sorted(qc_list), self._mct_func)

    def _reduce_probabilities(self, probabilities, active_qubits_x, qubit_map):
        """Marginalize bit-string probabilities onto the active x qubits."""
        probabilities_reduced = {}
        for x_str, proba in probabilities.items():
            x_str = x_str[::-1]  # remove correct qiskit ordering
            x_reduced = [''] * len(active_qubits_x)
            for idx, c in enumerate(x_str):
                if idx in active_qubits_x:
                    x_reduced[qubit_map[idx]] = c
            x_str_reduced = "".join(x_reduced)[::-1]  # restore correct qiskit ordering
            probabilities_reduced[x_str_reduced] = probabilities_reduced.get(x_str_reduced, 0) + proba
        return probabilities_reduced

    def build_circuit(self, G, Nx, Ny, init, measure_y=True, measure_all_x=False, remove_inactive_qubits=False,
                      eliminate_first_swap=False, include_id=False, block_form=False, use_barriers=False):
        """Build circuit representation of decision tree.

        Parameters
        ----------
        G : list
            Tree description; ``G[depth]`` lists the secondary qubit index of
            every gate in layer ``depth``. Must not be empty. Note: with
            ``eliminate_first_swap=True`` the first entry of ``G`` is replaced
            in place by ``[0]``.
        Nx, Ny : int
            Number of x and y qubits.
        init : None, tuple, dict or array-like
            Initial state: ``None`` (no initialization), a tuple ``(X, y)`` of
            data arrays, a dict of bit-string probabilities, or an amplitude
            vector that is used as is.
        measure_y : bool
            Whether the y qubits are part of the circuit measurement.
        measure_all_x : bool
            Whether all x qubits are measured (and kept active).
        remove_inactive_qubits : bool
            Whether qubits not touched by any gate are left out.
        eliminate_first_swap : bool
            Whether the first swap is replaced by a swap of the data columns.
        include_id : bool
            Whether identity gates are added explicitly.
        block_form : bool
            Whether each layer is appended as a single named instruction.
        use_barriers : bool
            Whether barriers are inserted before each layer and before the
            measurement.

        Returns
        -------
        tuple
            ``(circ, meta)`` with the circuit and
            ``meta = (measured qubit indices, classical bit indices)``.
        """
        assert len(G) > 0, "Empty tree cannot be built!"
        Nxy = Nx + Ny

        self.lg.debug("Build circuit: Start.")
        N = Nxy if measure_y else Nx

        if eliminate_first_swap:
            self.lg.debug("eliminate first swap")
            X_swap_dict = {G[0][0]: 0}  # swap G[0] with 0
            G[0] = [0]
            self.lg.debug("new G = %s", G)
            self.lg.debug("X_swap_dict = %s", X_swap_dict)
        else:
            X_swap_dict = None

        if remove_inactive_qubits:
            self.lg.debug("Build circuit: Remove inactive qubits...")
            active_qubits_x, active_qubits_y = self._find_active_qubits(G, Nx, Ny)
            if not measure_y:
                active_qubits_y = []
            if measure_all_x:
                # keep every x qubit (was the bare int ``Nx``, which broke len())
                active_qubits_x = list(range(Nx))
            self.lg.debug("active_qubits_x: %s", active_qubits_x)
            self.lg.debug("active_qubits_y: %s", active_qubits_y)
            all_qubits = list(range(len(active_qubits_x) + len(active_qubits_y)))
            qubit_map = self._get_qubit_map(active_qubits_x, active_qubits_y)
        else:
            active_qubits_x, active_qubits_y = None, None
            all_qubits = list(range(N))
            qubit_map = {n: n for n in all_qubits}
        N_qubits = len(all_qubits)
        N_clbits = N

        self.lg.debug("Build circuit: Create QuantumCircuit...")
        circ = QuantumCircuit(N_qubits, N_clbits)

        if init is not None:
            if isinstance(init, tuple):
                X, y = init
                psi0 = self._load_data(X, y, active_qubits_x, active_qubits_y, X_swap_dict=X_swap_dict)
            elif isinstance(init, dict):
                self.lg.debug("convert from probability dict to array with appropriate qubit ordering")
                assert X_swap_dict is None  # TODO: implement X_swap
                N_data = Nx
                probabilities = self._ensure_data_probabilities(init.copy(), N_data)
                if active_qubits_x is not None:
                    self.lg.debug("reduce probabilities")
                    probabilities = self._reduce_probabilities(probabilities, active_qubits_x, qubit_map)
                    N_data = len(active_qubits_x)
                psi0 = self._to_psi(probabilities, N_data)
            else:
                self.lg.debug(
                    "use psi_mode as array (warning: no correction of active qubit ordering etc. is performed!)")
                psi0 = np.array(init)
            self.lg.debug("initialize with array of size %s", psi0.size)
            assert len(psi0.shape) == 1 and psi0.size == 2 ** len(all_qubits)
            circ.initialize(psi0, all_qubits)  # TODO: use improved initialization strategy

        self.lg.debug("Build circuit: Build from G...")
        for depth, g in enumerate(G):
            self.lg.debug("Build circuit: Apply g #%s/%s = %s", depth + 1, len(G), g)
            if use_barriers:
                circ.barrier(all_qubits)
            self._apply_gate(circ, depth, g, qubit_map, include_id, block_form)

        self.lg.debug("Build circuit: Measurement...")
        if measure_all_x:
            m_list_cl_mapped = list(range(Nx))
        else:
            m_list_cl_mapped = list(range(len(G)))  # one primary qubit per layer
        if measure_y:
            m_list_cl_mapped += list(range(Nx, Nxy))
        m_list_q_mapped = [qubit_map[qubit] for qubit in m_list_cl_mapped]  # map
        if use_barriers:
            circ.barrier(all_qubits)
        circ.measure(m_list_q_mapped, m_list_cl_mapped)

        meta = (m_list_q_mapped, m_list_cl_mapped)
        return circ, meta

    def run_circuits(self, circ_list, quantum_backend):
        """Evaluate a list of circuits and return the measured counts.

        The circuits are transpiled for ``quantum_backend``, executed in a
        single job and the counts are returned in the order of ``circ_list``.
        """
        for circ in circ_list:
            self.lg.debug("*\tcircuit: %s", dict(circ.count_ops()))
        circ_list = transpile(circ_list, backend=quantum_backend)
        result = quantum_backend.run(circ_list).result()
        return [result.get_counts(circ) for circ in circ_list]