
"""The graphlet sampling kernel :cite:`shervashidze2009efficient`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import collections
import math
import warnings

import numpy as np

from scipy.interpolate import interp1d
from sklearn.exceptions import NotFittedError
from sklearn.utils import check_random_state
from sklearn.utils.validation import check_is_fitted

from grakel.graph import Graph
from grakel.kernels import Kernel
from grakel.kernels._c_functions import ConSubg
from grakel.kernels._isomorphism import Graph as bGraph

# Python 2/3 cross-compatibility import
from six import iteritems
from six import itervalues
from builtins import range


class GraphletSampling(Kernel):
    r"""The graphlet sampling kernel.

    See :cite:`shervashidze2009efficient`.

    If either "delta", "epsilon", "a" or "n_samples" is given, the kernel
    value is calculated for the given (or derived) number of randomly picked
    samples, drawn from graphlet sizes 3 up to k. Otherwise the kernel value
    is calculated by drawing all possible connected samples of size k.

    Parameters
    ----------
    random_state : RandomState or int, default=None
        A random number generator instance or an int to initialize a
        RandomState as a seed.

    k : int, default=5
        The dimension of the given graphlets.

    sampling : None or dict
        Defines if random sampling of graphlets will be utilised.
        If not None, the dictionary can contain either:

            - n_samples : int
                Sets the number of randomly drawn samples, of sizes between
                3..k. Overrides the parameters a, epsilon, delta.

        or

            - delta : float, default=0.05
                Confidence level (typically 0.05 or 0.1). Used to calculate
                the number of samples achieving a certain bound. The
                n_samples argument must not be provided and, for the default
                value to apply, either "epsilon" or "a" must be set.

            - epsilon : float, default=0.05
                Precision level (typically 0.05 or 0.1). Used to calculate
                the number of samples achieving a certain bound. The
                n_samples argument must not be provided and, for the default
                value to apply, either "delta" or "a" must be set.

            - a : int
                Number of isomorphism classes of graphlets. If -1, the number
                is the maximum possible: taken from a database for sizes 1
                to 9, or else predicted through interpolation. Used to
                calculate the number of samples achieving a certain bound.
                The n_samples argument must not be provided and, for the
                default value to apply, either "delta" or "epsilon" must be
                set.

    Attributes
    ----------
    X : dict
        A dictionary of pairs between each input graph and the bins where
        the sampled graphlets have fallen.

    sample_graphlets_ : function
        A function taking as input a binary adjacency matrix, parametrised
        to work for the given samples, k and deterministic/probabilistic
        mode.

    random_state_ : RandomState
        A RandomState object handling all randomness of the class.

    _graph_bins : dict
        A dictionary of graph bins holding pynauty objects.

    _nx : int
        Holds the number of sampled X graphs.

    _ny : int
        Holds the number of sampled Y graphs.

    _X_diag : np.array, shape=(_nx, 1)
        Holds the diagonal of the X kernel matrix in a numpy array, if
        calculated (`fit_transform`).

    _phi_X : np.array, shape=(_nx, len(_graph_bins))
        Holds the features of X in a numpy array, if calculated
        (`fit_transform`).

    """

    _graph_format = "adjacency"
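    # The ``sampling`` argument admits three configurations; as a sketch
    # (the numeric values below are illustrative, not enforced defaults):
    #
    #     GraphletSampling(k=5)
    #         # exhaustive: all connected size-5 graphlets via ConSubg
    #     GraphletSampling(k=5, sampling={"n_samples": 200})
    #         # a fixed number of random graphlets of sizes 3..5
    #     GraphletSampling(k=5, sampling={"delta": 0.05, "epsilon": 0.05})
    #         # sample count derived from the (delta, epsilon, a) bound
    #         # computed in ``initialize`` below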
    def __init__(self, n_jobs=None, normalize=False, verbose=False,
                 random_state=None, k=5, sampling=None):
        """Initialise a `GraphletSampling` kernel."""
        super(GraphletSampling, self).__init__(
            n_jobs=n_jobs, normalize=normalize, verbose=verbose)

        self.random_state = random_state
        self.k = k
        self.sampling = sampling
        self._initialized.update({"random_state": False, "k": False,
                                  "sampling": False})
    def initialize(self):
        """Initialize all transformer arguments, needing initialization."""
        self._graph_bins = dict()
        if not self._initialized["n_jobs"]:
            if self.n_jobs is not None:
                warnings.warn('no implemented parallelization for '
                              'GraphletSampling')
            self._initialized["n_jobs"] = True

        if not self._initialized["random_state"]:
            self.random_state_ = check_random_state(self.random_state)
            self._initialized["random_state"] = True

        if not self._initialized["k"]:
            if type(self.k) is not int:
                raise TypeError('k must be an int')
            if self.k > 10:
                warnings.warn('graphlets are too big - '
                              'computation may be slow')
            elif self.k < 3:
                raise TypeError('k must be at least 3')
            self._initialized["k"] = True

        if not self._initialized["sampling"]:
            sampling = self.sampling
            k = self.k
            if sampling is None:
                n_samples = None

                def sample_graphlets(A, k, *args):
                    return sample_graphlets_all_connected(A, k)
            elif type(sampling) is dict:
                if "n_samples" in sampling:
                    # Get the number of samples
                    n_samples = sampling["n_samples"]

                    # Display a warning if other arguments are ignored
                    args = [arg for arg in ["delta", "epsilon", "a"]
                            if arg in sampling]
                    if len(args):
                        warnings.warn('Number of samples defined as input, '
                                      'ignoring arguments: ' +
                                      ', '.join(args))

                    # Initialise the sample graphlets function
                    sample_graphlets = sample_graphlets_probabilistic
                elif ("delta" in sampling or "epsilon" in sampling
                        or "a" in sampling):
                    # Otherwise take delta
                    delta = sampling.get("delta", 0.05)
                    # or epsilon
                    epsilon = sampling.get("epsilon", 0.05)
                    # or a
                    a = sampling.get("a", -1)

                    # check the fit constraints
                    if delta > 1 or delta < 0:
                        raise TypeError('delta must be in the range (0,1)')

                    if epsilon > 1 or epsilon < 0:
                        raise TypeError('epsilon must be in the range (0,1)')

                    if type(a) is not int:
                        raise TypeError('a must be an integer')
                    elif a == 0:
                        raise TypeError('a cannot be zero')
                    elif a < -1:
                        raise TypeError('values of a smaller than -1 '
                                        'have no meaning')

                    if a == -1:
                        fallback_map = {1: 1, 2: 2, 3: 4, 4: 8, 5: 19, 6: 53,
                                        7: 209, 8: 1253, 9: 13599}
                        if k > 9:
                            warnings.warn(
                                'for graphlets of this size the number of '
                                'isomorphism classes is not known - '
                                'extrapolation on known values will be used')
                            # Predict a from the known values for sizes 1..9
                            # (fill_value='extrapolate' is needed so that
                            # interp1d accepts queries beyond that range)
                            isomorphism_prediction = \
                                interp1d(list(fallback_map.keys()),
                                         list(itervalues(fallback_map)),
                                         kind='cubic',
                                         fill_value='extrapolate')
                            a = isomorphism_prediction(k)
                        else:
                            a = fallback_map[k]

                    # and calculate the number of samples
                    n_samples = math.ceil(2*(a*np.log10(2) +
                                          np.log10(1/delta))/(epsilon**2))
                    sample_graphlets = sample_graphlets_probabilistic
                else:
                    raise ValueError('sampling doesn\'t have a valid '
                                     'dictionary format')
            else:
                raise TypeError('sampling can either be a dictionary or None')

            self.sample_graphlets_ = sample_graphlets
            self.k_ = k
            self.n_samples_ = n_samples
            self._initialized["sampling"] = True

    def transform(self, X):
        """Calculate the kernel matrix, between given and fitted dataset.

        Parameters
        ----------
        X : iterable
            Each element must be an iterable with at most three features and
            at least one. The first that is obligatory is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that fit the given
            graph format).
        Returns
        -------
        K : numpy array, shape = [n_targets, n_input_graphs]
            corresponding to the kernel matrix, a calculation between
            all pairs of graphs between targets and features

        """
        self._method_calling = 3
        # Check if fit has been called
        check_is_fitted(self, ['X'])

        # Input validation and parsing
        if X is None:
            raise ValueError('transform input cannot be None')
        else:
            Y = self.parse_input(X)

        # Transform - calculate kernel matrix
        try:
            check_is_fitted(self, ['_phi_X'])
            phi_x = self._phi_X
        except NotFittedError:
            phi_x = np.zeros(shape=(self._nx, len(self._graph_bins)))
            for ((i, j), v) in iteritems(self.X):
                phi_x[i, j] = v
            self._phi_X = phi_x

        phi_y = np.zeros(shape=(self._ny, len(self._graph_bins) +
                                len(self._Y_graph_bins)))
        for ((i, j), v) in iteritems(Y):
            phi_y[i, j] = v

        # store _phi_Y for diagonal calls, independent of the
        # normalization argument
        self._phi_Y = phi_y
        km = np.dot(phi_y[:, :len(self._graph_bins)], phi_x.T)
        self._is_transformed = True
        if self.normalize:
            X_diag, Y_diag = self.diagonal()
            km /= np.sqrt(np.outer(Y_diag, X_diag))
        return km

    def fit_transform(self, X):
        """Fit and transform, on the same dataset.

        Parameters
        ----------
        X : iterable
            Each element must be an iterable with at most three features and
            at least one. The first that is obligatory is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that fit the given
            graph format).

        y : None
            There is no need of a target in a transformer, yet the pipeline
            API requires this parameter.

        Returns
        -------
        K : numpy array, shape = [n_input_graphs, n_input_graphs]
            corresponding to the kernel matrix, a calculation between
            all pairs of graphs between targets and features

        """
        self._method_calling = 2
        self.fit(X)

        # calculate feature matrices.
        phi_x = np.zeros(shape=(self._nx, len(self._graph_bins)))

        for ((i, j), v) in iteritems(self.X):
            phi_x[i, j] = v

        # Transform - calculate kernel matrix
        self._phi_X = phi_x
        km = phi_x.dot(phi_x.T)

        self._X_diag = np.diagonal(km)
        if self.normalize:
            return np.divide(km, np.sqrt(np.outer(self._X_diag,
                                                  self._X_diag)))
        else:
            return km

    def diagonal(self):
        """Calculate the kernel matrix diagonal for fitted data.

        A function called on transform of a separate dataset to apply
        normalization on the exterior.

        Parameters
        ----------
        None.

        Returns
        -------
        X_diag : np.array
            The diagonal of the kernel matrix of the fitted data.
            This consists of the kernel calculation of each element
            with itself.

        Y_diag : np.array
            The diagonal of the kernel matrix of the transformed data.
            This consists of the kernel calculation of each element
            with itself.

        """
        # Check if fit has been called
        check_is_fitted(self, ['_phi_X'])
        try:
            check_is_fitted(self, ['_X_diag'])
        except NotFittedError:
            # Calculate the diagonal of X
            self._X_diag = np.sum(np.square(self._phi_X), axis=1)

        try:
            # If transform has happened, return Y as well
            check_is_fitted(self, ['_phi_Y'])
            Y_diag = np.sum(np.square(self._phi_Y), axis=1)
            return self._X_diag, Y_diag
        except NotFittedError:
            # Otherwise return only the diagonal of X
            return self._X_diag

    def parse_input(self, X):
        """Parse and create features for the graphlet_sampling kernel.

        Parameters
        ----------
        X : iterable
            For the input to pass the test, we must have:
            Each element must be an iterable with at most three features and
            at least one. The first that is obligatory is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that correspond to the
            given graph format).
            A valid input also consists of graph type objects.

        Returns
        -------
        local_values : dict
            A dictionary of pairs between each input graph and the bins
            where the sampled graphlets have fallen.

        """
        if not isinstance(X, collections.Iterable):
            raise TypeError('input must be an iterable\n')
        else:
            i = -1
            if self._method_calling == 1:
                self._graph_bins = dict()
            elif self._method_calling == 3:
                self._Y_graph_bins = dict()
            local_values = dict()
            for (idx, x) in enumerate(iter(X)):
                is_iter = False
                if isinstance(x, collections.Iterable):
                    is_iter = True
                    x = list(x)
                if type(x) is Graph:
                    A = x.get_adjacency_matrix()
                elif is_iter and len(x) in [0, 1, 2, 3]:
                    if len(x) == 0:
                        warnings.warn('Ignoring empty element on '
                                      'index: ' + str(idx))
                        continue
                    else:
                        A = Graph(x[0], {}, {},
                                  self._graph_format).get_adjacency_matrix()
                else:
                    raise TypeError('each element of X must be either a '
                                    'graph or an iterable with at least 1 '
                                    'and at most 3 elements\n')
                A = (A > 0).astype(int)
                i += 1

                # sample graphlets based on the initialized method
                samples = self.sample_graphlets_(A, self.k_, self.n_samples_,
                                                 self.random_state_)

                if self._method_calling == 1:
                    for (j, sg) in enumerate(samples):
                        # assign the sample to an isomorphism class (bin)
                        if len(self._graph_bins) == 0:
                            self._graph_bins[0] = sg
                            local_values[(i, 0)] = 1
                        else:
                            newbin = True
                            for k in range(len(self._graph_bins)):
                                if self._graph_bins[k].isomorphic(sg):
                                    newbin = False
                                    if (i, k) not in local_values:
                                        local_values[(i, k)] = 1
                                    else:
                                        local_values[(i, k)] += 1
                                    break
                            if newbin:
                                local_values[(i, len(self._graph_bins))] = 1
                                self._graph_bins[len(self._graph_bins)] = sg
                elif self._method_calling == 3:
                    for (j, sg) in enumerate(samples):
                        # try to assign the sample to a bin seen during fit
                        newbin = True
                        for k in range(len(self._graph_bins)):
                            if self._graph_bins[k].isomorphic(sg):
                                newbin = False
                                if (i, k) not in local_values:
                                    local_values[(i, k)] = 1
                                else:
                                    local_values[(i, k)] += 1
                                break
                        if newbin:
                            # otherwise assign it to a bin private to Y
                            if len(self._Y_graph_bins) == 0:
                                self._Y_graph_bins[0] = sg
                                local_values[(i, len(self._graph_bins))] = 1
                            else:
                                newbin_Y = True
                                start = len(self._graph_bins)
                                start_Y = len(self._Y_graph_bins)
                                for l in range(start_Y):
                                    if self._Y_graph_bins[l].isomorphic(sg):
                                        newbin_Y = False
                                        bin_key = (i, l + start)
                                        if bin_key not in local_values:
                                            local_values[bin_key] = 1
                                        else:
                                            local_values[bin_key] += 1
                                        break
                                if newbin_Y:
                                    idx = start + start_Y
                                    local_values[(i, idx)] = 1
                                    self._Y_graph_bins[start_Y] = sg

            if i == -1:
                raise ValueError('parsed input is empty')
            if self._method_calling == 1:
                self._nx = i + 1
            elif self._method_calling == 3:
                self._ny = i + 1
            return local_values
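
# A worked instance of the sample-size bound used in ``initialize`` above,
# with illustrative values: for k = 5 the fallback table gives a = 19
# isomorphism classes, so with delta = epsilon = 0.05 the formula evaluates
# to
#
#     n_samples = ceil(2 * (19 * log10(2) + log10(1 / 0.05)) / 0.05 ** 2)
#               = ceil(2 * (5.7196 + 1.3010) / 0.0025)
#               = 5617
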
def sample_graphlets_probabilistic(A, k, n_samples, rs):
    """Probabilistic sampling of n_samples graphs of sizes 3..k.

    Parameters
    ----------
    A : np.array
        A binary array defining a certain graph.

    k : int
        The maximum dimension of the sampled graphlets.

    n_samples : int
        Sets the number of randomly drawn samples, of sizes between 3..k.

    rs : RandomState
        A RandomState object handling all randomness of the class.

    Returns
    -------
    graphlets : generator
        Returns a generator of sampled graphlets (as pynauty graphs),
        of sizes between 3..k.

    """
    s = list(range(A.shape[0]))
    min_r, max_r = min(3, A.shape[0]), min(k, A.shape[0])

    if min_r == max_r:
        def rsamp(*args):
            return min_r
    else:
        def rsamp(*args):
            return rs.randint(min_r, max_r+1)

    for i in range(n_samples):
        index_rand = rs.choice(s, rsamp(), replace=False)
        Q = A[index_rand, :][:, index_rand]
        yield bGraph(Q.shape[0], zip(*np.where(Q == 1)))


def sample_graphlets_all_connected(A, k):
    """All the connected graphlets of size k of a given graph.

    The implemented algorithm can be found in :cite:`Karakashian2013AnAF`
    as `ConSubg`.

    Parameters
    ----------
    A : np.array
        A binary array defining a certain graph.

    k : int
        The maximum dimension of the sampled graphlets.

    Returns
    -------
    graphlets : generator
        Returns a generator of sampled graphlets (as pynauty graphs),
        of size k.

    """
    G = {i: set(np.where(A[i, :] != 0)[0]) for i in range(A.shape[0])}
    for s in ConSubg(G, k, np.all(A == A.T)):
        enum = {j: i for i, j in enumerate(s)}
        yield bGraph(len(s),
                     iter((enum[i], enum[j]) for i in s for j in s & G[i]))
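
# ---------------------------------------------------------------------------
# Usage sketch (not part of the library API): a minimal example of computing
# the kernel on two toy graphs. The adjacency matrices below are made up for
# illustration; the GraphletSampling parameters used are the ones documented
# above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # A 4-cycle and a 4-clique, as binary adjacency matrices.
    cycle = np.array([[0, 1, 0, 1],
                      [1, 0, 1, 0],
                      [0, 1, 0, 1],
                      [1, 0, 1, 0]])
    clique = np.ones((4, 4), dtype=int) - np.eye(4, dtype=int)

    # Each input element is an iterable whose first entry is the graph.
    gk = GraphletSampling(k=3, sampling={"n_samples": 50}, random_state=42)
    K_fit = gk.fit_transform([[cycle], [clique]])   # 2 x 2 kernel matrix
    K_new = gk.transform([[clique]])                # 1 x 2 against fit data
    print(K_fit)
    print(K_new)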