"""A file containing useful external functions"""
# Author: Ioannis Siglidis <y.siglidis@gmail.com>
# License: BSD 3 clause
import os
import numpy as np
from collections import defaultdict
from collections import Iterable
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import ShuffleSplit
from sklearn.base import BaseEstimator
from sklearn.svm import SVC
from sklearn.utils import Bunch
from sklearn.utils import check_random_state
from sklearn.utils.validation import check_is_fitted
from grakel import Graph
from grakel.graph import is_adjacency as valid_matrix
[docs]def cross_validate_Kfold_SVM(K, y,
n_iter=10, n_splits=10, C_grid=None,
random_state=None, scoring="accuracy", fold_reduce=None):
"""Cross Validate a list of precomputed kernels with an SVM.
Parameters
----------
K : list
A list that must contain either numpy arrays or iterables of numpy arrays.
y : list
List of lists that for every element of K contains numbers of score for all iterations.
n_iter : int
Number of iteration for the K-Fold.
n_splits : int
Number of splits for the K-Fold.
random_state : RandomState or int, default=None
A random number generator instance or an int to initialize a RandomState as a seed.
fold_reduce : callable or None
A function that summarizes information between all folds.
Input must be a list of n_splits elements corresponding to scoring.
If None default is np.mean.
scoring : string, callable, list/tuple, dict or None, default: None
As in ``scoring`` in :xref:`lgscv`.
Returns
-------
out : list
A list that contains a list of the reduced folds for each iteration, for each primary
element of K.
"""
# Initialise C_grid
if C_grid is None:
C_grid = ((10. ** np.arange(-7, 7, 2)) / len(y)).tolist()
elif type(C_grid) is np.array:
C_grid = np.squeeze(C_grid)
if len(C_grid.shape) != 1:
raise ValueError('C_grid should either be None or a squeezable to 1 dimension np.array')
else:
C_grid = list(C_grid)
# Initialise fold_reduce:
if fold_reduce is None:
fold_reduce = np.mean
elif not isinstance(callable, fold_reduce):
raise ValueError('fold_reduce should be a callable')
# Initialise and check random state
random_state = check_random_state(random_state)
# Initialise sklearn pipeline objects
kfolder = KFold(n_splits=n_splits, random_state=random_state, shuffle=True)
estimator = make_pipeline(KMTransformer(), SVC(kernel='precomputed'))
# Make all the requested folds
nfolds = tuple(tuple(kfolder.split(y)) for _ in range(n_iter))
out = list()
for ks in K:
mid = list()
if valid_matrix(ks):
pg = {"svc__C": C_grid, "kmtransformer__K": [Bunch(mat=ks)]}
elif isinstance(ks, Iterable) and all(valid_matrix(k) for k in ks):
pg = [{"svc__C": C_grid, "kmtransformer__K": [Bunch(mat=k)]} for k in ks]
else:
raise ValueError('Not a valid object for kernel matrix/ces')
for kfolds in nfolds:
fold_info = list()
for train, test in kfolds:
gs = GridSearchCV(estimator, param_grid=pg, scoring=scoring,
cv=ShuffleSplit(n_splits=1,
test_size=0.1,
random_state=random_state)).fit(train, y[train])
fold_info.append(gs.score(test, y[test]))
mid.append(fold_reduce(fold_info))
out.append(mid)
return out
[docs]def graph_from_networkx(X, node_labels_tag=None, edge_labels_tag=None, edge_weight_tag=None,
as_Graph=False):
"""Transform an iterable of networkx objects to an iterable of Graphs.
A function for helping a user that has a collection of graphs in networkx to use grakel.
Parameters
----------
node_labels_tag : str or None
Define where to search for labels of nodes, inside the `node` attribute of each graph.
If None no labels are assigned.
edge_labels_tag : tuple
Define where to search for labels of edges, inside the `edge` attribute of each graph.
If None no labels are assigned.
edge_weight_tag : int
Define where to search for weights inside the `edge` attribute of each graph.
If None 1.0 weights are assigned.
as_Graph : bool, default=False
Return each output as a grakel.Graph object.
Returns
-------
grakel_graphs : generator
Returns a generator containing the same collections to be given as an input for
any grakel kernel.
"""
import networkx as nx
v2 = False
if nx.__version__ > '2.':
v2 = True
if node_labels_tag is None:
def nodel_init():
return None
def nodel_put(*args):
pass
elif type(node_labels_tag) is str:
def nodel_init():
return dict()
def nodel_put(nl, u, d):
nl[u] = d[u][node_labels_tag]
else:
raise ValueError('node_labels_tag must be a str indicating the '
'tag of the labels inside nodes or None')
if edge_labels_tag is None:
def edgel_init():
return None
def edgel_put(*args):
pass
elif type(edge_labels_tag) is str:
def edgel_init():
return dict()
if v2:
def edgel_put(el, u, d):
el[u] = d[u][edge_labels_tag]
else:
def edgel_put(el, u, d):
el[u] = d[u[0]][u[1]][edge_labels_tag]
else:
raise ValueError('edge_labels_tag must be a str indicating the '
'tag of the labels inside edges or None')
if edge_weight_tag is None:
def get_weight(*args):
return 1.0
elif type(edge_weight_tag) is str:
if v2:
def get_weight(d, e):
return d[e][edge_weight_tag]
else:
def get_weight(d, e):
return d[e[0]][e[1]][edge_weight_tag]
else:
raise ValueError('weight_labels_tag must be a str indicating '
'tag of the labels inside edges or None (1.0)')
if not isinstance(X, Iterable):
raise ValueError('X must be an iterable')
if v2:
def take_ne(graph):
return graph.nodes, graph.edges
else:
def take_ne(graph):
return graph.node, graph.edge
for G in X:
graph_object = dict()
nl = nodel_init()
el = edgel_init()
nodes, edges = take_ne(G)
for u in G.nodes():
graph_object[u] = dict()
nodel_put(nl, u, nodes)
for v in G.neighbors(u):
graph_object[u][v] = get_weight(edges, (u, v))
edgel_put(el, (u, v), edges)
if as_Graph:
yield Graph(graph_object, nl, el)
else:
yield [graph_object, nl, el]
[docs]def graph_from_pandas(edge_df, node_df=None, directed=False, as_Graph=False):
"""Produces a collection of Graph Objects from pandas dataframes.
A function for helping a user that has a bunch of graph csv
files make a collection for grakel input.
Parameters
----------
edge_df : tuple(pandas.DataFrame, pct, tuple(pct, pct), pct or None, pct or list(pct) or None)
A tuple of many elements:
1. A pandas dataframe containing all the edges of a collection of graphs
2. The column name of the graph index
3. A tuple containing source and destination column names
4. The column name of weights column (None if non-existent)
5. The column name pointing the column containing edge-labels or a list pointing
the columns of attributes (None if edge-labels are non-existent).
If node_df exists, correspondance of graph indexes and node indexes must be exact.
node_df : tuple(pandas.DataFrame, pct, pct or list(pct) or None) or None, default=None
A tuple of many elements:
1. A pandas dataframe containing all the nodes of a collection of graphs
2. The column name of the graph index
3. The column name pointing the column containing node-labels or a list pointing
the columns of attributes (None if node-labels are non-existent).
Node id must correspond to node number.
directed : bool, default=False
A variable indicating with True if both directions of the graph have been inserted inside
the dataframe. Otherwise the graph is considered as undirected.
as_Graph : bool, default=False
Return each output as a grakel.Graph object.
Returns
-------
grakel_graphs : dict
Dictionary with keys te keys of the graphs inside the pandas dataframe and values the
corresponding graph object
"""
from pandas import DataFrame
if node_df is None:
graphs = dict()
throw_error = False
elif (type(node_df) is tuple and
len(node_df) == 3 and
type(node_df[0]) is DataFrame and
(node_df[1] is None or node_df[1] in node_df[0].columns) and
(node_df[2] is None or
(type(node_df[2]) is not list and node_df[2] in node_df[0].columns) or
(type(node_df[2]) is list and all(i in node_df[0].columns for i in node_df[2])))):
df, gtag, labs = node_df
graphs = defaultdict(lambda: defaultdict(dict))
if labs is None:
for index, row in df.iterrows():
gidx = row[gtag]
graphs[gidx]["graph"][index] = dict()
graphs[gidx]["node_label"] = None
elif type(labs) is list:
for index, row in df.iterrows():
gidx = row[gtag]
graphs[gidx]["graph"][index] = dict()
graphs[gidx]["node_label"][index] = np.array(row[c] for c in labs)
else:
for index, row in df.iterrows():
gidx = row[gtag]
graphs[gidx]["graph"][index] = dict()
graphs[gidx]["node_label"][index] = row[labs]
throw_error = True
else:
raise ValueError('node_df must be a tuple containing a pandas.DataFrame object '
'a column name corresponding to the valid column index for graphs indexes '
'and a None, column name or list of column names corresponding to no-labels, '
'labels or attributes.')
if (type(edge_df) is tuple and
len(edge_df) == 5 and
type(edge_df[0]) is DataFrame and
(edge_df[1] in edge_df[0].columns) and
(type(edge_df[2]) is tuple and len(edge_df[2]) == 2 and
all(c in edge_df[0].columns for c in edge_df[2])) and
(edge_df[3] is None or edge_df[3] in edge_df[0].columns) and
(edge_df[4] is None or
(type(edge_df[4]) is not list and edge_df[4] in edge_df[0].columns) or
(type(edge_df[4]) is list and all(c in edge_df[0].columns for c in edge_df[4])))):
df, gtag, (src_c, dst_c), w_c, labs = edge_df
if w_c is not None:
def get_weight(row):
return row[w_c]
else:
def get_weight(row):
return 1.
if labs is None:
def set_label(*args):
pass
elif type(labs) is list:
def set_label(row, d, e):
if d["edge_label"] is None:
d["edge_label"] = dict()
el = np.array([row[l] for l in labs])
d["edge_label"][e] = el
if not directed:
d["edge_label"][(e[1], e[0])] = el
else:
def set_label(row, d, e):
if d["edge_label"] is None:
d["edge_label"] = dict()
el = row[labs]
d["edge_label"][e] = el
if not directed:
d["edge_label"][(e[1], e[0])] = el
for index, row in df.iterrows():
src, dst = row[src_c], row[dst_c]
gidx = row[gtag]
if gidx not in graphs:
if throw_error:
raise ValueError('This graph didn\'t appear on node labels dataframe')
else:
graphs[gidx] = dict()
graphs[gidx]["graph"] = dict()
if "node_label" not in graphs[gidx]:
graphs[gidx]["node_label"] = None
graphs[gidx]["edge_label"] = None
if src not in graphs[gidx]["graph"]:
if throw_error:
raise ValueError('This node didn\'t appear on node labels dataframe for graph '
'with id ' + str(gidx))
else:
graphs[gidx]["graph"][src] = dict()
graphs[gidx]["graph"][src][dst] = get_weight(row)
if not directed:
if dst not in graphs[gidx]["graph"]:
if throw_error:
raise ValueError('This node didn\'t appear on node labels dataframe '
'for graph with id ' + str(gidx))
else:
graphs[gidx]["graph"][dst] = dict()
graphs[gidx]["graph"][dst][src] = get_weight(row)
set_label(row, graphs[gidx], (src, dst))
else:
raise ValueError('edge_df must be a tuple containing a pandas.DataFrame object '
'a column name corresponding to a valid column index of the dataframe '
'that contain the index of the graph that an edge belongs inside the '
'dataframe, a column name corresponding to weights inside the dataframe'
'(or None if weights do not exist) and a list of column names '
'corresponding or column name corresponding to labels or None '
'if none of this exists')
return {k: Graph(graphs[k]["graph"], graphs[k]["node_label"], graphs[k]["edge_label"])
if as_Graph else [graphs[k]["graph"], graphs[k]["node_label"], graphs[k]["edge_label"]]
for k in graphs.keys()}
[docs]def graph_from_csv(edge_files, node_files=None, index_type=str,
directed=False, sep=",", as_Graph=False):
"""Produces a collection of Graph Objects from a collection of csv files.
A function for helping a user that has a bunch of graph csv
files make a collection for grakel input.
Parameters
----------
edge_files : tuple(iter(str), bool, bool or None)
edge_files = (iter(<edge_file_address>), <weight_flag>, <attributes_flag>).
Each line of all the edge files must have the following structure:
.. code::
<v_i><sep><v_j>[weight][labels/attr]
weights: "<sep><weight>", <weight_flag> is True
"", <weight_flag> is False
labels/attr: "<sep><label>", <attributes_flag> is False
"<sep><a_1>, .. , <sep><a_n>", <attributes_flag> is True
"", <attributes_flag> is None
node_files : tuple(iter(str), bool) or None, default=None
node_files = (iter(<node_file_address>), index_type, <attributes_flag>).
Each line of the node file must have the following structure (if exists):
.. code::
<v_i><sep>[labels/attr]
labels/attr: "<sep><label>", <attributes_flag> is False
"<sep><a_1>, .. , <sep><a_n>", <attributes_flag> is True
"", <attributes_flag> is None
directed : bool, default=False
Defines if the graph should be considered as directed.
sep : str, default=","
The separator for the csv files.
as_Graph : bool, default=False
Return each output as a grakel.Graph object.
Returns
-------
grakel_graphs : generator
Returns a generator containing the same collections to be given as an input for
any grakel kernel.
"""
def edge_files_error():
raise ValueError('edge_file argument must contain an iterable of strings of edge files, '
'a bool weight_flag and attributes_flag bool or None')
def node_files_error():
raise ValueError('node_files argument can be None or contain an iterable of strings'
'of edge files and attributes_flag bool or None')
if type(index_type) is not type:
raise ValueError('index_type must be a class `type` object')
if (type(edge_files) is not tuple or len(edge_files) != 3 or
type(edge_files[1]) is not bool or
type(edge_files[2]) not in [bool, None]):
edge_files_error()
else:
if edge_files[1]:
def get_weight(l):
return float(l.pop(0))
else:
def get_weight(l):
return 1.0
efs = list()
for e in edge_files[0]:
if type(e) is not str:
edge_files_error()
if not os.path.isfile(e):
raise ValueError('Each edge file address must be a valid address')
efs.append(e)
if type(node_files) is None:
nfs = None
elif (type(node_files) is tuple and len(node_files) == 2 and
type(node_files[1]) in [bool, None]):
nfs = list()
for n in node_files[0]:
if type(n) is not str:
edge_files_error()
if not os.path.isfile(n):
raise ValueError('Each edge file address must be a valid address')
nfs.append(n)
else:
node_files_error()
if edge_files[2] is None:
def get_el(ef, g):
el = None
for line in ef:
q = line.split(sep)
m = q[2:]
ea, eb = [index_type(e) for e in q[:2]]
g[ea][eb] = get_weight(m)
if not directed:
g[eb][ea] = g[ea][eb]
return el
elif edge_files[2]:
def get_el(ef, g):
el = dict()
for line in ef:
q = line.strip('\n').split(sep)
ea, eb = [index_type(e) for e in q[:2]]
m = q[2:]
g[ea][eb] = get_weight(m)
el[(ea, eb)] = np.array([float(num) for num in m])
if not directed:
el[(eb, ea)] = el[(ea, eb)]
g[eb][ea] = g[ea][eb]
return el
else:
def get_el(ef, g):
el = dict()
for line in ef:
q = line.strip('\n').split(sep)
ea, eb = [index_type(e) for e in q[:2]]
m = q[2:]
g[ea][eb] = get_weight(m)
el[(ea, eb)] = str(m[0])
if not directed:
el[(eb, ea)] = el[(ea, eb)]
g[eb][ea] = g[ea][eb]
return el
if nfs is None:
for efa in efs:
nl = None
graph_object = defaultdict(dict)
with open(efa, 'r') as ef:
el = get_el(ef, graph_object)
graph_object = dict(graph_object)
if as_Graph:
yield Graph(graph_object, nl, el)
else:
yield [graph_object, nl, el]
else:
if len(nfs) != len(efs):
raise ValueError('The number of edge files and the number of node files must be the same')
for (nfa, efa) in zip(nfs, efs):
graph_object = dict()
with open(nfa, 'r') as nf:
if node_files[1] is None:
nl = None
for line in nf:
graph_object[index_type(line.strip('\n').split(sep)[0].strip())] = dict()
elif node_files[1]:
nl = dict()
for line in nf:
q = line.strip('\n').split(sep)
graph_object[index_type(q[0].strip())] = dict()
nl[index_type(q[0].strip())] = np.array([float(i.strip()) for i in q[1:]])
else:
nl = dict()
for line in nf:
q = line.strip('\n').split(sep)
graph_object[index_type(q[0].strip())] = dict()
nl[index_type(q[0].strip())] = q[1]
with open(efa, 'r') as ef:
el = get_el(ef, graph_object)
if as_Graph:
yield Graph(graph_object, nl, el)
else:
yield [graph_object, nl, el]