"""Multiscale Laplacian Graph Kernel as defined in :cite:`kondor2016multiscale`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
# Python 2/3 cross-compatibility import
from __future__ import print_function

import collections
import warnings
import numpy as np
import time

from numbers import Real
from math import exp

from sklearn.utils import check_random_state

from numpy.linalg import eig
from numpy.linalg import inv
from numpy.linalg import multi_dot
from numpy.linalg import eigvals

from grakel.graph import Graph
from scipy.sparse.csgraph import laplacian

from grakel.kernels import Kernel

# Python 2/3 cross-compatibility import
from six import iteritems

positive_eigenvalue_limit = float("+1e-6")


class MultiscaleLaplacianFast(Kernel):
    """Laplacian Graph Kernel as proposed in :cite:`kondor2016multiscale`.

    Parameters
    ----------
    random_state : RandomState or int, default=None
        A random number generator instance or an int to initialize a
        RandomState as a seed.

    L : int, default=3
        The number of neighborhoods.

    gamma : Real, default=0.01
        A positive smoothing parameter.

    heta : float, default=0.01
        A positive smoothing parameter.

    P : int, default=10
        The maximum number of eigenvalues kept from the eigenvalue
        decomposition.

    n_samples : int, default=50
        The number of vertex samples.

    Attributes
    ----------
    random_state_ : RandomState
        A RandomState object handling all randomness of the class.

    _data_level : dict
        A dictionary containing the feature-basis information needed
        for each level calculation on transform.

    """

    _graph_format = "adjacency"
    def __init__(self, n_jobs=None, normalize=False, verbose=False,
                 random_state=None, L=3, P=10, gamma=0.01, heta=0.01,
                 n_samples=50):
        """Initialise a `multiscale_laplacian` kernel."""
        super(MultiscaleLaplacianFast, self).__init__(
            n_jobs=n_jobs, normalize=normalize, verbose=verbose)

        self.random_state = random_state
        self.gamma = gamma
        self.heta = heta
        self.L = L
        self.P = P
        self.n_samples = n_samples
        self._initialized.update({"random_state": False, "gamma": False,
                                  "heta": False, "L": False,
                                  "n_samples": False, "P": False})
    def initialize(self):
        """Initialize all transformer arguments that need initialization."""
        super(MultiscaleLaplacianFast, self).initialize()
        if not self._initialized["random_state"]:
            self.random_state_ = check_random_state(self.random_state)
            self._initialized["random_state"] = True

        if not self._initialized["gamma"]:
            if not isinstance(self.gamma, Real):
                raise TypeError('gamma must be a real number')
            elif self.gamma == .0:
                warnings.warn('with zero gamma the calculation may crash')
            elif self.gamma < 0:
                raise TypeError('gamma must be positive')
            self._initialized["gamma"] = True

        if not self._initialized["heta"]:
            if not isinstance(self.heta, Real):
                raise TypeError('heta must be a real number')
            elif self.heta == .0:
                warnings.warn('with zero heta the calculation may crash')
            elif self.heta < 0:
                raise TypeError('heta must be positive')
            self._initialized["heta"] = True

        if not self._initialized["L"]:
            if type(self.L) is not int:
                raise TypeError('L must be an integer')
            elif self.L < 0:
                raise TypeError('L must be positive')
            self._initialized["L"] = True

        if not self._initialized["n_samples"]:
            if type(self.n_samples) is not int or self.n_samples <= 0:
                raise TypeError('n_samples must be a positive integer')
            self._initialized["n_samples"] = True

        if not self._initialized["P"]:
            if type(self.P) is not int or self.P <= 0:
                raise TypeError('P must be a positive integer')
            self._initialized["P"] = True
    def parse_input(self, X):
        """Fast ML Graph Kernel.

        See supplementary material :cite:`kondor2016multiscale`, algorithm 1.

        Parameters
        ----------
        X : iterable
            For the input to pass the test, we must have:
            Each element must be an iterable with at most three features and
            at least one. The first, which is obligatory, is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that correspond to the
            given graph format). A valid input also consists of graph type
            objects.

        Returns
        -------
        out : list
            A list of tuples, each containing the inverse of an S matrix
            and its log-determinant.

        """
        if not isinstance(X, collections.Iterable):
            raise TypeError('input must be an iterable\n')
        else:
            ng = 0
            out = list()
            data = dict()
            neighborhoods = dict()
            for (idx, x) in enumerate(iter(X)):
                is_iter = False
                if isinstance(x, collections.Iterable):
                    is_iter, x = True, list(x)
                if is_iter and len(x) in [0, 2, 3]:
                    if len(x) == 0:
                        warnings.warn('Ignoring empty element ' +
                                      'on index: ' + str(idx))
                        continue
                    else:
                        x = Graph(x[0], x[1], {}, self._graph_format)
                elif type(x) is Graph:
                    x.desired_format(self._graph_format)
                else:
                    raise TypeError('each element of X must be either a '
                                    'graph or an iterable with at least 1 '
                                    'and at most 3 elements\n')

                phi_d = x.get_labels()
                A = x.get_adjacency_matrix()
                try:
                    phi = np.array([list(phi_d[i]) for i in range(A.shape[0])])
                except TypeError:
                    raise TypeError('Features must be iterable and castable '
                                    'in total to a numpy array.')

                Lap = laplacian(A).astype(float)
                _increment_diagonal_(Lap, self.heta)
                data[ng] = {0: A, 1: phi, 2: inv(Lap)}
                neighborhoods[ng] = x
                ng += 1

            if ng == 0:
                raise ValueError('parsed input is empty')

            # Define a function that calculates the S matrices of the
            # subgraphs at each level.
            def calculate_C(k, j, l):
                if type(neighborhoods[k]) is Graph:
                    neighborhoods[k] = neighborhoods[k].produce_neighborhoods(
                        r=self.L, sort_neighbors=False)

                indexes = neighborhoods[k][l][j]
                L = laplacian(data[k][0][indexes, :][:, indexes]).astype(float)
                _increment_diagonal_(L, self.heta)
                U = data[k][1][indexes, :]
                S = multi_dot((U.T, inv(L), U))
                _increment_diagonal_(S, self.gamma)
                return (inv(S), np.sum(np.log(np.real(eigvals(S)))))

            if self._method_calling == 1:
                V = [(k, j) for k in range(ng)
                     for j in range(data[k][0].shape[0])]

                ns = min(len(V), self.n_samples)

                self.random_state_.shuffle(V)
                vs = V[:ns]
                phi_k = np.array([data[k][1][j, :] for (k, j) in vs])

                # w the eigenvectors, v the eigenvalues
                K = phi_k.dot(phi_k.T)

                # Calculate eigenvalues
                v, w = eig(K)
                v, w = np.real(v), np.real(w.T)

                # keep only the positive ones
                vpos = np.argpartition(v, -self.P)[-self.P:]
                vpos = vpos[np.where(v[vpos] > positive_eigenvalue_limit)]

                # ksi.shape = (k, Ns) * (Ns, P)
                ksi = w[vpos].dot(phi_k).T / np.sqrt(v[vpos])
                for j in range(ng):
                    # (n_samples, k) * (k, P)
                    data[j][1] = data[j][1].dot(ksi)
                self._data_level = {0: ksi}

                for l in range(1, self.L + 1):
                    # Take random samples from all the vertices of all graphs
                    self.random_state_.shuffle(V)
                    vs = V[:ns]

                    # Compute the reference subsampled Gram matrix
                    K_proj = {k: np.zeros(shape=(data[k][0].shape[0], ns))
                              for k in range(ng)}
                    K, C = np.zeros(shape=(len(vs), len(vs))), dict()
                    for (m, (k, j)) in enumerate(vs):
                        C[m] = calculate_C(k, j, l)
                        K_proj[k][j, m] = K[m, m] = \
                            self.pairwise_operation(C[m], C[m])
                        for (s, (k2, j2)) in enumerate(vs):
                            if s < m:
                                K[s, m] = K[m, s] \
                                        = K_proj[k2][j2, m] \
                                        = K_proj[k][j, s] \
                                        = self.pairwise_operation(C[s], C[m])
                            else:
                                break

                    # Compute the kernels between the reference samples
                    # and everything else
                    for (k, j) in V[ns:]:
                        for (m, _) in enumerate(vs):
                            K_proj[k][j, m] = self.pairwise_operation(
                                C[m], calculate_C(k, j, l))

                    # w the eigenvectors, v the eigenvalues
                    v, w = eig(K)
                    v, w = np.real(v), np.real(w.T)

                    # keep only the positive ones
                    vpos = np.argpartition(v, -self.P)[-self.P:]
                    vpos = vpos[np.where(v[vpos] > positive_eigenvalue_limit)]

                    # Q shape=(k, P)
                    Q = w[vpos].T / np.sqrt(v[vpos])

                    for j in range(ng):
                        # (n, ns) * (ns, P)
                        data[j][1] = K_proj[j].dot(Q)
                    self._data_level[l] = (C, Q)

            elif self._method_calling == 3:
                ksi = self._data_level[0]
                for j in range(ng):
                    # (n, k) * (k, P)
                    data[j][1] = data[j][1].dot(ksi)

                for l in range(1, self.L + 1):
                    C, Q = self._data_level[l]
                    for j in range(ng):
                        K_proj = np.zeros(shape=(data[j][0].shape[0], len(C)))
                        for n in range(data[j][0].shape[0]):
                            for m in range(len(C)):
                                K_proj[n, m] = self.pairwise_operation(
                                    C[m], calculate_C(j, n, l))
                        data[j][1] = K_proj.dot(Q)

            # Apply the final calculation of S.
            for k in range(ng):
                S = multi_dot((data[k][1].T, data[k][2], data[k][1]))
                _increment_diagonal_(S, self.gamma)
                out.append((inv(S), np.sum(np.log(np.real(eigvals(S))))))

            return out
    def pairwise_operation(self, x, y):
        """FLG calculation for the fast multiscale laplacian.

        Parameters
        ----------
        x, y : tuple
            Tuples containing the inverse of an S matrix and its
            log-determinant (for the calculation of the S matrices see
            algorithm 1 of the supplementary material in
            :cite:`kondor2016multiscale`).

        Returns
        -------
        kernel : number
            The FLG core kernel value.

        """
        S_inv_x, log_det_x = x
        S_inv_y, log_det_y = y

        # Calculate the result in terms of logs
        log_detS = -np.sum(np.log(np.real(eigvals(S_inv_x + S_inv_y))))
        logr = (log_detS - 0.5*(log_det_x + log_det_y))/2.0

        if logr < -30:
            return .0
        else:
            return exp(logr)
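
# Illustrative usage sketch (an addition for exposition, not part of the
# original grakel module): fitting MultiscaleLaplacianFast on two small toy
# graphs given as (adjacency matrix, node-label dict) pairs, the input format
# accepted by parse_input above. The graphs, labels and parameter choices are
# invented for the example; it is wrapped in a function so that importing the
# module stays side-effect free.
def _example_multiscale_laplacian_fast():
    # A path graph and a triangle, with 2-dimensional node attributes.
    g1 = ([[0, 1, 0], [1, 0, 1], [0, 1, 0]],
          {0: [1., 0.], 1: [0., 1.], 2: [1., 1.]})
    g2 = ([[0, 1, 1], [1, 0, 1], [1, 1, 0]],
          {0: [0., 1.], 1: [1., 0.], 2: [1., 1.]})
    mlf = MultiscaleLaplacianFast(L=2, P=2, n_samples=4, random_state=42)
    # fit_transform is inherited from Kernel and returns the 2x2 Gram matrix.
    return mlf.fit_transform([g1, g2])
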
class MultiscaleLaplacian(Kernel):
    """Laplacian Graph Kernel as proposed in :cite:`kondor2016multiscale`.

    Parameters
    ----------
    L : int, default=3
        The number of neighborhoods.

    gamma : Real, default=0.01
        A small positive softening parameter.

    heta : float, default=0.01
        A positive smoothing parameter.

    Attributes
    ----------
    L : int
        The number of neighborhoods.

    gamma : Real
        A smoothing parameter for the calculation of S matrices.

    heta : float
        A smoothing parameter for the calculation of S matrices.

    """

    _graph_format = "adjacency"
    def __init__(self, n_jobs=None, normalize=False,
                 verbose=False, L=3, gamma=0.01, heta=0.01):
        """Initialise a `multiscale_laplacian` kernel."""
        super(MultiscaleLaplacian, self).__init__(n_jobs=n_jobs,
                                                  normalize=normalize,
                                                  verbose=verbose)
        self.gamma = gamma
        self.heta = heta
        self.L = L
        self._initialized.update({"gamma": False, "heta": False, "L": False})
    def initialize(self):
        """Initialize all transformer arguments that need initialization."""
        super(MultiscaleLaplacian, self).initialize()
        if not self._initialized["gamma"]:
            if not isinstance(self.gamma, Real):
                raise TypeError('gamma must be a real number')
            elif self.gamma == .0:
                warnings.warn('with zero gamma the calculation may crash')
            elif self.gamma < 0:
                raise TypeError('gamma must be positive')
            self._initialized["gamma"] = True

        if not self._initialized["heta"]:
            if not isinstance(self.heta, Real):
                raise TypeError('heta must be a real number')
            elif self.heta == .0:
                warnings.warn('with zero heta the calculation may crash')
            elif self.heta < 0:
                raise TypeError('heta must be positive')
            self._initialized["heta"] = True

        if not self._initialized["L"]:
            if type(self.L) is not int:
                raise TypeError('L must be an integer')
            elif self.L < 0:
                raise TypeError('L must be positive')
            self._initialized["L"] = True
    def parse_input(self, X):
        """Parse and create features for the multiscale_laplacian kernel.

        Parameters
        ----------
        X : iterable
            For the input to pass the test, we must have:
            Each element must be an iterable with at most three features and
            at least one. The first, which is obligatory, is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that correspond to the
            given graph format). A valid input also consists of graph type
            objects.

        Returns
        -------
        out : list
            Tuples consisting of the adjacency matrix, phi, phi_outer,
            a dictionary of neighborhood indexes and inverse Laplacians
            up to level self.L, and the inverse Laplacian of A.

        """
        if not isinstance(X, collections.Iterable):
            raise TypeError('input must be an iterable\n')
        else:
            ng = 0
            out = list()
            start = time.time()
            for (idx, x) in enumerate(iter(X)):
                is_iter = False
                if isinstance(x, collections.Iterable):
                    is_iter, x = True, list(x)
                if is_iter and len(x) in [0, 2, 3]:
                    if len(x) == 0:
                        warnings.warn('Ignoring empty element ' +
                                      'on index: ' + str(idx))
                        continue
                    else:
                        x = Graph(x[0], x[1], {}, self._graph_format)
                elif type(x) is Graph:
                    x.desired_format(self._graph_format)
                else:
                    raise TypeError('each element of X must be either a ' +
                                    'graph or an iterable with at least 1 ' +
                                    'and at most 3 elements\n')
                ng += 1
                phi_d = x.get_labels()
                A = x.get_adjacency_matrix()
                N = x.produce_neighborhoods(r=self.L, sort_neighbors=False)

                try:
                    phi = np.array([list(phi_d[i]) for i in range(A.shape[0])])
                except TypeError:
                    raise TypeError('Features must be iterable and castable ' +
                                    'in total to a numpy array.')

                phi_outer = np.dot(phi, phi.T)

                Lap = laplacian(A).astype(float)
                _increment_diagonal_(Lap, self.heta)
                L = inv(Lap)

                Q = dict()
                for level in range(1, self.L + 1):
                    Q[level] = dict()
                    for (key, item) in iteritems(N[level]):
                        Q[level][key] = dict()
                        Q[level][key]["n"] = np.array(item)
                        if len(item) < A.shape[0]:
                            laplac = laplacian(A[item, :][:, item]).astype(float)
                            _increment_diagonal_(laplac, self.heta)
                            laplac = inv(laplac)
                        else:
                            laplac = L
                        Q[level][key]["l"] = laplac

                out.append((A, phi, phi_outer, Q, L))

            if self.verbose:
                print("Preprocessing took:", time.time() - start, "s.")
            if ng == 0:
                raise ValueError('parsed input is empty')

            return out
    def pairwise_operation(self, x, y):
        """ML kernel as proposed in :cite:`kondor2016multiscale`.

        Parameters
        ----------
        x, y : tuple
            Tuples consisting of the adjacency matrix, phi, phi_outer,
            the neighborhood data up to level self.L and the inverse
            Laplacian of A.

        Returns
        -------
        kernel : number
            The kernel value.

        """
        # Extract components
        Ax, phi_x, a, Qx, Lx = x
        Ay, phi_y, d, Qy, Ly = y
        nx, ny = Ax.shape[0], Ay.shape[0]

        # Create the joint gram matrix
        b = np.dot(phi_x, phi_y.T)
        c = b.T
        gram_matrix = np.vstack([np.hstack([a, b]), np.hstack([c, d])])

        # For each level, compute the FLG kernel between pairs of
        # neighborhoods, extracting the corresponding indexes of the
        # joint gram matrix.
        for level in range(1, self.L + 1):
            gram_matrix_n = np.empty(shape=gram_matrix.shape)
            for i in range(nx):
                qi = Qx[level][i]
                # xx
                for j in range(i, nx):
                    qj = Qx[level][j]
                    idx_ij = np.append(qi["n"], qj["n"])
                    extracted_gm = gram_matrix[idx_ij, :][:, idx_ij]
                    gram_matrix_n[i, j] = \
                        self._generalized_FLG_core_(qi["l"], qj["l"],
                                                    extracted_gm)
                # xy
                for j in range(i, ny):
                    qj = Qy[level][j]
                    idx_ij = np.append(qi["n"], qj["n"] + nx)
                    extracted_gm = gram_matrix[idx_ij, :][:, idx_ij]
                    gram_matrix_n[i, j + nx] = \
                        self._generalized_FLG_core_(qi["l"], qj["l"],
                                                    extracted_gm)

            for i in range(ny):
                idx = i + nx
                qi = Qy[level][i]
                qi_n = qi["n"] + nx
                # yx
                for j in range(i, nx):
                    qj = Qx[level][j]
                    idx_ij = np.append(qi_n, qj["n"])
                    extracted_gm = gram_matrix[idx_ij, :][:, idx_ij]
                    gram_matrix_n[idx, j] = \
                        self._generalized_FLG_core_(qi["l"], qj["l"],
                                                    extracted_gm)
                # yy
                for j in range(i, ny):
                    qj = Qy[level][j]
                    idx_ij = np.append(qi_n, qj["n"] + nx)
                    extracted_gm = gram_matrix[idx_ij, :][:, idx_ij]
                    gram_matrix_n[idx, j + nx] = \
                        self._generalized_FLG_core_(qi["l"], qj["l"],
                                                    extracted_gm)

            # Symmetrize the per-level gram matrix from its upper triangle.
            gram_matrix = np.triu(gram_matrix_n) + np.triu(gram_matrix_n, 1).T

        return self._generalized_FLG_core_(Lx, Ly, gram_matrix)
    def _generalized_FLG_core_(self, Lx, Ly, gram_matrix):
        """FLG core calculation for the multiscale laplacian.

        Parameters
        ----------
        Lx, Ly : np.array
            Inverse Laplacians of graphs x and y.

        gram_matrix : np.array
            The corresponding gram matrix for the two graphs.

        Returns
        -------
        kernel : number
            The FLG core kernel value.

        """
        nx = Lx.shape[0]

        # w the eigenvectors, v the eigenvalues
        v, w = eig(gram_matrix)
        v, w = np.real(v), np.real(w.T)

        # keep only the positive ones
        vpos = np.where(v > positive_eigenvalue_limit)[0]

        k = .0
        if vpos.shape[0] > 0:
            # calculate the Q matrix
            Q = np.square(v[vpos]) * w[vpos].T
            Qx, Qy = Q[:nx], Q[nx:]

            # Calculate the S matrices
            Sx = multi_dot((Qx.T, Lx, Qx))
            Sy = multi_dot((Qy.T, Ly, Qy))
            _increment_diagonal_(Sx, self.gamma)
            _increment_diagonal_(Sy, self.gamma)

            def sle(mat):
                return np.sum(np.log(np.real(eigvals(mat))))

            # Calculate the kernel numerator
            log_detS = -sle(inv(Sx) + inv(Sy))
            logr = (log_detS - 0.5*(sle(Sx) + sle(Sy)))/2.0
            if logr >= -30:
                k = exp(logr)
        return k
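
# Numerical sanity sketch (an addition for exposition, not part of the
# original module): the log-domain computation used by the FLG cores above
# equals the direct determinant form
#     k = det(Sx^-1 + Sy^-1)^(-1/2) / (det(Sx) * det(Sy))^(1/4)
# for symmetric positive-definite S matrices. The random matrices below are
# invented for the check.
def _flg_log_domain_check():
    rs = np.random.RandomState(0)
    B = rs.rand(4, 4)
    Sx = B.dot(B.T) + 4 * np.eye(4)   # SPD by construction
    B = rs.rand(4, 4)
    Sy = B.dot(B.T) + 4 * np.eye(4)

    direct = (np.linalg.det(inv(Sx) + inv(Sy)) ** -0.5
              / (np.linalg.det(Sx) * np.linalg.det(Sy)) ** 0.25)

    # The same quantity computed in terms of logs, as in pairwise_operation.
    log_detS = -np.sum(np.log(np.real(eigvals(inv(Sx) + inv(Sy)))))
    log_det_x = np.sum(np.log(np.real(eigvals(Sx))))
    log_det_y = np.sum(np.log(np.real(eigvals(Sy))))
    logr = (log_detS - 0.5 * (log_det_x + log_det_y)) / 2.0
    assert np.isclose(direct, exp(logr))
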
def _increment_diagonal_(A, value):
    """Increment the diagonal of an array by a value, in place.

    Parameters
    ----------
    A : np.array
        The array whose diagonal will be incremented.

    value : number
        The value that will be added to the diagonal.

    Returns
    -------
    None.

    """
    d = A.diagonal()
    d.setflags(write=True)
    d += value
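
# Minimal usage sketch for the helper above (an addition for exposition, not
# part of the original module): _increment_diagonal_ writes through a view of
# the diagonal, so it mutates its argument in place, which is equivalent to
# `A += value * np.eye(A.shape[0])` without allocating an identity matrix.
def _increment_diagonal_demo():
    A = np.zeros((3, 3))
    _increment_diagonal_(A, 0.5)
    assert np.allclose(A, 0.5 * np.eye(3))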