# Source code for grakel.kernels.weisfeiler_lehman_optimal_assignment

"""The weisfeiler lehman optimal assignment kernel :cite:`kriege2016valid`."""
# Author: Giannis Nikolentzos <nikolentzos@lix.polytechnique.fr>
# License: BSD 3 clause
import collections
import warnings

import numpy as np

from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted

from grakel.graph import Graph
from grakel.kernels import Kernel

from scipy.sparse import lil_matrix

# Python 2/3 cross-compatibility import
from six import iteritems
from six import itervalues


class WeisfeilerLehmanOptimalAssignment(Kernel):
    """Compute the Weisfeiler Lehman Optimal Assignment Kernel.

    See :cite:`kriege2016valid`.

    Parameters
    ----------
    n_iter : int, default=5
        The number of iterations.

    Attributes
    ----------
    X : dict
        Holds a list of fitted subkernel modules.

    sparse : bool
        Defines if the data will be stored in a sparse format.
        Sparse format is slower, but less memory consuming and in some cases
        the only solution.

    _nx : number
        Holds the number of inputs.

    _n_iter : int
        Holds the number of iterations (n_iter + 1, including iteration 0).

    _hierarchy : dict
        A hierarchy produced by the WL relabeling procedure.

    _inv_labels : dict
        An inverse dictionary, used for relabeling on each iteration.

    """

    _graph_format = "dictionary"

    def __init__(self, n_jobs=None, verbose=False, normalize=False,
                 n_iter=5, sparse=False):
        """Initialise a `weisfeiler_lehman` kernel."""
        super(WeisfeilerLehmanOptimalAssignment, self).__init__(
            n_jobs=n_jobs, verbose=verbose, normalize=normalize)
        self.n_iter = n_iter
        self.sparse = sparse
        # BUG FIX: 'sparse' was initialized to True here, which caused
        # initialize() to skip its validation entirely. Both user parameters
        # start un-validated.
        self._initialized.update({"n_iter": False, 'sparse': False})

    def initialize(self):
        """Initialize all transformer arguments, needing initialization."""
        super(WeisfeilerLehmanOptimalAssignment, self).initialize()
        if not self._initialized["n_iter"]:
            if type(self.n_iter) is not int or self.n_iter <= 0:
                raise TypeError("'n_iter' must be a positive integer")
            # One extra iteration accounts for the initial (iteration-0)
            # labeling pass.
            self._n_iter = self.n_iter + 1
            self._initialized["n_iter"] = True

        if not self._initialized["sparse"]:
            if self.sparse not in [False, True]:
                # BUG FIX: the exception was constructed but never raised.
                raise TypeError('sparse could be False, True')
            # BUG FIX: was assigned False, so the check re-ran on every call.
            self._initialized["sparse"] = True

    def parse_input(self, X):
        """Parse input for weisfeiler lehman optimal assignment.

        Parameters
        ----------
        X : iterable
            For the input to pass the test, we must have:
            Each element must be an iterable with at most three features and
            at least one. The first that is obligatory is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that correspond to the
            given graph format). A valid input also consists of graph type
            objects.

        Returns
        -------
        Hs : numpy array, shape = [n_input_graphs, hierarchy_size]
            An array where the rows contain the histograms of the graphs.

        """
        if self._method_calling not in [1, 2]:
            raise ValueError('method call must be called either from fit ' +
                             'or fit-transform')
        elif hasattr(self, '_X_diag'):
            # Clean _X_diag value from a previous fit.
            delattr(self, '_X_diag')

        # Input validation and parsing.
        # NOTE(review): collections.Iterable was removed in Python 3.10;
        # collections.abc.Iterable is the py3 location (kept as-is here for
        # the six/py2 compatibility this module maintains).
        if not isinstance(X, collections.Iterable):
            raise TypeError('input must be an iterable\n')
        else:
            nx = 0
            Gs_ed, L, distinct_values = dict(), dict(), set()
            for (idx, x) in enumerate(iter(X)):
                is_iter = isinstance(x, collections.Iterable)
                if is_iter:
                    x = list(x)
                if is_iter and (len(x) == 0 or len(x) >= 2):
                    if len(x) == 0:
                        warnings.warn('Ignoring empty element on index: '
                                      + str(idx))
                        continue
                    else:
                        if len(x) > 2:
                            # NOTE(review): `extra` is computed but never
                            # used afterwards — kept for parity with the
                            # sibling WL kernel this code was derived from.
                            extra = tuple()
                            if len(x) > 3:
                                extra = tuple(x[3:])
                            x = Graph(x[0], x[1], x[2],
                                      graph_format=self._graph_format)
                            extra = (x.get_labels(
                                purpose=self._graph_format,
                                label_type="edge",
                                return_none=True), ) + extra
                        else:
                            x = Graph(x[0], x[1], {},
                                      graph_format=self._graph_format)
                            extra = tuple()
                elif type(x) is Graph:
                    x.desired_format(self._graph_format)
                else:
                    raise TypeError('each element of X must be either a ' +
                                    'graph object or a list with at least ' +
                                    'a graph like object and node labels ' +
                                    'dict \n')
                Gs_ed[nx] = x.get_edge_dictionary()
                L[nx] = x.get_labels(purpose="dictionary")
                distinct_values |= set(itervalues(L[nx]))
                nx += 1
            if nx == 0:
                raise ValueError('parsed input is empty')

        # Save the number of "fitted" graphs.
        self._nx = nx

        # Initialize the hierarchy with an artificial root whose weight and
        # omega are zero (it contributes nothing to the histograms).
        self._hierarchy = dict()
        self._hierarchy['root'] = dict()
        self._hierarchy['root']['parent'] = None
        self._hierarchy['root']['children'] = list()
        self._hierarchy['root']['w'] = 0
        self._hierarchy['root']['omega'] = 0

        # get all the distinct values of current labels
        WL_labels_inverse = dict()

        # assign a number to each label (sorted for determinism)
        label_count = 0
        for dv in sorted(list(distinct_values)):
            WL_labels_inverse[dv] = label_count
            self._insert_into_hierarchy(label_count, 'root')
            label_count += 1

        # Initialize an inverse dictionary of labels for all iterations
        self._inv_labels = dict()
        self._inv_labels[0] = WL_labels_inverse

        # Relabel every node with its compressed (integer) label.
        for j in range(nx):
            new_labels = dict()
            for k in L[j].keys():
                new_labels[k] = WL_labels_inverse[L[j][k]]
            L[j] = new_labels

        for i in range(1, self._n_iter):
            new_previous_label_set, WL_labels_inverse, L_temp = \
                set(), dict(), dict()
            for j in range(nx):
                # Build the WL "credential" of each node: its own label
                # followed by the sorted multiset of neighbor labels.
                L_temp[j] = dict()
                for v in Gs_ed[j].keys():
                    credential = str(L[j][v]) + "," + \
                        str(sorted([L[j][n] for n in Gs_ed[j][v].keys()]))
                    L_temp[j][v] = credential
                    # Remember the parent label so the new label can be
                    # attached below it in the hierarchy.
                    new_previous_label_set.add((credential, L[j][v]))

            label_list = sorted(list(new_previous_label_set),
                                key=lambda tup: tup[0])
            for dv, previous_label in label_list:
                WL_labels_inverse[dv] = label_count
                self._insert_into_hierarchy(label_count, previous_label)
                label_count += 1

            # Recalculate labels
            for j in range(nx):
                new_labels = dict()
                for k in L_temp[j].keys():
                    new_labels[k] = WL_labels_inverse[L_temp[j][k]]
                L[j] = new_labels

            self._inv_labels[i] = WL_labels_inverse

        # Compute the vector representation of each graph: every node adds
        # the omega weight of each hierarchy node on its path to the root.
        if self.sparse:
            Hs = lil_matrix((nx, len(self._hierarchy)))
        else:
            Hs = np.zeros((nx, len(self._hierarchy)))
        for j in range(nx):
            for k in L[j].keys():
                current_label = L[j][k]
                while self._hierarchy[current_label]['parent'] is not None:
                    Hs[j, current_label] += \
                        self._hierarchy[current_label]['omega']
                    current_label = self._hierarchy[current_label]['parent']

        return Hs

    def _insert_into_hierarchy(self, label, previous_label):
        """Insert a label into the hierarchy.

        Parameters
        ----------
        label : int
            The label to insert into the hierarchy.

        previous_label : int
            The previous label of the node (its parent in the hierarchy).

        """
        self._hierarchy[label] = dict()
        self._hierarchy[label]['parent'] = previous_label
        self._hierarchy[label]['children'] = list()
        # Depth-based weight: one more than the parent's.
        self._hierarchy[label]['w'] = \
            self._hierarchy[previous_label]['w'] + 1
        self._hierarchy[label]['omega'] = 1
        self._hierarchy[previous_label]['children'].append(label)

    def fit_transform(self, X, y=None):
        """Fit and transform, on the same dataset.

        Parameters
        ----------
        X : iterable
            Each element must be an iterable with at most three features and
            at least one. The first that is obligatory is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that fitting the given
            graph format). If None the kernel matrix is calculated upon fit
            data. The test samples.

        y : Object, default=None
            Ignored argument, added for the pipeline.

        Returns
        -------
        K : numpy array, shape = [n_targets, n_input_graphs]
            corresponding to the kernel matrix, a calculation between
            all pairs of graphs between target an features

        """
        self._method_calling = 2
        self._is_transformed = False
        self.initialize()
        if X is None:
            raise ValueError('transform input cannot be None')
        else:
            self.X = self.parse_input(X)

        # Compute the histogram intersection kernel; it is symmetric, so
        # only the upper triangle is computed and mirrored.
        K = np.zeros((self._nx, self._nx))
        if self.sparse:
            for i in range(self._nx):
                for j in range(i, self._nx):
                    K[i, j] = np.sum(self.X[i, :].minimum(self.X[j, :]))
                    K[j, i] = K[i, j]
        else:
            for i in range(self._nx):
                for j in range(i, self._nx):
                    K[i, j] = np.sum(np.min(self.X[np.ix_([i, j]), :],
                                            axis=1))
                    K[j, i] = K[i, j]

        self._X_diag = np.diagonal(K)
        if self.normalize:
            old_settings = np.seterr(divide='ignore')
            K = np.nan_to_num(np.divide(K, np.sqrt(np.outer(self._X_diag,
                                                            self._X_diag))))
            np.seterr(**old_settings)
        return K

    def transform(self, X):
        """Calculate the kernel matrix, between given and fitted dataset.

        Parameters
        ----------
        X : iterable
            Each element must be an iterable with at most three features and
            at least one. The first that is obligatory is a valid graph
            structure (adjacency matrix or edge_dictionary) while the second
            is node_labels and the third edge_labels (that fitting the given
            graph format). If None the kernel matrix is calculated upon fit
            data. The test samples.

        Returns
        -------
        K : numpy array, shape = [n_targets, n_input_graphs]
            corresponding to the kernel matrix, a calculation between
            all pairs of graphs between target an features

        """
        self._method_calling = 3
        # Check is fit had been called
        check_is_fitted(self, ['X', '_nx', '_hierarchy', '_inv_labels'])

        # Input validation and parsing
        if X is None:
            raise ValueError('transform input cannot be None')
        else:
            if not isinstance(X, collections.Iterable):
                raise ValueError('input must be an iterable\n')
            else:
                nx = 0
                distinct_values = set()
                Gs_ed, L = dict(), dict()
                for (i, x) in enumerate(iter(X)):
                    is_iter = isinstance(x, collections.Iterable)
                    if is_iter:
                        x = list(x)
                    if is_iter and len(x) in [0, 2, 3]:
                        if len(x) == 0:
                            warnings.warn('Ignoring empty element on '
                                          'index: ' + str(i))
                            continue
                        elif len(x) in [2, 3]:
                            # Edge labels (x[2]) are ignored at transform
                            # time, matching the fit-side histogram.
                            x = Graph(x[0], x[1], {}, self._graph_format)
                    elif type(x) is Graph:
                        x.desired_format("dictionary")
                    else:
                        raise ValueError('each element of X must have at ' +
                                         'least one and at most 3 elements\n')
                    Gs_ed[nx] = x.get_edge_dictionary()
                    L[nx] = x.get_labels(purpose="dictionary")

                    # Hold only the distinct values unseen during fit.
                    distinct_values |= set(
                        v for v in itervalues(L[nx])
                        if v not in self._inv_labels[0])
                    nx += 1
                if nx == 0:
                    raise ValueError('parsed input is empty')

        # get all the distinct values of new labels
        WL_labels_inverse = dict()

        # assign a number to each label, continuing after all labels that
        # were produced during fit
        label_count = sum([len(self._inv_labels[i])
                           for i in range(len(self._inv_labels))])
        for dv in sorted(list(distinct_values)):
            WL_labels_inverse[dv] = label_count
            self._insert_into_hierarchy(label_count, 'root')
            label_count += 1

        for j in range(nx):
            new_labels = dict()
            for (k, v) in iteritems(L[j]):
                if v in self._inv_labels[0]:
                    new_labels[k] = self._inv_labels[0][v]
                else:
                    new_labels[k] = WL_labels_inverse[v]
            L[j] = new_labels

        for i in range(1, self._n_iter):
            L_temp, new_previous_label_set = dict(), set()
            for j in range(nx):
                # Build WL credentials as in parse_input.
                L_temp[j] = dict()
                for v in Gs_ed[j].keys():
                    credential = str(L[j][v]) + "," + \
                        str(sorted([L[j][n] for n in Gs_ed[j][v].keys()]))
                    L_temp[j][v] = credential
                    if credential not in self._inv_labels[i]:
                        new_previous_label_set.add((credential, L[j][v]))

            # Calculate the new label_set
            WL_labels_inverse = dict()
            if len(new_previous_label_set) > 0:
                for dv, previous_label in sorted(
                        list(new_previous_label_set),
                        key=lambda tup: tup[0]):
                    WL_labels_inverse[dv] = label_count
                    self._insert_into_hierarchy(label_count, previous_label)
                    label_count += 1

            # Recalculate labels
            for j in range(nx):
                new_labels = dict()
                for (k, v) in iteritems(L_temp[j]):
                    if v in self._inv_labels[i]:
                        new_labels[k] = self._inv_labels[i][v]
                    else:
                        new_labels[k] = WL_labels_inverse[v]
                L[j] = new_labels

        # Compute the vector representation of each graph
        if self.sparse:
            Hs = lil_matrix((nx, len(self._hierarchy)))
        else:
            Hs = np.zeros((nx, len(self._hierarchy)))
        for j in range(nx):
            for k in L[j].keys():
                current_label = L[j][k]
                while self._hierarchy[current_label]['parent'] is not None:
                    Hs[j, current_label] += \
                        self._hierarchy[current_label]['omega']
                    current_label = self._hierarchy[current_label]['parent']
        self.Y = Hs

        # Compute the histogram intersection kernel between the transform
        # histograms (rows) and the fitted ones (columns). Hs is truncated
        # to the fitted hierarchy width since it may have grown above.
        K = np.zeros((nx, self._nx))
        if self.sparse:
            # BUG FIX: bounds were `range(self._nx)` / `range(i, self._nx)`,
            # which indexed K (shape (nx, self._nx)) out of range when
            # nx < self._nx and left the lower part of each row unset.
            for i in range(nx):
                for j in range(self._nx):
                    K[i, j] = np.sum(
                        Hs[i, :self.X.shape[1]].minimum(self.X[j, :]))
        else:
            for i in range(nx):
                for j in range(self._nx):
                    K[i, j] = np.sum(np.min([Hs[i, :self.X.shape[1]],
                                             self.X[j, :]], axis=0))

        self._is_transformed = True
        if self.normalize:
            X_diag, Y_diag = self.diagonal()
            old_settings = np.seterr(divide='ignore')
            K = np.nan_to_num(np.divide(K, np.sqrt(np.outer(Y_diag,
                                                            X_diag))))
            np.seterr(**old_settings)

        return K

    def diagonal(self):
        """Calculate the kernel matrix diagonal for fitted data.

        A funtion called on transform on a seperate dataset to apply
        normalization on the exterior.

        Parameters
        ----------
        None.

        Returns
        -------
        X_diag : np.array
            The diagonal of the kernel matrix, of the fitted data.
            This consists of kernel calculation for each element with
            itself.

        Y_diag : np.array
            The diagonal of the kernel matrix, of the transformed data.
            This consists of kernel calculation for each element with
            itself.

        """
        # Check if fit had been called
        check_is_fitted(self, ['X'])
        try:
            check_is_fitted(self, ['_X_diag'])
            if self._is_transformed:
                Y_diag = np.zeros(self.Y.shape[0])
                for i in range(self.Y.shape[0]):
                    Y_diag[i] = np.sum(np.min(self.Y[np.ix_([i, i]), :],
                                              axis=1))
        except NotFittedError:
            # Calculate diagonal of X
            if self._is_transformed:
                self._X_diag = np.zeros(self.X.shape[0])
                for i in range(self.X.shape[0]):
                    self._X_diag[i] = np.sum(np.min(
                        self.X[np.ix_([i, i]), :], axis=1))
                Y_diag = np.zeros(self.Y.shape[0])
                for i in range(self.Y.shape[0]):
                    Y_diag[i] = np.sum(np.min(self.Y[np.ix_([i, i]), :],
                                              axis=1))
            else:
                # case sub kernel is only fitted
                self._X_diag = np.zeros(self.X.shape[0])
                for i in range(self.X.shape[0]):
                    self._X_diag[i] = np.sum(np.min(
                        self.X[np.ix_([i, i]), :], axis=1))

        if self._is_transformed:
            return self._X_diag, Y_diag
        else:
            return self._X_diag
def efit(object, data):
    """Call ``object.fit(data)``; returns None (fits in place)."""
    object.fit(data)


def efit_transform(object, data):
    """Call ``object.fit_transform(data)`` and return its result."""
    result = object.fit_transform(data)
    return result


def etransform(object, data):
    """Call ``object.transform(data)`` and return its result."""
    result = object.transform(data)
    return result