"""RW-kernel. as in :cite:`kashima2003marginalized`, :cite:`gartner2003graph`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import collections
import warnings
import numpy as np
from itertools import product
from numpy import ComplexWarning
from numpy.linalg import inv
from numpy.linalg import eig
from numpy.linalg import multi_dot
from scipy.linalg import expm
from scipy.sparse.linalg import cg
from scipy.sparse.linalg import LinearOperator
from grakel.kernels import Kernel
from grakel.graph import Graph
# Python 2/3 cross-compatibility import
from builtins import range
[docs]class RandomWalk(Kernel):
"""The random walk kernel class.
See :cite:`kashima2003marginalized`, :cite:`gartner2003graph`
and :cite:`vishwanathan2006fast`.
Parameters
----------
lambda : float
A lambda factor concerning summation.
method_type : str, valid_values={"baseline", "fast"}
The method to use for calculating random walk kernel:
+ "baseline" *Complexity*: :math:`O(|V|^6)`
(see :cite:`kashima2003marginalized`, :cite:`gartner2003graph`)
+ "fast" *Complexity*: :math:`O((|E|+|V|)|V||M|)`
(see :cite:`vishwanathan2006fast`)
kernel_type : str, valid_values={"geometric", "exponential"}
Defines how inner summation will be applied.
p : int or None
If initialised defines the number of steps.
Attributes
----------
mu_ : list
List of coefficients concerning a finite sum, in case p is not None.
"""
_graph_format = "adjacency"
[docs] def __init__(self, n_jobs=None,
normalize=False, verbose=False,
lamda=0.1, method_type="fast",
kernel_type="geometric", p=None):
"""Initialise a random_walk kernel."""
# setup valid parameters and initialise from parent
super(RandomWalk, self).__init__(
n_jobs=n_jobs, normalize=normalize, verbose=verbose)
# Ignores ComplexWarning as it does not signify anything problematic
warnings.filterwarnings('ignore', category=ComplexWarning)
# Setup method type and define operation.
self.method_type = method_type
self.kernel_type = kernel_type
self.p = p
self.lamda = lamda
self._initialized.update({"method_type": False, "kernel_type": False,
"p": False, "lamda": False})
def initialize(self):
"""Initialize all transformer arguments, needing initialization."""
super(RandomWalk, self).initialize()
if not self._initialized["method_type"]:
# Setup method type and define operation.
if (self.method_type == "baseline" or
(self.method_type == "fast"
and self.p is None
and self.kernel_type == "geometric")):
self.add_input_ = idem
elif self.method_type == "fast":
# Spectral Decomposition if adjacency matrix is symmetric
self.add_input_ = sd
else:
raise ValueError('unsupported method_type')
self._initialized["method_type"] = True
if not self._initialized["kernel_type"]:
if self.kernel_type not in ["geometric", "exponential"]:
raise ValueError('unsupported kernel type: either "geometric" '
'or "exponential"')
if not self._initialized["p"]:
if self.p is not None:
if type(self.p) is int and self.p > 0:
if self.kernel_type == "geometric":
self.mu_ = [1]
fact = 1
power = 1
for k in range(1, self.p + 1):
fact *= k
power *= self.lamda
self.mu_.append(fact/power)
else:
self.mu_ = [1]
power = 1
for k in range(1, self.p + 1):
power *= self.lamda
self.mu_.append(power)
else:
raise TypeError('p must be a positive integer bigger than '
'zero or nonetype')
self._initialized["kernel_type"] = True
if not self._initialized["lamda"]:
if self.lamda <= 0:
raise TypeError('lambda must be positive bigger than equal')
elif self.lamda > 0.5 and self.p is None:
warnings.warn('random-walk series may fail to converge')
self._initialized["lamda"] = True
def parse_input(self, X):
"""Parse and create features for random_walk kernel.
Parameters
----------
X : iterable
For the input to pass the test, we must have:
Each element must be an iterable with at most three features and at
least one. The first that is obligatory is a valid graph structure
(adjacency matrix or edge_dictionary) while the second is
node_labels and the third edge_labels (that correspond to the given
graph format). A valid input also consists of graph type objects.
Returns
-------
out : list
The extracted adjacency matrices for any given input.
"""
if not isinstance(X, collections.Iterable):
raise TypeError('input must be an iterable\n')
else:
i = 0
out = list()
for (idx, x) in enumerate(iter(X)):
is_iter = isinstance(x, collections.Iterable)
if is_iter:
x = list(x)
if is_iter and len(x) in [0, 1, 2, 3]:
if len(x) == 0:
warnings.warn('Ignoring empty element' +
' on index: '+str(idx))
continue
else:
A = Graph(x[0], {}, {},
self._graph_format).get_adjacency_matrix()
elif type(x) is Graph:
A = x.get_adjacency_matrix()
else:
raise TypeError('each element of X must be either a ' +
'graph or an iterable with at least 1 ' +
'and at most 3 elements\n')
i += 1
out.append(self.add_input_(A))
if i == 0:
raise ValueError('parsed input is empty')
return out
def pairwise_operation(self, X, Y):
"""Calculate the random walk kernel.
Fast:
Spectral demoposition algorithm as presented in
:cite:`vishwanathan2006fast` p.13, s.4.4, with
complexity of :math:`O((|E|+|V|)|E||V|^2)` for graphs witout labels.
Baseline:
Algorithm presented in :cite:`kashima2003marginalized`,
:cite:`gartner2003graph` with complexity of :math:`O(|V|^6)`
Parameters
----------
X, Y : Objects
Objects as produced from parse_input.
Returns
-------
kernel : number
The kernel value.
"""
if self.method_type == "baseline":
# calculate the product graph
XY = np.kron(X, Y)
# algorithm presented in
# [Kashima et al., 2003; Gartner et al., 2003]
# complexity of O(|V|^6)
# XY is a square matrix
s = XY.shape[0]
if self.p is not None:
P = np.eye(XY.shape[0])
S = self.mu_[0] * P
for k in self.mu_[1:]:
P = np.matmul(P, XY)
S += k*P
else:
if self.kernel_type == "geometric":
S = inv(np.identity(s) - self.lamda*XY).T
elif self.kernel_type == "exponential":
S = expm(self.lamda*XY).T
return np.sum(S)
elif self.method_type == "fast" and (self.p is not None or self.kernel_type == "exponential"):
# Spectral demoposition algorithm as presented in
# [Vishwanathan et al., 2006] p.13, s.4.4, with
# complexity of O((|E|+|V|)|E||V|^2) for graphs
# witout labels
# calculate kernel
qi_Pi, wi = X
qj_Pj, wj = Y
# calculate flanking factor
ff = np.expand_dims(np.kron(qi_Pi, qj_Pj), axis=0)
# calculate D based on the method
Dij = np.kron(wi, wj)
if self.p is not None:
D = np.ones(shape=(Dij.shape[0],))
S = self.mu_[0] * D
for k in self.mu_[1:]:
D *= Dij
S += k*D
S = np.diagflat(S)
else:
# Exponential
S = np.diagflat(np.exp(self.lamda*Dij))
return ff.dot(S).dot(ff.T)
else:
# Random Walk
# Conjugate Gradient Method as presented in
# [Vishwanathan et al., 2006] p.12, s.4.2
Ax, Ay = X, Y
xs, ys = Ax.shape[0], Ay.shape[0]
mn = xs*ys
def lsf(x, lamda):
xm = x.reshape((xs, ys), order='F')
y = np.reshape(multi_dot((Ax, xm, Ay)), (mn,), order='F')
return x - self.lamda * y
# A*x=b
A = LinearOperator((mn, mn), matvec=lambda x: lsf(x, self.lamda))
b = np.ones(mn)
x_sol, _ = cg(A, b, tol=1.0e-6, maxiter=20, atol='legacy')
return np.sum(x_sol)
[docs]class RandomWalkLabeled(RandomWalk):
"""The labeled random walk kernel class.
See :cite:`kashima2003marginalized`, :cite:`gartner2003graph`
and :cite:`vishwanathan2006fast`.
Parameters
----------
lambda : float
A lambda factor concerning summation.
method_type : str, valid_values={"baseline", "fast"}
The method to use for calculating random walk kernel [geometric]:
+ "baseline" *Complexity*: :math:`O(|V|^6)`
(see :cite:`kashima2003marginalized`,
:cite:`gartner2003graph`)
+ "fast" *Complexity*: :math:`O(|E|^{2}rd|V|^{3})`
(see :cite:`vishwanathan2006fast`)
kernel_type : str, valid_values={"geometric", "exponential"}
Defines how inner summation will be applied.
p : int, optional
If initialised defines the number of steps.
Attributes
----------
_lamda : float, default=0.1
A lambda factor concerning summation.
_kernel_type : str, valid_values={"geometric", "exponential"},
default="geometric"
Defines how inner summation will be applied.
_method_type : str valid_values={"baseline", "fast"},
default="fast"
The method to use for calculating random walk kernel:
+ "baseline" *Complexity*: :math:`O(|V|^6)`
(see :cite:`kashima2003marginalized`,
:cite:`gartner2003graph`)
+ "fast" *Complexity*: :math:`O((|E|+|V|)|V||M|)`
(see :cite:`vishwanathan2006fast`)
_p : int, default=1
If not -1, the number of steps of the random walk kernel.
"""
_graph_format = "adjacency"
[docs] def __init__(self, n_jobs=None,
normalize=False, verbose=False,
lamda=0.1, method_type="fast",
kernel_type="geometric", p=None):
"""Initialise a labeled random_walk kernel."""
# Initialise from parent
super(RandomWalkLabeled, self).__init__(
n_jobs=n_jobs, normalize=normalize, verbose=verbose,
lamda=lamda, method_type=method_type, kernel_type=kernel_type,
p=p)
def parse_input(self, X):
"""Parse and create features for graphlet_sampling kernel.
Parameters
----------
X : iterable
For the input to pass the test, we must have:
Each element must be an iterable with at most three features and at
least one. The first that is obligatory is a valid graph structure
(adjacency matrix or edge_dictionary) while the second is
node_labels and the third edge_labels (that correspond to the given
graph format). A valid input also consists of graph type objects.
Returns
-------
out : list
The extracted adjacency matrices for any given input.
"""
if not isinstance(X, collections.Iterable):
raise TypeError('input must be an iterable\n')
else:
i = 0
proc = list()
for (idx, x) in enumerate(iter(X)):
is_iter = isinstance(x, collections.Iterable)
if is_iter:
x = list(x)
if is_iter and len(x) in [1, 2, 3]:
if len(x) == 0:
warnings.warn('Ignoring empty element' +
' on index: '+str(idx))
continue
else:
x = Graph(x[0], x[1], {}, self._graph_format)
elif type(x) is not Graph:
raise TypeError('each element of X must be either a ' +
'graph or an iterable with at least 2 ' +
'and at most 3 elements\n')
i += 1
x.desired_format("adjacency")
Ax = x.get_adjacency_matrix()
Lx = x.get_labels(purpose="adjacency")
Lx = [Lx[idx] for idx in range(Ax.shape[0])]
proc.append((Ax, Lx, Ax.shape[0]))
out = list()
for Ax, Lx, s in proc:
amss = dict()
labels = set(Lx)
Lx = np.array(Lx)
for t in product(labels, labels):
selector = np.matmul(np.expand_dims(Lx == t[0], axis=1),
np.expand_dims(Lx == t[1], axis=0))
amss[t] = Ax * selector
out.append((amss, s))
if i == 0:
raise ValueError('parsed input is empty')
return out
def pairwise_operation(self, X, Y):
"""Calculate the labeled random walk kernel.
Fast [geometric]:
Conjugate Gradient method as presented in
:cite:`vishwanathan2006fast` p.12, s.4.2, with
complexity of :math:`O(|E|^{2}rd|V|^{3})` for labeled graphs.
Baseline:
Algorithm presented in :cite:`kashima2003marginalized`,
:cite:`gartner2003graph` with complexity of :math:`O(|V|^6)`
Parameters
----------
X, Y : tuples
Tuples of adjacency matrices and labels.
Returns
-------
kernel : number
The kernel value.
"""
X, xs = X
Y, ys = Y
ck = set(X.keys()) & (set(Y.keys()))
mn = xs * ys
if self.kernel_type == "exponential" or self.method_type == "baseline" or self.p is not None:
# Claculate Kronecker product matrix
XY = np.zeros(shape=(mn, mn))
for k in ck:
XY += np.kron(X[k], Y[k])
# XY is a square matrix
s = XY.shape[0]
if self.p is not None:
P = np.eye(XY.shape[0])
S = self.mu_[0] * P
for k in self.mu_[1:]:
P *= XY
S += k*P
elif self.kernel_type == "exponential":
S = expm(self.lamda*XY).T
elif self.kernel_type == "geometric":
# Baseline Algorithm as presented in
# [Vishwanathan et al., 2006]
Id = np.identity(s)
S = inv(Id - self.lamda*XY).T
return np.sum(S)
elif self.method_type == "fast" and self.kernel_type == "geometric":
# Conjugate Gradient Method as presented in
# [Vishwanathan et al., 2006] p.12, s.4.2
AxAy = [(X[k], Y[k]) for k in ck]
if len(ck):
def lsf(x, lamda):
y = 0
xm = x.reshape((xs, ys), order='F')
for Ax, Ay in AxAy:
y += np.reshape(multi_dot((Ax, xm, Ay)), (mn,), order='F')
return x - self.lamda * y
else:
def lsf(x, lamda):
return x - np.zeros(mn)
# A*x=b
A = LinearOperator((mn, mn), matvec=lambda x: lsf(x, self.lamda))
b = np.ones(mn)
x_sol, _ = cg(A, b, tol=1.0e-6, maxiter=20, atol='legacy')
return np.sum(x_sol)
def idem(x):
return x
def invert(w, v):
return (np.real(np.sum(v, axis=0)), np.real(w))
def sd(x):
return invert(*eig(x))