# Source code for grakel.kernels.hadamard_code
"""The hadamard code kernel as defined in :cite:`icpram16`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import collections
import warnings
import numpy as np
from math import ceil
from numpy import log2
from scipy.linalg import hadamard
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
from sklearn.externals import joblib
from grakel.graph import Graph
from grakel.kernels import Kernel
from grakel.kernels.vertex_histogram import VertexHistogram
# Python 2/3 cross-compatibility import
from six import iteritems
from six import itervalues
class HadamardCode(Kernel):
    """The simple Hadamard code kernel, as proposed in :cite:`icpram16`.

    Vertex labels are mapped to rows of a Hadamard matrix and iteratively
    aggregated over neighborhoods; a base graph kernel is applied to the
    relabeled graphs of every iteration and the results are summed.

    Parameters
    ----------
    n_jobs : int or None, default=None
        Number of jobs for parallel execution (handled by the parent
        :class:`grakel.kernels.Kernel`).

    verbose : bool, default=False
        Print progress messages.

    normalize : bool, default=False
        Normalize the output of the kernel matrix.

    n_iter : int, default=5
        The number of iterations.

    base_graph_kernel : `grakel.kernels.Kernel` or tuple, default=None
        If tuple it must consist of a valid kernel object and a
        dictionary of parameters. General parameters concerning
        normalization, concurrency, .. will be ignored, and the
        ones of given on `__init__` will be passed in case it is needed.
        Default `base_graph_kernel` is `VertexHistogram`.

    Attributes
    ----------
    base_graph_kernel_ : tuple
        A ``(kernel_class, params_dict)`` pair used to instantiate one base
        kernel per iteration; set by :meth:`initialize`.

    """

    # "auto" lets the parent Kernel pick the graph representation.
    _graph_format = "auto"

    def __init__(self, n_jobs=None, verbose=False,
                 normalize=False, n_iter=5, base_graph_kernel=None):
        """Initialise a `hadamard_code` kernel."""
        # NOTE(review): the previous docstring documented ``rho`` and ``L``
        # parameters (for an "hc_type == shortened" variant) that this
        # constructor does not accept; they have been removed from the docs.
        super(HadamardCode, self).__init__(
            n_jobs=n_jobs, verbose=verbose, normalize=normalize)
        self.n_iter = n_iter
        self.base_graph_kernel = base_graph_kernel
        # Flag hyper-parameters that still need validation in initialize().
        self._initialized.update({"n_iter": False, "base_graph_kernel": False})
[docs] def initialize(self):
"""Initialize all transformer arguments, needing initialization."""
super(HadamardCode, self).initialize()
if not self._initialized["base_graph_kernel"]:
base_graph_kernel = self.base_graph_kernel
if base_graph_kernel is None:
base_graph_kernel, params = VertexHistogram, dict()
elif type(base_graph_kernel) is type and issubclass(base_graph_kernel, Kernel):
params = dict()
else:
try:
base_graph_kernel, params = base_graph_kernel
except Exception:
raise TypeError('Base kernel was not formulated in '
'the correct way. '
'Check documentation.')
if not (type(base_graph_kernel) is type and
issubclass(base_graph_kernel, Kernel)):
raise TypeError('The first argument must be a valid '
'grakel.kernel.kernel Object')
if type(params) is not dict:
raise ValueError('If the second argument of base '
'kernel exists, it must be a diction'
'ary between parameters names and '
'values')
params.pop("normalize", None)
params["normalize"] = False
params["verbose"] = self.verbose
params["n_jobs"] = None
self.base_graph_kernel_ = (base_graph_kernel, params)
self._initialized["base_graph_kernel"] = True
if not self._initialized["n_iter"]:
if type(self.n_iter) is not int or self.n_iter <= 0:
raise TypeError("'n_iter' must be a positive integer")
self._initialized["n_iter"] = True
[docs] def parse_input(self, X):
"""Parse input and create features, while initializing and/or calculating sub-kernels.
Parameters
----------
X : iterable
For the input to pass the test, we must have:
Each element must be an iterable with at most three features and at
least one. The first that is obligatory is a valid graph structure
(adjacency matrix or edge_dictionary) while the second is
node_labels and the third edge_labels (that correspond to the given
graph format). A valid input also consists of graph type objects.
Returns
-------
base_graph_kernel : object
Returns base_graph_kernel. Only if called from `fit` or `fit_transform`.
K : np.array
Returns the kernel matrix. Only if called from `transform` or
`fit_transform`.
"""
if self.base_graph_kernel_ is None:
raise ValueError('User must provide a base_graph_kernel')
# Input validation and parsing
if not isinstance(X, collections.Iterable):
raise TypeError('input must be an iterable\n')
else:
nx, labels = 0, list()
if self._method_calling in [1, 2]:
nl, labels_enum, base_graph_kernel = 0, dict(), dict()
for kidx in range(self.n_iter):
base_graph_kernel[kidx] = self.base_graph_kernel_[0](**self.base_graph_kernel_[1])
elif self._method_calling == 3:
nl, labels_enum, base_graph_kernel = len(self._labels_enum), dict(self._labels_enum), self.X
inp = list()
neighbors = list()
for (idx, x) in enumerate(iter(X)):
is_iter = False
if isinstance(x, collections.Iterable):
x, is_iter = list(x), True
if is_iter and (len(x) == 0 or len(x) >= 2):
if len(x) == 0:
warnings.warn('Ignoring empty element on index: '
+ str(idx))
continue
else:
if len(x) > 2:
extra = tuple()
if len(x) > 3:
extra = tuple(x[3:])
x = Graph(x[0], x[1], x[2], graph_format=self._graph_format)
extra = (x.get_labels(purpose='any',
label_type="edge", return_none=True), ) + extra
else:
x = Graph(x[0], x[1], {}, graph_format=self._graph_format)
extra = tuple()
elif type(x) is Graph:
el = x.get_labels(purpose=self._graph_format, label_type="edge", return_none=True)
if el is None:
extra = tuple()
else:
extra = (el, )
else:
raise TypeError('each element of X must be either a ' +
'graph object or a list with at least ' +
'a graph like object and node labels ' +
'dict \n')
label = x.get_labels(purpose='any')
inp.append((x.get_graph_object(), extra))
neighbors.append(x.get_edge_dictionary())
labels.append(label)
for v in set(itervalues(label)):
if v not in labels_enum:
labels_enum[v] = nl
nl += 1
nx += 1
if nx == 0:
raise ValueError('parsed input is empty')
# Calculate the hadamard matrix
H = hadamard(int(2**(ceil(log2(nl)))))
def generate_graphs(labels):
# Intial labeling of vertices based on their corresponding Hadamard code (i-th row of the
# Hadamard matrix) where i is the i-th label on enumeration
new_graphs, new_labels = list(), list()
for ((obj, extra), label) in zip(inp, labels):
new_label = dict()
for (k, v) in iteritems(label):
new_label[k] = H[labels_enum[v], :]
new_graphs.append((obj, {i: tuple(j) for (i, j) in iteritems(new_label)}) + extra)
new_labels.append(new_label)
yield new_graphs
# Main
for i in range(1, self.n_iter):
new_graphs, labels, new_labels = list(), new_labels, list()
for ((obj, extra), neighbor, old_label) in zip(inp, neighbors, labels):
# Find unique labels and sort them for both graphs and keep for each node
# the temporary
new_label = dict()
for (k, ns) in iteritems(neighbor):
new_label[k] = old_label[k]
for q in ns:
new_label[k] = np.add(new_label[k], old_label[q])
new_labels.append(new_label)
new_graphs.append((obj, {i: tuple(j) for (i, j) in iteritems(new_label)}) +
extra)
yield new_graphs
if self._method_calling in [1, 2]:
base_graph_kernel = {i: self.base_graph_kernel_[0](**self.base_graph_kernel_[1]) for i in range(self.n_iter)}
if self._parallel is None:
# Add the zero iteration element
if self._method_calling == 1:
for (i, g) in enumerate(generate_graphs(labels)):
base_graph_kernel[i].fit(g)
elif self._method_calling == 2:
K = np.sum((base_graph_kernel[i].fit_transform(g) for (i, g)
in enumerate(generate_graphs(labels))), axis=0)
elif self._method_calling == 3:
# Calculate the kernel matrix without parallelization
K = np.sum((self.X[i].transform(g) for (i, g)
in enumerate(generate_graphs(labels))), axis=0)
else:
if self._method_calling == 1:
self._parallel(joblib.delayed(efit)(base_graph_kernel[i], g)
for (i, g) in enumerate(generate_graphs(labels)))
elif self._method_calling == 2:
# Calculate the kernel marix with parallelization
K = np.sum(self._parallel(joblib.delayed(efit_transform)(base_graph_kernel[i], g) for (i, g)
in enumerate(generate_graphs(labels))), axis=0)
elif self._method_calling == 3:
# Calculate the kernel marix with parallelization
K = np.sum(self._parallel(joblib.delayed(etransform)(self.X[i], g) for (i, g)
in enumerate(generate_graphs(labels))), axis=0)
if self._method_calling == 1:
self._labels_enum = labels_enum
return base_graph_kernel
elif self._method_calling == 2:
self._labels_enum = labels_enum
return K, base_graph_kernel
elif self._method_calling == 3:
return K
[docs] def transform(self, X):
"""Calculate the kernel matrix, between given and fitted dataset.
Parameters
----------
X : iterable
Each element must be an iterable with at most three features and at
least one. The first that is obligatory is a valid graph structure
(adjacency matrix or edge_dictionary) while the second is
node_labels and the third edge_labels (that fitting the given graph
format). If None the kernel matrix is calculated upon fit data.
The test samples.
Returns
-------
K : numpy array, shape = [n_targets, n_input_graphs]
corresponding to the kernel matrix, a calculation between
all pairs of graphs between target an features
"""
self._method_calling = 3
# Check is fit had been called
check_is_fitted(self, ['X'])
# Input validation and parsing
if X is None:
raise ValueError('transform input cannot be None')
else:
km = self.parse_input(X)
self._is_transformed = True
if self.normalize:
X_diag, Y_diag = self.diagonal()
old_settings = np.seterr(divide='ignore')
km /= np.sqrt(np.outer(Y_diag, X_diag))
km = np.nan_to_num(km)
np.seterr(**old_settings)
return km
[docs] def fit_transform(self, X, y=None):
"""Fit and transform, on the same dataset.
Parameters
----------
X : iterable
Each element must be an iterable with at most three features and at
least one. The first that is obligatory is a valid graph structure
(adjacency matrix or edge_dictionary) while the second is
node_labels and the third edge_labels (that fitting the given graph
format). If None the kernel matrix is calculated upon fit data.
The test samples.
y : None
There is no need of a target in a transformer, yet the pipeline API
requires this parameter.
Returns
-------
K : numpy array, shape = [n_targets, n_input_graphs]
corresponding to the kernel matrix, a calculation between
all pairs of graphs between target an features
"""
self._method_calling = 2
self._is_transformed = False
self.initialize()
if X is None:
raise ValueError('transform input cannot be None')
else:
km, self.X = self.parse_input(X)
self._X_diag = np.diagonal(km)
if self.normalize:
old_settings = np.seterr(divide='ignore')
km = np.nan_to_num(np.divide(km, np.sqrt(np.outer(self._X_diag, self._X_diag))))
np.seterr(**old_settings)
return km
    def diagonal(self):
        """Calculate the kernel matrix diagonal for fitted data.

        A function called on transform on a separate dataset to apply
        normalization on the exterior.

        Parameters
        ----------
        None.

        Returns
        -------
        X_diag : np.array
            The diagonal of the kernel matrix, of the fitted data.
            This consists of kernel calculation for each element with itself.

        Y_diag : np.array
            The diagonal of the kernel matrix, of the transformed data.
            This consists of kernel calculation for each element with itself.

        """
        # Check if fit had been called
        check_is_fitted(self, ['X'])
        try:
            # Fast path: the fit-side diagonal was already cached in _X_diag
            # by a previous call (or by fit_transform).
            check_is_fitted(self, ['_X_diag'])
            if self._is_transformed:
                # Sum the transform-side diagonals of all per-iteration
                # sub-kernels (self.X holds one sub-kernel per iteration).
                Y_diag = self.X[0].diagonal()[1]
                for i in range(1, self.n_iter):
                    # NOTE(review): += mutates Y_diag in place; assumes each
                    # sub-kernel's diagonal() returns a fresh/writable array
                    # for the Y part — confirm against the base kernel.
                    Y_diag += self.X[i].diagonal()[1]
        except NotFittedError:
            # Calculate diagonal of X
            if self._is_transformed:
                X_diag, Y_diag = self.X[0].diagonal()
                # X_diag is considered a mutable and should not affect the kernel matrix itself.
                X_diag.flags.writeable = True
                for i in range(1, self.n_iter):
                    x, y = self.X[i].diagonal()
                    X_diag += x
                    Y_diag += y
                # Cache the accumulated fit-side diagonal for later calls.
                self._X_diag = X_diag
            else:
                # case sub kernel is only fitted
                X_diag = self.X[0].diagonal()
                # X_diag is considered a mutable and should not affect the kernel matrix itself.
                X_diag.flags.writeable = True
                for i in range(1, self.n_iter):
                    x = self.X[i].diagonal()
                    X_diag += x
                self._X_diag = X_diag

        # Return one or two diagonals depending on whether transform ran.
        if self._is_transformed:
            return self._X_diag, Y_diag
        else:
            return self._X_diag
def efit(object, data):
    """Call ``fit`` on *object* with *data* (joblib parallelization helper)."""
    object.fit(data)
def efit_transform(object, data):
    """Fit *object* on *data* and return the resulting transformation (joblib helper)."""
    return object.fit_transform(data)
def etransform(object, data):
    """Return the transformation of *data* by an already-fitted *object* (joblib helper)."""
    return object.transform(data)