Source code for grakel.kernels.hadamard_code

"""The hadamard code kernel as defined in :cite:`icpram16`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import collections
import warnings

import numpy as np

from math import ceil
from numpy import log2

from scipy.linalg import hadamard

from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
from sklearn.externals import joblib

from grakel.graph import Graph
from grakel.kernels import Kernel
from grakel.kernels.vertex_histogram import VertexHistogram

# Python 2/3 cross-compatibility import
from six import iteritems
from six import itervalues


[docs]class HadamardCode(Kernel): """The simple Hadamard code kernel, as proposed in :cite:`icpram16`. Parameters ---------- base_graph_kernel : `grakel.kernels.Kernel` or tuple, default=None If tuple it must consist of a valid kernel object and a dictionary of parameters. General parameters concerning normalization, concurrency, .. will be ignored, and the ones of given on `__init__` will be passed in case it is needed. Default `base_graph_kernel` is `VertexHistogram`. rho : int, condition_of_appearance: hc_type=="shortened", default=-1 The size of each single bit arrays. If -1 is chosen r is calculated as the biggest possible that satisfies an equal division. L : int, condition_of_appearance: hc_type=="shortened", default=4 The number of bytes to store the bitarray of each label. n_iter : int, default=5 The number of iterations. Attributes ---------- base_graph_kernel_ : function A void function that initializes a base kernel object. """ _graph_format = "auto"
[docs] def __init__(self, n_jobs=None, verbose=False, normalize=False, n_iter=5, base_graph_kernel=None): """Initialise a `hadamard_code` kernel.""" super(HadamardCode, self).__init__( n_jobs=n_jobs, verbose=verbose, normalize=normalize) self.n_iter = n_iter self.base_graph_kernel = base_graph_kernel self._initialized.update({"n_iter": False, "base_graph_kernel": False})
[docs] def initialize(self): """Initialize all transformer arguments, needing initialization.""" super(HadamardCode, self).initialize() if not self._initialized["base_graph_kernel"]: base_graph_kernel = self.base_graph_kernel if base_graph_kernel is None: base_graph_kernel, params = VertexHistogram, dict() elif type(base_graph_kernel) is type and issubclass(base_graph_kernel, Kernel): params = dict() else: try: base_graph_kernel, params = base_graph_kernel except Exception: raise TypeError('Base kernel was not formulated in ' 'the correct way. ' 'Check documentation.') if not (type(base_graph_kernel) is type and issubclass(base_graph_kernel, Kernel)): raise TypeError('The first argument must be a valid ' 'grakel.kernel.kernel Object') if type(params) is not dict: raise ValueError('If the second argument of base ' 'kernel exists, it must be a diction' 'ary between parameters names and ' 'values') params.pop("normalize", None) params["normalize"] = False params["verbose"] = self.verbose params["n_jobs"] = None self.base_graph_kernel_ = (base_graph_kernel, params) self._initialized["base_graph_kernel"] = True if not self._initialized["n_iter"]: if type(self.n_iter) is not int or self.n_iter <= 0: raise TypeError("'n_iter' must be a positive integer") self._initialized["n_iter"] = True
[docs] def parse_input(self, X): """Parse input and create features, while initializing and/or calculating sub-kernels. Parameters ---------- X : iterable For the input to pass the test, we must have: Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that correspond to the given graph format). A valid input also consists of graph type objects. Returns ------- base_graph_kernel : object Returns base_graph_kernel. Only if called from `fit` or `fit_transform`. K : np.array Returns the kernel matrix. Only if called from `transform` or `fit_transform`. """ if self.base_graph_kernel_ is None: raise ValueError('User must provide a base_graph_kernel') # Input validation and parsing if not isinstance(X, collections.Iterable): raise TypeError('input must be an iterable\n') else: nx, labels = 0, list() if self._method_calling in [1, 2]: nl, labels_enum, base_graph_kernel = 0, dict(), dict() for kidx in range(self.n_iter): base_graph_kernel[kidx] = self.base_graph_kernel_[0](**self.base_graph_kernel_[1]) elif self._method_calling == 3: nl, labels_enum, base_graph_kernel = len(self._labels_enum), dict(self._labels_enum), self.X inp = list() neighbors = list() for (idx, x) in enumerate(iter(X)): is_iter = False if isinstance(x, collections.Iterable): x, is_iter = list(x), True if is_iter and (len(x) == 0 or len(x) >= 2): if len(x) == 0: warnings.warn('Ignoring empty element on index: ' + str(idx)) continue else: if len(x) > 2: extra = tuple() if len(x) > 3: extra = tuple(x[3:]) x = Graph(x[0], x[1], x[2], graph_format=self._graph_format) extra = (x.get_labels(purpose='any', label_type="edge", return_none=True), ) + extra else: x = Graph(x[0], x[1], {}, graph_format=self._graph_format) extra = tuple() elif type(x) is Graph: el = x.get_labels(purpose=self._graph_format, label_type="edge", return_none=True) if el is None: extra = tuple() else: extra = (el, ) else: raise TypeError('each element of X must be either a ' + 'graph object or a list with at least ' + 'a graph like object and node labels ' + 'dict \n') label = x.get_labels(purpose='any') inp.append((x.get_graph_object(), extra)) neighbors.append(x.get_edge_dictionary()) labels.append(label) for v in set(itervalues(label)): if v not in labels_enum: labels_enum[v] = nl nl += 1 nx += 1 if nx == 0: raise ValueError('parsed input is empty') # Calculate the hadamard matrix H = hadamard(int(2**(ceil(log2(nl))))) def generate_graphs(labels): # Intial labeling of vertices based on their corresponding Hadamard code (i-th row of the # Hadamard matrix) where i is the i-th label on enumeration new_graphs, new_labels = list(), list() for ((obj, extra), label) in zip(inp, labels): new_label = dict() for (k, v) in iteritems(label): new_label[k] = H[labels_enum[v], :] new_graphs.append((obj, {i: tuple(j) for (i, j) in iteritems(new_label)}) + extra) new_labels.append(new_label) yield new_graphs # Main for i in range(1, self.n_iter): new_graphs, labels, new_labels = list(), new_labels, list() for ((obj, extra), neighbor, old_label) in zip(inp, neighbors, labels): # Find unique labels and sort them for both graphs and keep for each node # the temporary new_label = dict() for (k, ns) in iteritems(neighbor): new_label[k] = old_label[k] for q in ns: new_label[k] = np.add(new_label[k], old_label[q]) new_labels.append(new_label) new_graphs.append((obj, {i: tuple(j) for (i, j) in iteritems(new_label)}) + extra) yield new_graphs if self._method_calling in [1, 2]: base_graph_kernel = {i: self.base_graph_kernel_[0](**self.base_graph_kernel_[1]) for i in range(self.n_iter)} if self._parallel is None: # Add the zero iteration element if self._method_calling == 1: for (i, g) in enumerate(generate_graphs(labels)): base_graph_kernel[i].fit(g) elif self._method_calling == 2: K = np.sum((base_graph_kernel[i].fit_transform(g) for (i, g) in enumerate(generate_graphs(labels))), axis=0) elif self._method_calling == 3: # Calculate the kernel matrix without parallelization K = np.sum((self.X[i].transform(g) for (i, g) in enumerate(generate_graphs(labels))), axis=0) else: if self._method_calling == 1: self._parallel(joblib.delayed(efit)(base_graph_kernel[i], g) for (i, g) in enumerate(generate_graphs(labels))) elif self._method_calling == 2: # Calculate the kernel marix with parallelization K = np.sum(self._parallel(joblib.delayed(efit_transform)(base_graph_kernel[i], g) for (i, g) in enumerate(generate_graphs(labels))), axis=0) elif self._method_calling == 3: # Calculate the kernel marix with parallelization K = np.sum(self._parallel(joblib.delayed(etransform)(self.X[i], g) for (i, g) in enumerate(generate_graphs(labels))), axis=0) if self._method_calling == 1: self._labels_enum = labels_enum return base_graph_kernel elif self._method_calling == 2: self._labels_enum = labels_enum return K, base_graph_kernel elif self._method_calling == 3: return K
[docs] def transform(self, X): """Calculate the kernel matrix, between given and fitted dataset. Parameters ---------- X : iterable Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that fitting the given graph format). If None the kernel matrix is calculated upon fit data. The test samples. Returns ------- K : numpy array, shape = [n_targets, n_input_graphs] corresponding to the kernel matrix, a calculation between all pairs of graphs between target an features """ self._method_calling = 3 # Check is fit had been called check_is_fitted(self, ['X']) # Input validation and parsing if X is None: raise ValueError('transform input cannot be None') else: km = self.parse_input(X) self._is_transformed = True if self.normalize: X_diag, Y_diag = self.diagonal() old_settings = np.seterr(divide='ignore') km /= np.sqrt(np.outer(Y_diag, X_diag)) km = np.nan_to_num(km) np.seterr(**old_settings) return km
[docs] def fit_transform(self, X, y=None): """Fit and transform, on the same dataset. Parameters ---------- X : iterable Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that fitting the given graph format). If None the kernel matrix is calculated upon fit data. The test samples. y : None There is no need of a target in a transformer, yet the pipeline API requires this parameter. Returns ------- K : numpy array, shape = [n_targets, n_input_graphs] corresponding to the kernel matrix, a calculation between all pairs of graphs between target an features """ self._method_calling = 2 self._is_transformed = False self.initialize() if X is None: raise ValueError('transform input cannot be None') else: km, self.X = self.parse_input(X) self._X_diag = np.diagonal(km) if self.normalize: old_settings = np.seterr(divide='ignore') km = np.nan_to_num(np.divide(km, np.sqrt(np.outer(self._X_diag, self._X_diag)))) np.seterr(**old_settings) return km
[docs] def diagonal(self): """Calculate the kernel matrix diagonal for fitted data. A funtion called on transform on a seperate dataset to apply normalization on the exterior. Parameters ---------- None. Returns ------- X_diag : np.array The diagonal of the kernel matrix, of the fitted data. This consists of kernel calculation for each element with itself. Y_diag : np.array The diagonal of the kernel matrix, of the transformed data. This consists of kernel calculation for each element with itself. """ # Check if fit had been called check_is_fitted(self, ['X']) try: check_is_fitted(self, ['_X_diag']) if self._is_transformed: Y_diag = self.X[0].diagonal()[1] for i in range(1, self.n_iter): Y_diag += self.X[i].diagonal()[1] except NotFittedError: # Calculate diagonal of X if self._is_transformed: X_diag, Y_diag = self.X[0].diagonal() # X_diag is considered a mutable and should not affect the kernel matrix itself. X_diag.flags.writeable = True for i in range(1, self.n_iter): x, y = self.X[i].diagonal() X_diag += x Y_diag += y self._X_diag = X_diag else: # case sub kernel is only fitted X_diag = self.X[0].diagonal() # X_diag is considered a mutable and should not affect the kernel matrix itself. X_diag.flags.writeable = True for i in range(1, self.n_iter): x = self.X[i].diagonal() X_diag += x self._X_diag = X_diag if self._is_transformed: return self._X_diag, Y_diag else: return self._X_diag
def efit(object, data): """Fit an object on data.""" object.fit(data) def efit_transform(object, data): """Fit-Transform an object on data.""" return object.fit_transform(data) def etransform(object, data): """Transform an object on data.""" return object.transform(data)