Source code for grakel.kernels.lovasz_theta

"""The lovasz theta kernel as in :cite:`johansson2014global`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import collections
import warnings

import numpy as np

from grakel.kernels import Kernel
from grakel.graph import Graph
from grakel.tools import distribute_samples

from sklearn.utils import check_random_state

from math import sqrt
from numpy import pad
from numpy.linalg import LinAlgError
from numpy.linalg import norm
from scipy.linalg import cholesky
from scipy.linalg import eigvalsh
from scipy.linalg import solve

cvxopt_installed = True
try:
    from cvxopt.base import matrix
    from cvxopt.base import spmatrix
    from cvxopt.solvers import sdp
    from cvxopt.solvers import options
    options['show_progress'] = False
except ImportError:
    cvxopt_installed = False

min_weight = float("1e-10")
angle_precision = float("1e-6")
tolerance = float("1e-1")


[docs]class LovaszTheta(Kernel): """Lovasz theta kernel as proposed in :cite:`johansson2014global`. Parameters ---------- X,Y : *valid-graph-format* The pair of graphs on which the kernel is applied. n_samples : int, default=50 The number of samples. subsets_size_range : tuple, len=2, default=(2,8) (min, max) size of the vertex set of sampled subgraphs. random_state : RandomState or int, default=None A random number generator instance or an int to initialize a RandomState as a seed. base_kernel : function (np.1darray, np.1darray -> number), default=None The applied metric between the lovasz_theta numbers of subgraphs. If None :math:`f(x,y) = x*y` max_dim : int, default=None The maximum graph size that can appear both in fit or transform. When None, max_dim is calculated based on the size of the biggest graph on fit. This can lead to a crash in case a graph appears in transform with size bigger than in fit. Attributes ---------- d_ : int, The maximum matrix dimension of fit plus 1. Signifies the number of features assigned for lovasz labelling. random_state_ : RandomState A RandomState object handling all randomness of the class. """ _graph_format = "adjacency"
[docs] def __init__(self, n_jobs=None, normalize=False, verbose=False, random_state=None, n_samples=50, subsets_size_range=(2, 8), max_dim=None, base_kernel=None): """Initialise a lovasz_theta kernel.""" # setup valid parameters and initialise from parent if not cvxopt_installed: raise ImportError('cvxopt should be installed for the ' 'computation of the Lovasz-Theta Kernel.') super(LovaszTheta, self).__init__(n_jobs=n_jobs, normalize=normalize, verbose=verbose) self.n_samples = n_samples self.subsets_size_range = subsets_size_range self.base_kernel = base_kernel self.random_state = random_state self.max_dim = max_dim self._initialized.update({"n_samples": False, "subsets_size_range": False, "base_kernel": False, "random_state": False, "max_dim": False})
def initialize(self): """Initialize all transformer arguments, needing initialization.""" super(LovaszTheta, self).initialize() if not self._initialized["n_samples"]: if self.n_samples <= 0 or type(self.n_samples) is not int: raise TypeError('n_samples must an integer be bigger than ' 'zero') self._initialized["n_samples"] = True if not self._initialized["subsets_size_range"]: if (type(self.subsets_size_range) is not tuple or len(self.subsets_size_range) != 2 or any(type(i) is not int for i in self.subsets_size_range) or self.subsets_size_range[0] > self.subsets_size_range[1] or self.subsets_size_range[0] <= 0): raise TypeError('subsets_size_range subset size range' 'must be a tuple of two integers in ' 'increasing order, bigger than 1') self._initialized["subsets_size_range"] = True if not self._initialized["base_kernel"]: if not callable(self.base_kernel) and self.base_kernel is not None: raise TypeError('base_kernel between arguments ' + 'must be a function') self._initialized["base_kernel"] = True self.base_kernel_ = inner_product if not self._initialized["random_state"]: self.random_state_ = check_random_state(self.random_state) self._initialized["random_state"] = True if not self._initialized["max_dim"]: if self.max_dim is not None and (type(self.max_dim) is not int or self.max_dim < 1): raise ValueError('max_dim if not None, should be an integer bigger than 1') if self.max_dim is None: self.d_ = None else: self.d_ = self.max_dim + 1 self._initialized["max_dim"] = True def parse_input(self, X): """Parse and create features for lovasz_theta kernel. Parameters ---------- X : iterable For the input to pass the test, we must have: Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that correspond to the given graph format). A valid input also consists of graph type objects. Returns ------- out : list The lovasz metrics for the given input. """ if not isinstance(X, collections.Iterable): raise TypeError('input must be an iterable\n') else: i = 0 adjm = list() max_dim = 0 for (idx, x) in enumerate(iter(X)): is_iter = False if isinstance(x, collections.Iterable): x, is_iter = list(x), True if is_iter and len(x) in [0, 1, 2, 3]: if len(x) == 0: warnings.warn('Ignoring empty element ' + 'on index: '+str(idx)) continue else: x = Graph(x[0], {}, {}, self._graph_format) elif type(x) is not Graph: raise TypeError('each element of X must be either a ' + 'graph or an iterable with at least 1 ' + 'and at most 3 elements\n') i += 1 A = x.get_adjacency_matrix() adjm.append(A) max_dim = max(max_dim, A.shape[0]) if self._method_calling == 1: if self.d_ is None: self.d_ = max_dim + 1 if self.d_ < max_dim + 1: if self.max_dim is None and self._method_calling == 3: raise ValueError('Maximum dimension of a graph in transform is bigger ' 'than the one found in fit. To avoid that use max_dim parameter.') else: raise ValueError('max_dim should correspond to the ' 'biggest graph inside the dataset') out = list() for A in adjm: X, t = _calculate_lovasz_embeddings_(A) U = _calculate_lovasz_labelling_(X, t, self.d_) out.append(self._calculate_MEC_(U)) if i == 0: raise ValueError('parsed input is empty') return out def pairwise_operation(self, x, y): """Lovasz theta kernel as proposed in :cite:`johansson2014global`. Parameters ---------- x, y : dict Subgraph samples metric dictionaries for all levels. Returns ------- kernel : number The kernel value. """ return self.base_kernel_(x, y) def _calculate_MEC_(self, U): """Calculate the minimum enclosing cone for given U. Calculates the minimum a graph metric as the lovasz theta kernel or the svm-theta kernel, on a set of randomly sampled subgraphs, producing a dictionary of subgraph levels and sets. Parameters ---------- U : np.array, ndim=2 The adjacency matrix. Returns ------- level_values : np.array, shape=(num_of_levels, 1) Returns for all levels the mean of lovasz_numbers for sampled subsets. """ # Calculate subsets n = U.shape[1] samples_on_subsets = distribute_samples(n, self.subsets_size_range, self.n_samples) # Calculate level dictionary with lovasz values phi = np.zeros(shape=(self.subsets_size_range[1] - self.subsets_size_range[0]+1, 1)) for (i, level) in enumerate(range(self.subsets_size_range[0], self.subsets_size_range[1] + 1)): v = samples_on_subsets.get(level, None) if v is not None: level_values = list() for k in range(v): if level <= n: indexes = self.random_state_.choice(n, level, replace=False) else: indexes = range(n) # calculate the metrix value for that level level_values.append(_minimum_cone_(U[:, indexes], self.random_state_)) phi[i] = np.mean(level_values) return phi
def _calculate_lovasz_embeddings_(A): """Calculate the lovasz embeddings for the given graph. Parameters ---------- A : np.array, ndim=2 The adjacency matrix. Returns ------- lovasz_theta : float Returns the lovasz theta number. optimization_solution : np.array Returns the slack variable solution of the primal optimization program. """ if A.shape[0] == 1: return 1.0 else: tf_adjm = (np.abs(A) <= min_weight) np.fill_diagonal(tf_adjm, False) i_list, j_list = np.nonzero(np.triu(tf_adjm.astype(int), k=1)) nv = A.shape[0] ne = len(i_list) x_list = list() e_list = list() for (e, (i, j)) in enumerate(zip(i_list, j_list)): e_list.append(int(e)), x_list.append(int(i*nv+j)) if tf_adjm[i, j]: e_list.append(int(e)), x_list.append(int(i*nv+j)) # Add on the last row, diagonal elements e_list = e_list+(nv*[ne]) x_list = x_list+[int(i*nv + i) for i in range(nv)] # initialise g sparse (to values -1, based on two list that # define index and one that defines shape g_sparse = spmatrix(-1, x_list, e_list, (nv*nv, ne+1)) # Initialise optimization parameters h = matrix(-1.0, (nv, nv)) c = matrix([0.0]*ne + [1.0]) # Solve the convex optimization problem sol = sdp(c, Gs=[g_sparse], hs=[h]) return np.array(sol['ss'][0]), sol['x'][ne] def _calculate_lovasz_labelling_(X, t, d): """Calculate the lovasz labelling for the extracted embeddings. Parameters ---------- X : np.array, ndim=2 The optimization solution of the sdp program. t : float Returns the lovasz theta number. d : int The number of features size. Returns ------- lovasz_theta : float Returns the lovasz theta number. optimization_solution : np.array Returns the slack variable solution of the primal optimization program. """ n = X.shape[0] try: V = cholesky(X) except LinAlgError: x = X.diagonal() x.setflags(write=True) x += 2*abs(eigvalsh(X, lower=False, eigvals=(0, 0))[0]) V = cholesky(X) V = pad(V, [(0, d-n), (0, 0)], mode='constant', constant_values=0) c = np.zeros(shape=(d,)) c[-1] = 1 C = np.outer(c, np.ones(shape=(n,))) U = 1/sqrt(t)*(C+V) return U def _minimum_cone_(U, rs): """Calculate the minimum cone. Computes the angle and center of the minimum cone, with its point in the origin, enclosing the vectors in A. Parameters ---------- U : np.array, ndim=2 The vectors that will be enclosed. rs : RandomState A RandomState object handling all randomness of the class. Returns ------- angle_cosine : float Returns the cosine of the minimum angle. """ n = U.shape[1] P = rs.permutation(n) - 1 R = np.array([], dtype=int) c, _ = _b_minidisk_(U, P, R, rs) with np.errstate(divide='ignore'): c /= norm(c, 2) t = min(np.dot(U.T, c)) if t > 1. and t < angle_precision: t = 1. elif t < -1. and t > -angle_precision: t = -1. return t def _b_minidisk_(A, P, R, rs): """Calculate the minidisk. Implements Welzl's algorithm (*move-to-front*) Parameters ---------- A : np.array, ndim=2 The vectors that will be enclosed. P : np.array, ndim=1 A random permutation of indeces. R : np.array, ndim=1 A subset of vectors that are enclosed. rs : RandomState A RandomState object handling all randomness of the class. Returns ------- c : np.array, ndim=1 The center vector C of the cone as it is being minimized. r : int The cone radius as it is beeing minimized. """ d, nP, nR = A.shape[0], P.shape[0], R.shape[0] # original algorithm if nP == 0 or nR == d+1: if nR == 0: c, r = np.zeros(shape=(d,)), 0 else: c, r = _fitball_(A[:, R]) else: p = P[rs.randint(0, nP)] P_prime = np.delete(P, np.where(P == p)) c, r = _b_minidisk_(A, P_prime, R, rs) if norm(A[:, p] - c, 2) - r > tolerance: # if not inside ball if p not in R: R_prime = pad(R, [(0, 1)], mode='constant', constant_values=p) c, r = _b_minidisk_(A, P_prime, R_prime, rs) return c, r def _fitball_(A): """Fit the minimum ball. Parameters ---------- A : np.array, ndim=2 The vectors that will be enclosed inside the ball. Returns ------- c : np.array, ndim=1 The center vector C of the ball. r : int The ball radius. """ d = A.shape[0] n = A.shape[1] if n == 1: c, r = A[:, 0], 0 else: Q = A-np.outer(A[:, 0], np.ones(shape=(n, 1))) B = 2*np.dot(Q.T, Q) b = B.diagonal()/2 L = solve(B[1:, :][:, 1:], b[1:]) L = pad(L, [(1, 0)], mode='constant', constant_values=0) C = np.zeros(shape=(d,)) for i in range(1, n): C = C + L[i]*Q[:, i] r = np.sqrt(np.dot(C, C)) c = C + A[:, 1] return c, r def inner_product(x, y): return x.T.dot(y)