Source code for grakel.kernels.neighborhood_hash

"""The neighborhood hashing kernel as defined in :cite:`Hido2009ALG`."""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import collections
import warnings

from grakel.graph import Graph
from grakel.kernels import Kernel

from sklearn.utils import check_random_state
from sklearn.utils.validation import check_is_fitted

# Python 2/3 cross-compatibility import
from six import itervalues
from six import iteritems


[docs]class NeighborhoodHash(Kernel): """Neighborhood hashing kernel as proposed in :cite:`Hido2009ALG`. Parameters ---------- R : int, default=3 The maximum number of neighborhood hash. nh_type : str, valid_types={"simple", "count_sensitive"}, default="simple" The existing neighborhood hash type as defined in :cite:`Hido2009ALG`. bytes : int, default=2 Byte size of hashes. random_state : RandomState or int, default=None A random number generator instance or an int to initialize a RandomState as a seed. Attributes ---------- NH_ : function The neighborhood hashing function. random_state_ : RandomState A RandomState object handling all randomness of the class. """
[docs] def __init__(self, n_jobs=None, normalize=False, verbose=False, random_state=None, R=3, nh_type='simple', bits=8): """Initialize a `neighborhood_hash` kernel.""" super(NeighborhoodHash, self).__init__(n_jobs=n_jobs, normalize=normalize, verbose=False) self.random_state = random_state self.R = R self.nh_type = nh_type self.bits = bits self._initialized.update({"random_state": False, "R": False, "nh_type": False, "bits": False})
[docs] def initialize(self): """Initialize all transformer arguments, needing initialization.""" super(NeighborhoodHash, self).initialize() if not self._initialized["random_state"]: self.random_state_ = check_random_state(self.random_state) self._initialized["random_state"] = True if not self._initialized["R"]: if type(self.R) is not int or self.R <= 0: raise TypeError('R must be an intger bigger than zero') self._initialized["R"] = True if not self._initialized["nh_type"]: if self.nh_type == 'simple': self.NH_ = self.neighborhood_hash_simple elif self.nh_type == 'count_sensitive': self.NH_ = self.neighborhood_hash_count_sensitive else: raise TypeError('unrecognised neighborhood hashing type') self._initialized["nh_type"] = True if not self._initialized["bits"]: if type(self.bits) is not int or self.bits <= 0: raise TypeError('illegal number of bits for hashing') self._max_number = 1 << self.bits self._mask = self._max_number-1 self._initialized["bits"] = True
[docs] def fit(self, X, y=None): """Fit a dataset, for a transformer. Parameters ---------- X : iterable Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that fitting the given graph format). The train samples. y : None There is no need of a target in a transformer, yet the pipeline API requires this parameter. Returns ------- self : object Returns self. """ self._method_calling = 1 self._is_transformed = False # Input validation and parsing self.initialize() if X is None: raise ValueError('`fit` input cannot be None') else: if not isinstance(X, collections.Iterable): raise TypeError('input must be an iterable\n') i = 0 out = list() gs = list() self._labels_hash_dict, labels_hash_set = dict(), set() for (idx, x) in enumerate(iter(X)): is_iter = isinstance(x, collections.Iterable) if is_iter: x = list(x) if is_iter and len(x) in [0, 1, 2, 3]: if len(x) == 0: warnings.warn('Ignoring empty element on index: ' + str(idx)) continue elif len(x) == 1: warnings.warn( 'Ignoring empty element on index: ' + str(i) + '\nLabels must be provided.') else: x = Graph(x[0], x[1], {}, self._graph_format) vertices = list(x.get_vertices(purpose="any")) Labels = x.get_labels(purpose="any") elif type(x) is Graph: vertices = list(x.get_vertices(purpose="any")) Labels = x.get_labels(purpose="any") else: raise TypeError('each element of X must be either ' 'a graph object or a list with at ' 'least a graph like object and ' 'node labels dict \n') g = (vertices, Labels, {n: x.neighbors(n, purpose="any") for n in vertices}) # collect all the labels labels_hash_set |= set(itervalues(Labels)) gs.append(g) i += 1 if i == 0: raise ValueError('parsed input is empty') # Hash labels if len(labels_hash_set) > self._max_number: warnings.warn('Number of labels is smaller than' 'the biggest possible.. ' 'Collisions will appear on the ' 'new labels.') # If labels exceed the biggest possible size nl, nrl = list(), len(labels_hash_set) while nrl > self._max_number: nl += self.random_state_.choice(self._max_number, self._max_number, replace=False).tolist() nrl -= self._max_number if nrl > 0: nl += self.random_state_.choice(self._max_number, nrl, replace=False).tolist() # unify the collisions per element. else: # else draw n random numbers. nl = self.random_state_.choice(self._max_number, len(labels_hash_set), replace=False).tolist() self._labels_hash_dict = dict(zip(labels_hash_set, nl)) # for all graphs for vertices, labels, neighbors in gs: new_labels = {v: self._labels_hash_dict[l] for v, l in iteritems(labels)} g = (vertices, new_labels, neighbors,) gr = {0: self.NH_(g)} for r in range(1, self.R): gr[r] = self.NH_(gr[r-1]) # save the output for all levels out.append(gr) self.X = out # Return the transformer return self
[docs] def fit_transform(self, X): """Fit and transform, on the same dataset. Parameters ---------- X : iterable Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that fitting the given graph format). If None the kernel matrix is calculated upon fit data. The test samples. y : None There is no need of a target in a transformer, yet the pipeline API requires this parameter. Returns ------- K : numpy array, shape = [n_targets, n_input_graphs] corresponding to the kernel matrix, a calculation between all pairs of graphs between target an features """ self._method_calling = 2 self.fit(X) # Transform - calculate kernel matrix # Output is always normalized return self._calculate_kernel_matrix()
[docs] def transform(self, X): """Calculate the kernel matrix, between given and fitted dataset. Parameters ---------- X : iterable Each element must be an iterable with at most three features and at least one. The first that is obligatory is a valid graph structure (adjacency matrix or edge_dictionary) while the second is node_labels and the third edge_labels (that fitting the given graph format). If None the kernel matrix is calculated upon fit data. The test samples. Returns ------- K : numpy array, shape = [n_targets, n_input_graphs] corresponding to the kernel matrix, a calculation between all pairs of graphs between target an features """ self._method_calling = 3 # Check is fit had been called check_is_fitted(self, ['X']) # Input validation and parsing if X is None: raise ValueError('`transform` input cannot be None') else: if not isinstance(X, collections.Iterable): raise TypeError('input must be an iterable\n') i = 0 out = list() for (idx, x) in enumerate(iter(X)): is_iter = isinstance(x, collections.Iterable) if is_iter: x = list(x) if is_iter and len(x) in [0, 1, 2, 3]: if len(x) == 0: warnings.warn('Ignoring empty element on index: ' + str(idx)) continue elif len(x) == 1: warnings.warn( 'Ignoring empty element on index: ' + str(i) + '\nLabels must be provided.') else: x = Graph(x[0], x[1], {}, self._graph_format) vertices = list(x.get_vertices(purpose="any")) Labels = x.get_labels(purpose="any") elif type(x) is Graph: vertices = list(x.get_vertices(purpose="any")) Labels = x.get_labels(purpose="any") else: raise TypeError('each element of X must be either ' 'a graph object or a list with at ' 'least a graph like object and ' 'node labels dict \n') # Hash based on the labels of fit new_labels = {v: self._labels_hash_dict.get(l, None) for v, l in iteritems(Labels)} # Radix sort the other g = ((vertices, new_labels) + ({n: x.neighbors(n, purpose="any") for n in vertices},)) gr = {0: self.NH_(g)} for r in range(1, self.R): gr[r] = self.NH_(gr[r-1]) # save the output for all levels out.append(gr) i += 1 if i == 0: raise ValueError('parsed input is empty') # Transform - calculate kernel matrix # Output is always normalized km = self._calculate_kernel_matrix(out) self._is_transformed = True return km
[docs] def pairwise_operation(self, x, y): """Calculate a pairwise kernel between two elements. Parameters ---------- x, y : list Dict of len=2, tuples, consisting of vertices sorted by (labels, vertices) and edge-labels dict, for all levels from 0 up to self.R-1. Returns ------- kernel : number The kernel value. """ k = sum(nh_compare_labels(x[r], y[r]) for r in range(self.R)) return k / (1.0*self.R)
[docs] def diagonal(self): """Calculate the kernel matrix diagonal of the fit/transfromed data. Parameters ---------- None. Returns ------- X_diag : np.array The diagonal of the kernel matrix between the fitted data. This consists of each element calculated with itself. Y_diag : np.array The diagonal of the kernel matrix, of the transform. This consists of each element calculated with itself. """ # Output is always normalized if self._is_transformed: return 1.0, 1.0 else: return 1.0
[docs] def ROT(self, n, d): """`rot` operation for binary numbers. Parameters ---------- n : int The value which will be rotated. d : int The number of rotations. Returns ------- rot : int The result of a rot operation. """ m = d % self.bits if m > 0: return (n << m) & self._mask | \ ((n & self._mask) >> (self.bits-m)) else: return n
[docs] def neighborhood_hash_simple(self, G): """(simple) neighborhood hashing as defined in :cite:`Hido2009ALG`. Parameters ---------- G : tuple A tuple of three elements consisting of vertices sorted by labels, vertex label dictionary and edge dictionary. Returns ------- vertices_labels_edges : tuple A tuple of vertices, new_labels-dictionary and edges. """ vertices, labels, neighbors = G new_labels = dict() for u in vertices: if (labels[u] is None or any(labels[n] is None for n in neighbors[u])): new_labels[u] = None else: label = self.ROT(labels[u], 1) for n in neighbors[u]: label ^= labels[n] new_labels[u] = label return tuple(self._vertex_sort(vertices, new_labels)) + (neighbors,)
[docs] def neighborhood_hash_count_sensitive(self, G): """Count sensitive neighborhood hash as defined in :cite:`Hido2009ALG`. Parameters ---------- G : tuple, len=3 A tuple three elements consisting of vertices sorted by labels, vertex label dict, edge dict and number of occurencies dict for labels. Returns ------- vertices_labels_edges_noc : tuple A tuple of vertices, new_labels-dictionary and edges. """ vertices, labels, neighbors = G new_labels = dict() for u in vertices: if (labels[u] is None or any(labels[n] is None for n in neighbors[u])): new_labels[u] = None else: label = self.ROT(labels[u], 1) label ^= self.radix_sort_rot([labels[n] for n in neighbors[u]]) new_labels[u] = label return tuple(self._vertex_sort(vertices, new_labels)) + (neighbors,)
[docs] def radix_sort_rot(self, labels): """Sorts vertices based on labels. Parameters ---------- labels : dict Dictionary of labels for vertices. Returns ------- labels_counts : list A list of labels with their counts (sorted). """ n = len(labels) result = 0 if n == 0: return result for b in range(self.bits): # The output array elements that will have sorted arr output = [0]*n # initialize count array as 0 count = [0, 0] # Store count of occurrences in count[] for i in range(n): count[(labels[i] >> b) % 2] += 1 # Change count[i] so that count[i] now contains actual # position of this digit in output array count[1] += count[0] # Build the output array for i in range(n-1, -1, -1): index = (labels[i] >> b) output[count[index % 2] - 1] = labels[i] count[index % 2] -= 1 # Copying the output array to arr[], # so that arr now contains sorted numbers labels = output previous, occ = labels[0], 1 for i in range(1, len(labels)): label = labels[i] if label == previous: occ += 1 else: result ^= self.ROT(previous ^ occ, occ) occ = 1 previous = label if occ > 0: result ^= self.ROT(previous ^ occ, occ) return result
def _vertex_sort(self, vertices, labels): """Sorts vertices based on labels. Parameters ---------- vertices : listable A listable of vertices. labels : dict Dictionary of labels for vertices. Returns ------- vertices_labels : tuple, len=2 The sorted vertices based on labels and labels for vertices. """ if self._method_calling == 3: return (sorted(list(vertices), key=lambda x: float('inf') if labels[x] is None else labels[x]), labels) else: return (sorted(vertices, key=lambda x: labels[x]), labels)
def nh_compare_labels(Gx, Gy): """Compare labels function as defined in :cite:`Hido2009ALG`. Parameters ---------- G_{x,y} : tuple, len=2 Graph tuples of two elements, consisting of vertices sorted by (labels, vertices) and edge-labels dict. Returns ------- kernel : Number The kernel value. """ # get vertices vx, vy = Gx[0], Gy[0] # get size of vertices nv_x, nv_y = len(Gx[0]), len(Gy[0]) # get labels for vertices Lx, Ly = Gx[1], Gy[1] c, a, b = 0, 0, 0 while a < nv_x and b < nv_y: la = Lx[vx[a]] lb = Ly[vy[b]] if la is None: break if la == lb: c += 1 a += 1 b += 1 elif la < lb: a += 1 else: b += 1 return c/float(nv_x+nv_y-c)