# Source code for grakel.kernels.multiscale_laplacian

"""Multiscale Laplacian Graph Kernel as defined in :cite:`kondor2016multiscale`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
# Python 2/3 cross-compatibility import
from __future__ import print_function

import collections
import warnings
import numpy as np

from numbers import Real
from math import exp

from sklearn.utils import check_random_state

from numpy.linalg import eig
from numpy.linalg import inv
from numpy.linalg import multi_dot
from numpy.linalg import eigvals

from grakel.graph import Graph
from scipy.sparse.csgraph import laplacian

from grakel.kernels import Kernel

positive_eigenvalue_limit = float("+1e-6")


class MultiscaleLaplacian(Kernel):
    """Laplacian Graph Kernel as proposed in :cite:`kondor2016multiscale`.

    Parameters
    ----------
    random_state : RandomState or int, default=None
        A random number generator instance or an int to initialize a
        RandomState as a seed.

    L : int, default=3
        The number of neighborhoods.

    gamma : Real, default=0.01
        A smoothing parameter of float value.

    heta : float, default=0.01
        A smoothing parameter of float value.

    P : int, default=10
        Restrict the maximum number of eigenvalues, taken on eigenvalue
        decomposition.

    n_samples : int, default=50
        The number of vertex samples.

    Attributes
    ----------
    random_state_ : RandomState
        A RandomState object handling all randomness of the class.

    _data_level : dict
        A dictionary containing the feature basis information needed
        for each level calculation on transform.

    """

    # Input graphs are converted to adjacency-matrix format before use.
    _graph_format = "adjacency"

    def __init__(self, n_jobs=None, normalize=False, verbose=False,
                 random_state=None, L=3, P=10, gamma=0.01, heta=0.01,
                 n_samples=50):
        """Initialise a `multiscale_laplacian` kernel."""
        super(MultiscaleLaplacian, self).__init__(
            n_jobs=n_jobs, normalize=normalize, verbose=verbose)
        self.random_state = random_state
        self.gamma = gamma
        self.heta = heta
        self.L = L
        self.P = P
        self.n_samples = n_samples
        # Flags consumed by initialize(): each parameter is validated once.
        self._initialized.update({"random_state": False, "gamma": False,
                                  "heta": False, "L": False,
                                  "n_samples": False, "P": False})

    def initialize(self):
        """Initialize all transformer arguments, needing initialization.

        Validates each constructor parameter once (guarded by the
        ``_initialized`` flags) and builds ``random_state_``.

        Raises
        ------
        TypeError
            If any parameter has the wrong type or an out-of-range value.
        """
        super(MultiscaleLaplacian, self).initialize()
        if not self._initialized["random_state"]:
            self.random_state_ = check_random_state(self.random_state)
            self._initialized["random_state"] = True

        if not self._initialized["gamma"]:
            if not isinstance(self.gamma, Real):
                raise TypeError('gamma must be a real number')
            elif self.gamma == .0:
                # Zero gamma is allowed but risks singular S matrices later.
                warnings.warn('with zero gamma the calculation may crash')
            elif self.gamma < 0:
                raise TypeError('gamma must be positive')
            self._initialized["gamma"] = True

        if not self._initialized["heta"]:
            if not isinstance(self.heta, Real):
                raise TypeError('heta must be a real number')
            elif self.heta == .0:
                # Zero heta is allowed but risks a singular Laplacian later.
                warnings.warn('with zero heta the calculation may crash')
            elif self.heta < 0:
                raise TypeError('heta must be positive')
            self._initialized["heta"] = True

        if not self._initialized["L"]:
            if type(self.L) is not int:
                raise TypeError('L must be an integer')
            elif self.L < 0:
                raise TypeError('L must be positive')
            self._initialized["L"] = True

        if not self._initialized["n_samples"]:
            if type(self.n_samples) is not int or self.n_samples <= 0:
                raise TypeError('n_samples must be a positive integer')
            self._initialized["n_samples"] = True

        if not self._initialized["P"]:
            if type(self.P) is not int or self.P <= 0:
                raise TypeError('P must be a positive integer')
            self._initialized["P"] = True

    def parse_input(self, X):
        """Fast ML Graph Kernel.

        See supplementary material :cite:`kondor2016multiscale`,
        algorithm 1.

        Parameters
        ----------
        X : iterable
            For the input to pass the test, we must have:
            Each element must be an iterable with at most three features
            and at least one. The first that is obligatory is a valid
            graph structure (adjacency matrix or edge_dictionary) while
            the second is node_labels and the third edge_labels (that
            correspond to the given graph format). A valid input also
            consists of graph type objects.

        Returns
        -------
        out : list
            A list of tuples with S matrices inverses
            and their 4th-root determinants.

        """
        # NOTE(review): collections.Iterable was removed in Python 3.10;
        # this should be collections.abc.Iterable — confirm target Pythons.
        if not isinstance(X, collections.Iterable):
            raise TypeError('input must be an iterable\n')
        else:
            ng = 0
            out = list()
            # data[i] holds, per graph: {0: adjacency, 1: feature matrix,
            # 2: inverse of the regularized Laplacian}.
            data = dict()
            # neighborhoods[i] starts as the Graph object and is lazily
            # replaced by its produce_neighborhoods() result (see
            # calculate_C below).
            neighborhoods = dict()
            for (idx, x) in enumerate(iter(X)):
                is_iter = False
                if isinstance(x, collections.Iterable):
                    is_iter, x = True, list(x)
                # NOTE(review): the accepted lengths [0, 2, 3] require node
                # labels (x[1]) despite the docstring saying "at least one".
                if is_iter and len(x) in [0, 2, 3]:
                    if len(x) == 0:
                        warnings.warn('Ignoring empty element ' +
                                      'on index: '+str(idx))
                        continue
                    else:
                        x = Graph(x[0], x[1], {}, self._graph_format)
                elif type(x) is Graph:
                    x.desired_format(self._graph_format)
                else:
                    raise TypeError('each element of X must be either a '
                                    'graph or an iterable with at least 1 '
                                    'and at most 3 elements\n')
                phi_d = x.get_labels()
                A = x.get_adjacency_matrix()
                try:
                    # Stack per-vertex feature vectors into an (n, k) matrix.
                    phi = np.array([list(phi_d[i])
                                    for i in range(A.shape[0])])
                except TypeError:
                    raise TypeError('Features must be iterable and castable '
                                    'in total to a numpy array.')

                # Regularize the graph Laplacian with heta so it is
                # invertible.
                Lap = laplacian(A).astype(float)
                _increment_diagonal_(Lap, self.heta)
                data[ng] = {0: A, 1: phi, 2: inv(Lap)}
                neighborhoods[ng] = x
                ng += 1

            if ng == 0:
                raise ValueError('parsed input is empty')

            # Define a function for calculating the S's of subgraphs of
            # each iteration.
            def calculate_C(k, j, l):
                # Lazily expand the graph object into its l-neighborhoods
                # (computed once per graph, then cached in-place).
                if type(neighborhoods[k]) is Graph:
                    neighborhoods[k] = neighborhoods[k].produce_neighborhoods(
                        r=self.L, sort_neighbors=False)

                indexes = neighborhoods[k][l][j]
                L = laplacian(data[k][0]
                              [indexes, :][:, indexes]).astype(float)
                _increment_diagonal_(L, self.heta)
                U = data[k][1][indexes, :]
                S = multi_dot((U.T, inv(L), U))
                _increment_diagonal_(S, self.gamma)
                # Return inv(S) together with log det(S) (sum of log
                # eigenvalues), the pieces pairwise_operation needs.
                return (inv(S), np.sum(np.log(np.real(eigvals(S)))))

            if self._method_calling == 1:
                # Fit path: build and store the sampled feature bases.
                V = [(k, j) for k in range(ng)
                     for j in range(data[k][0].shape[0])]

                ns = min(len(V), self.n_samples)

                self.random_state_.shuffle(V)
                vs = V[:ns]
                phi_k = np.array([data[k][1][j, :] for (k, j) in vs])

                # w the eigen vectors, v the eigenvalues
                K = phi_k.dot(phi_k.T)

                # Calculate eigenvalues
                v, w = eig(K)
                v, w = np.real(v), np.real(w.T)

                # keep only the positive
                vpos = np.argpartition(v, -self.P)[-self.P:]
                vpos = vpos[np.where(v[vpos] > positive_eigenvalue_limit)]

                # ksi.shape = (k, Ns) * (Ns, P)
                ksi = w[vpos].dot(phi_k).T / np.sqrt(v[vpos])
                for j in range(ng):
                    # (n_samples, k) * (k, P)
                    data[j][1] = data[j][1].dot(ksi)
                self._data_level = {0: ksi}
                for l in range(1, self.L+1):
                    # Take random samples from all the vertices of all graphs
                    self.random_state_.shuffle(V)
                    vs = V[:ns]

                    # Compute the reference subsampled Gram matrix
                    K_proj = {
                        k: np.zeros(shape=(data[k][0].shape[0], ns))
                        for k in range(ng)}
                    K, C = np.zeros(shape=(len(vs), len(vs))), dict()
                    for (m, (k, j)) in enumerate(vs):
                        C[m] = calculate_C(k, j, l)
                        K_proj[k][j, m] = K[m, m] = \
                            self.pairwise_operation(C[m], C[m])
                        for (s, (k2, j2)) in enumerate(vs):
                            if s < m:
                                # Fill the symmetric entries (and both
                                # projection rows) in one assignment chain.
                                K[s, m] = K[m, s] \
                                    = K_proj[k2][j2, m] \
                                    = K_proj[k][j, s] \
                                    = self.pairwise_operation(C[s], C[m])
                            else:
                                break

                    # Compute the kernels of the relations of the
                    # reference to everything else.
                    for (k, j) in V[ns:]:
                        for (m, _) in enumerate(vs):
                            K_proj[k][j, m] = self.pairwise_operation(
                                C[m], calculate_C(k, j, l))

                    # w the eigen vectors, v the eigenvalues
                    v, w = eig(K)
                    v, w = np.real(v), np.real(w.T)

                    # keep only the positive
                    vpos = np.argpartition(v, -self.P)[-self.P:]
                    vpos = vpos[np.where(v[vpos] > positive_eigenvalue_limit)]

                    # Q shape=(k, P)
                    Q = w[vpos].T / np.sqrt(v[vpos])

                    for j in range(ng):
                        # (n, ns) * (ns, P)
                        data[j][1] = K_proj[j].dot(Q)
                    self._data_level[l] = (C, Q)

            elif self._method_calling == 3:
                # Transform path: reuse the bases stored during fit.
                ksi = self._data_level[0]
                for j in range(ng):
                    # (n, k) * (k, P)
                    data[j][1] = data[j][1].dot(ksi)
                for l in range(1, self.L+1):
                    C, Q = self._data_level[l]
                    for j in range(ng):
                        K_proj = np.zeros(
                            shape=(data[j][0].shape[0], len(C)))
                        for n in range(data[j][0].shape[0]):
                            for m in range(len(C)):
                                K_proj[n, m] = self.pairwise_operation(
                                    C[m], calculate_C(j, n, l))
                        data[j][1] = K_proj.dot(Q)

            # Apply the final calculation of S.
            for k in range(ng):
                S = multi_dot((data[k][1].T, data[k][2], data[k][1]))
                _increment_diagonal_(S, self.gamma)
                out.append((inv(S),
                            np.sum(np.log(np.real(eigvals(S))))))

            return out

    def pairwise_operation(self, x, y):
        """FLG calculation for the fast multiscale laplacian.

        Parameters
        ----------
        x, y : tuple
            An np.array of inverse and the log determinant of S
            (for the calculation of S matrices see the algorithm 1
            of the supplement material in cite:`kondor2016multiscale`).

        Returns
        -------
        kernel : number
            The FLG core kernel value.

        """
        S_inv_x, log_det_x = x
        S_inv_y, log_det_y = y

        # Calculate the result in term of logs
        log_detS = -np.sum(np.log(np.real(eigvals(S_inv_x + S_inv_y))))
        logr = (log_detS - 0.5*(log_det_x + log_det_y))/2.0

        # Cut off vanishing kernel values to avoid exp() underflow noise.
        if logr < -30:
            return .0
        else:
            return exp(logr)
def _increment_diagonal_(A, value): """Increment the diagonal of an array by a value. Parameters ---------- A : np.array The array whose diagonal will be extracted. value : number The value that will be incremented on the diagonal. Returns ------- None. """ d = A.diagonal() d.setflags(write=True) d += value