"""A python file that implements a class and functions for graphs."""
from __future__ import absolute_import
import collections
import numbers
import warnings
import numpy as np
from scipy.sparse import isspmatrix
from scipy.sparse.csgraph import laplacian
from .tools import inv_dict
from .tools import nested_dict_add
from .tools import priority_dict
# Python 2/3 cross-compatibility import
from six import iteritems
from six import itervalues
from builtins import range
[docs]class Graph(object):
"""The general `graph` class.
A general graph class that supports adjacency, dictionary formats
while beeing memory/computationaly sustainable.
Parameters
----------
initialization_object : dict, list or array-like, square, default=None
The initialisation object for the graph (*valid-graph-format*).
- If given a dictionary the input can be as follows:
+ 2-level nested dictionaries from edge symbols to weights
+ Dictionary of symbols to list of symbols (unweighted)
+ Dictionary of tuples to weights (weighted)
+ Iterable of tuples of len 2 (unweighted)
+ Iterable of tuples of len 3 (vertex, vertex, weight)
- If given an array the input can be as follows:
+ array-like lists of lists
+ np.array
+ sparse matrix (scipy.sparse)
node_labels : dict, default=None
A label dictionary corresponding to all vertices of the graph:
+ for adjacency matrix labels should be given to numbers starting
from 0 and ending in N-1, where the matrix has size N by N
+ for dictionary labels should correspond to all keys
edge_labels : dict, default=None
A labels dictionary corresponding to all edges of the graph
with keys: index/symbol-tuples and value: labels.
graph_format : str, valid_values={"dictionary", "adjacency", "all", "auto"}, default=None
Defines the internal representation of the graph object which can be
a dictionary as a matrix, or both:
+ for dictionary: "dictionary"
+ for adjacency_matrix: "adjacency"
+ for both: "all"
+ for the current_format (if existent): "auto"
Attributes
----------
n : int
The one dimension of the (square) adjacency matrix.
Signifies the number of vertices.
adjacency_matrix : np.array
The adjacency_matrix corresponding to the graph.
index_node_labels : dict
Label dictionary for indeces, of adjacency matrix.
Keys are valid numbers from 0 to n-1.
index_edge_labels : dict
Label dictionary for edges, of adjacency matrix.
Keys are tuple pairs of symbols with valid numbers from 0 to n-1,
that have a positive adjacency matrix value.
vertices : set
The set of vertices corresponding to the edge_dictionary
representation.
edge_dictionary : dict
A 2-level nested dictionary from edge symbols to weights.
node_labels : dict
Label dictionary for nodes.
Keys are vertex symbols inside vertices.
edge_labels : dict
Label dictionary for edges.
Keys are tuple pairs of symbols inside vertices and edges.
edsamic : dict
*Edge-Dictionary-Symbols-Adjacency-Matrix-Index-Correspondance*.
A dictionary which translates dictionary symbols to adjacency
matrix indexes, when storing both formats.
shortest_path_mat : np.array, square
Holds the shortest path matrix.
label_group : dict
A 2-level nested dict that after the first level of a pair tuple for
purpose: "adjacency", "dictionary" and "vertex","edge" specification
holds the inverse map of labels.
laplacian_graph : np.array
Holds the graph laplacian.
_format : str, valid_values={"adjacency", "dictionary", "all"}
Private attribute that keeps the current format.
"""
# Adjacency Preset
n = -1
adjacency_matrix = np.empty(0)
index_node_labels = dict()
index_edge_labels = dict()
# Dictionary Preset
vertices = set()
edge_dictionary = dict()
node_labels = dict()
edge_labels = dict()
edsamic = dict()
# Other Preset
shortest_path_mat = None
label_group = dict()
laplacian_graph = None
[docs] def __init__(
self,
initialization_object=None,
node_labels=None, edge_labels=None,
graph_format="auto", construct_labels=False):
"""__init__ function of the graph object."""
self.construct_label = construct_labels
if graph_format in ["adjacency", "dictionary", "auto", "all"]:
self._format = graph_format
if (initialization_object is not None):
self.build_graph(initialization_object,
node_labels,
edge_labels)
elif graph_format == "auto":
raise ValueError('no initialization object ' +
'- format must not be auto')
else:
raise ValueError('Invalid graph format.\nValid graph formats' +
' are "all", "dictionary", "adjacency", "auto"')
def build_graph(self, g, node_labels=None, edge_labels=None):
"""Build a graph structure, given a supported graph representation.
Parameters
----------
g : a valid graph format
Similar to intialization object (of ``__init__``).
node_labels: dict, default=None
Node labels dictionary relevant to g format.
edge_labels: dict, default=None
Edge labels dictionary relevant to g format.
Returns
-------
None.
"""
self.shortest_path_mat = None
self.label_group = None
case = 0
if g is not None:
if is_adjacency(g):
# Input is considered an adjacency matrix
case = 1
# Assign labels for nodes
self.index_node_labels = node_labels
# Assign labels for edges
self.index_edge_labels = edge_labels
if(self._format == "auto"):
self._format = "adjacency"
elif is_edge_dictionary(g):
# Input is considered as an edge dictionary
case = 2
# Assign labels for nodes
self.node_labels = node_labels
# Assign labels for edges
self.edge_labels = edge_labels
if(self._format == "auto"):
self._format = "dictionary"
else:
raise ValueError(
'Unsupported input type. For more information ' +
'check the documentation, concerning valid input ' +
'types for graph type object.')
# If graph is of one type prune the other
if self._format == "adjacency":
self.edge_dictionary = None
elif self._format == "dictionary":
self.adjacency_matrix = None
# Import the given input properly
if (case == 1):
self._import_adjacency(g)
elif (case == 2):
self._import_dictionary(g)
def change_format(self, graph_format):
"""Change the format of the graph from an existing to an other.
Parameters
----------
graph_format : str, valid_values={"dictionary", "adjacency", "all"}
Defines the internal representation of the graph object which can
be a dictionary as a matrix, or both:
+ for dictionary: "dictionary"
+ for adjacency_matrix: "adjacency"
+ for both: "all"
Returns
-------
None.
"""
if graph_format not in ["all", "dictionary", "adjacency"]:
raise ValueError(
'Invalid graph format for function change format' +
'. Valid formats are "all", "dictionary", "adjacency"')
else:
if (graph_format is not self._format):
past_format = self._format
self._format = graph_format
if past_format == "adjacency":
self._import_adjacency()
elif past_format == "dictionary":
self._import_dictionary()
else:
if self._format == "dictionary":
self.n = -1
self.adjacency_matrix = None
self.edsamic = None
self.index_node_labels = None
self.index_edge_labels = None
else:
self.vertices = None
self.node_labels = None
self.edge_labels = None
self.edge_dictionary = None
def desired_format(self, graph_format, warn=False):
"""Change the graph format, to include the desired.
Parameters
----------
graph_format : str, valid_values={"dictionary", "adjacency", "all"}
Defines the internal representation of the graph object which
can be a dictionary as a matrix, or both:
+ for dictionary: "dictionary"
+ for adjacency_matrix: "adjacency"
+ for both: "all"
warn : bool, default=False
Warn the user if the format of the graph is being changed.
Returns
-------
None.
"""
if graph_format == "all":
self.change_format(graph_format)
elif graph_format == "dictionary":
if self._format not in ["all", "dictionary"]:
if warn:
warnings.warn('changing format from "dictionary" to "all"')
self.change_format("all")
elif graph_format == "adjacency":
if self._format not in ["all", "adjacency"]:
warnings.warn('changing format from "adjacency" to "all"')
self.change_format("all")
def construct_labels(self, label_type="vertex", purpose="adjacency"):
"""Construct labels (if user does not provide).
Parameters
----------
label_type : str, valid_values={"vertex", "edge"}, default="vertex"
What kind of labels are going to be constructed
purpose : str, valid_values={"adjacency", "dictionary"}, default="adjacency"
Defines if the labels correspond to dictionary or adjacency matrix
Returns
-------
None.
"""
if (purpose == "adjacency"):
nodes = list(range(0, self.n))
if (label_type == "vertex"):
self.index_node_labels = dict(zip(nodes, nodes))
elif (label_type == "edge"):
self.index_edge_labels = {(i, j): self.adjacency_matrix[i, j]
for i in nodes for j in nodes}
else:
warnings.warn('unsupported label type')
elif (purpose == "dictionary"):
nodes = sorted(list(self.vertices))
if (label_type == "vertex"):
self.node_labels = dict(zip(nodes, nodes))
elif (label_type == "edge"):
self.index_edge_labels = {
(i, j): (0 if (i not in self.edge_dictionary) or
(j not in self.edge_dictionary[i]) else
self.edge_dictionary[i][j])
for i in nodes for j in nodes}
else:
warnings.warn('unsupported label type')
def convert_labels(
self,
target_format="dictionary",
purpose="all",
init=False):
"""Convert labels to a desired format.
Parameters
----------
target_format : str, valid_values={"adjacency", "dictionary"}, default="dictionary"
Defines what the target format for conversion will be.
purpose : str, valid_values={"adjacency", "dictionary", "all"}, default="all"
Defines if the labels will be converted for dictionary, adjacency
matrix or both.
init : bool, default=False
An override parameter for format checks,
usefull for initialisation.
Returns
-------
None.
"""
if (target_format == "adjacency"):
if (self._format != "adjacency" or init):
if bool(self.index_node_labels) and \
purpose in ['all', 'vertex']:
warnings.warn('overriding existing' +
'node labels for indexes')
if bool(self.index_edge_labels) and \
purpose in ['all', 'edges']:
warnings.warn('overriding existing edge' +
'labels for indexes')
cond_labels_nodes = \
purpose in ['all', 'vertex'] and bool(self.node_labels)
cond_labels_edges = \
purpose in ['all', 'edges'] and bool(self.edge_labels)
if cond_labels_nodes or cond_labels_edges:
lov_sorted = sorted(list(self.vertices))
if cond_labels_nodes:
self.index_node_labels = \
{i: self.node_labels[k] for (i, k) in
enumerate(lov_sorted)}
if cond_labels_edges:
vi = {v: i for (i, v) in enumerate(lov_sorted)}
self.index_edge_labels = \
{(vi[k], vi[q]): self.edge_labels[(k, q)]
for (k, q) in self.edge_labels.keys()}
else:
if not init:
warnings.warn('no labels to convert from, ' +
'for the given purpose')
else:
warnings.warn('labels already defined for that format ' +
'- nothing to convert')
elif (target_format == "dictionary"):
if (self._format != "dictionary" or init):
if bool(self.node_labels) and purpose in ['all', 'vertex']:
warnings.warn('overriding existing node labels, ' +
'for dictionary symbols')
if bool(self.edge_labels) and purpose in ['all', 'edges']:
warnings.warn('overriding existing edge labels, ' +
'for dictionary symbols')
cond_labels_nodes = \
purpose in ['all', 'vertex'] \
and bool(self.index_node_labels)
cond_labels_edges = \
purpose in ['all', 'edges'] \
and bool(self.index_edge_labels)
if cond_labels_nodes or cond_labels_edges:
if cond_labels_nodes:
self.node_labels = self.index_node_labels
if cond_labels_nodes:
self.edge_labels = self.index_edge_labels
else:
if not init:
warnings.warn('no labels to convert from, ' +
'for the given purpose')
else:
warnings.warn('labels already defined ' +
'for the given format')
def label(self, obj, label_type="vertex", purpose="dictionary"):
"""Get the label of a vertex.
Parameters
----------
obj : hashable
The candidate labeled object for the corresponding purpose
and label type.
label_type : str, valid_values={"vertex", "edge"}, default="vertex"
Defines if the lookup of the label
will be done for vertices or edges.
purpose : str, valid_values={"dictionary", "adjacency", "any"}, default="dictionary"
Defines if the lookup will be done on the existing
("any" - if "all" default is adjacency) to the "dictionary"
or to the "adjacency" format of the graph.
Returns
-------
label : str, list, ..., *valid-label-type*
Returns the label of the current object on the defined lookup.
"""
if purpose == "any":
if self._format == "all":
purpose = "adjacency"
else:
purpose = self._format
if purpose not in ["dictionary", "adjacency"]:
raise ValueError('unrecognized purpose')
if (label_type == "vertex"):
vertex = obj
labels = self.get_labels(purpose=purpose, label_type=label_type)
if labels is not None:
if (vertex in labels):
return labels[vertex]
else:
raise ValueError('no label assigned to this vertex symbol')
else:
warnings.warn('labels are not set - default value returned')
return vertex
elif (label_type == "edge"):
edge = obj
labels = self.get_labels(purpose=purpose, label_type=label_type)
if not bool(labels):
if (edge in labels):
return labels[edge]
else:
raise ValueError('no label assigned to this edge symbol')
else:
warnings.warn('labels are not set - default value returned')
if (edge[0] not in self.edge_dictionary) or \
(edge[1] not in self.edge_dictionary[edge[0]]):
return .0
else:
return self.edge_dictionary[edge[0]][edge[1]]
def relabel(self, new_labels, purpose="dictionary", label_type="vertex"):
"""Relabel the graph object, supporting the current format.
Parameters
----------
new_labels : dict
The new labels corresponding to the label type and purpose.
purpose : str, valid_values={"dictionary", "adjacency"}, default="dictionary"
Defines if the new labels are given for
"adjacency" or "dictionary".
label_type : str, valid_values={"vertex", "edge"}, default="vertex"
Defines if the new labels are for vertices or edges.
Returns
-------
None.
"""
# checks
if purpose not in ["dictionary", "adjacency"]:
raise ValueError('purpose can either be dictionary or adjacency')
if label_type not in ["vertex", "edge"]:
raise ValueError('label_type can either be vertex or edge')
if new_labels is None or not bool(new_labels):
warnings.warn('user must provide new labels, input is None')
elif label_type == "vertex":
if purpose == "dictionary":
if self._format in ["dictionary", "all"]:
self.node_labels = new_labels
else:
raise ValueError('graph is in a format that supports ' +
'labels for dictionary symbols')
# Transform to a valid format
if self._format == "all":
self.convert_labels("adjacency", "vertex")
if bool(self.label_group) and \
((label_type, purpose) in self.label_group):
self.label_group[(label_type, purpose)] = dict()
elif purpose == "adjacency":
if self._format in ["all", "adjacency"]:
self.index_node_labels = new_labels
else:
raise ValueError('graph is in a format that supports ' +
'labels for indexes')
if self._format == "all":
self.convert_labels("dictionary", "vertex")
if bool(self.label_group) and \
(label_type, purpose) in self.label_group:
self.label_group[(label_type, purpose)] = dict()
else:
warnings.warn('no labels to convert from, ' +
'for the given purpose')
elif label_type == "edge":
if purpose == "dictionary":
if self._format in ["dictionary", "all"]:
self.edge_labels = new_labels
else:
raise ValueError('graph is in a format that ' +
'supports labels for dictionary symbols')
# Transform to a valid format
if self._format == "all":
self.convert_labels("adjacency", "edge")
if (label_type, purpose) in self.label_group:
self.label_group[(label_type, purpose)] = dict()
elif purpose == "adjacency":
if self._format in ["all", "adjacency"]:
self.index_edge_labels = new_labels
else:
raise ValueError('graph is in a format that supports ' +
'labels for indexes')
if self._format == "all":
self.convert_labels("dictionary", "edge")
if (label_type, purpose) in self.label_group:
self.label_group[(label_type, purpose)] = dict()
else:
warnings.warn('no labels to convert from, for ' +
'the given purpose')
else:
warnings.warn('unrecognized label type')
def build_shortest_path_matrix(
self,
algorithm_type="auto",
clean=False,
labels="vertex"):
"""Build and return the shortest path matrix between all nodes.
Parameters
----------
algorithm_type : str, valid_values={"auto", "adjacency", "dictionary"}, default="auto"
Defines which shortest-path algorithm will be used for building the
shortest path matrix:
+ "dijkstra" : choses the dijkstra algorithm (Matrix
computation complexity: :math:`O(|V|(|E|+|V|)log(|V|))`
+ "floyd_warshall" : choses the floyd-warshall algorithm
(Matrix computation complexity: :math:`O(|V|^3)`
+ "auto" : choses the best possible algorithm for the current
format
clean : bool, default=False
Construct the shortest path matrix from scratch or output existing
if exists
labels : str, valid_values={"vertex", "edge", "all", "none"}, default="vertex"
Returns labels corresponding for the indexes of the shortest path
matrix for vertices, for edge (only for the valid ones on the
original graph), for both ("all") and for no labels ("none")
Returns
-------
shortest_path_matrix : np.array, shape=(:math:`|V|`, :math:`|V|`)
The produced shortest path matrix.
vertex_labels : dict
The vertex labels, outputed only if *labels* parameter is either
"vertex" or "all".
edge_labels : dict
The edge labels, outputed only if *labels* parameter is
either "edge" or "all".
"""
if labels not in ['vertex', 'edge', 'all', 'none']:
raise ValueError('only labels for vertices and edges exist')
if clean:
self.shortest_path_mat = None
if self.shortest_path_mat is not None:
if labels == "all":
return self.shortest_path_mat,\
self.get_labels(),\
self.get_labels("edge")
if labels == "edge":
return self.shortest_path_mat, self.get_labels("edge")
if labels == "vertex":
return self.shortest_path_mat, self.get_labels()
else:
return self.shortest_path_mat
# Assign the desired algorithm
if algorithm_type == "auto":
if self._format in ["all", "dictionary"]:
algorithm_type = "dijkstra"
elif self._format == "adjacency":
algorithm_type = "floyd_warshall"
if algorithm_type == "dijkstra":
# all format required
self.desired_format("all", warn=True)
if (bool(self.edsamic)):
indexes = self.edsamic
else:
indexes = dict(zip(range(self.n), range(self.n)))
# calculate shortest path matrix
shortest_path_mat = np.full([self.n, self.n], float("Inf"))
for k in indexes.keys():
dict_fd, _ = dijkstra(self.edge_dictionary, k)
for s in dict_fd.keys():
shortest_path_mat[indexes[k], indexes[s]] = dict_fd[s]
elif algorithm_type == "floyd_warshall":
self.desired_format("adjacency", warn=True)
shortest_path_mat = floyd_warshall(self.adjacency_matrix)
self.shortest_path_mat = shortest_path_mat
if labels == "all":
return shortest_path_mat,\
self.get_labels(),\
self.get_labels("edge")
if labels == "edge":
return shortest_path_mat, self.get_labels("edge")
if labels == "vertex":
return shortest_path_mat, self.get_labels()
else:
return shortest_path_mat
def get_labels(self, label_type="vertex", purpose="adjacency", return_none=False):
"""Get labels corresponding to the purpose.
Parameters
----------
label_type : str, valid_values={"vertex", "edge"}, default="vertex"
Defines if the the labels will correspond to vertices or edges.
purpose : str, valid_values={"dictionary", "adjacency", "any"},
default="dictionary"
Defines if the labels will correspond to "dictionary" (symbols),
to "adjacency" indexes, or to "any" valid format (if "all" the
result is for "adjacency").
return_none : bool
Defines if get_labels can return None in case of label absence, or raise an error.
Returns
-------
labels : dict,
Returns the labels for the given type and purpose.
"""
if purpose == "adjacency":
case = True
elif purpose == "dictionary":
case = False
elif purpose == "any":
if self._format in ['all', 'adjacency']:
case = True
else:
case = False
else:
raise ValueError('unsupported label purpose')
if case:
self.desired_format("adjacency", warn=True)
if label_type == "vertex":
if not bool(self.index_node_labels):
if self.construct_label:
self.construct_labels(label_type, purpose)
else:
if return_none:
return None
else:
raise ValueError('Graph does not have any labels for vertices.')
return self.index_node_labels
elif label_type == "edge":
if not bool(self.index_edge_labels):
if self.construct_label:
self.construct_labels(label_type, purpose)
else:
if return_none:
return None
else:
raise ValueError('Graph does not have any labels for edges.')
return self.index_edge_labels
else:
raise ValueError('label type can only be "vertex" or "edge"')
else:
self.desired_format("dictionary", warn=True)
if label_type == "vertex":
if not bool(self.node_labels):
if self.construct_label:
self.construct_labels(label_type, purpose)
else:
if return_none:
return None
else:
raise ValueError('Graph does not have any labels for vertices.')
return self.node_labels
elif label_type == "edge":
if not bool(self.edge_labels):
if self.construct_label:
self.construct_labels(label_type, purpose)
else:
if return_none:
return None
else:
raise ValueError('Graph does not have any labels for edges.')
return self.edge_labels
else:
raise ValueError('label type can only be "vertex" or "edge"')
def get_label_group(self, label_type="vertex", purpose="dictionary"):
"""Calculate the inverse dictionary for vertex labels (once).
Parameters
----------
label_type : str, valid_values={"vertex", "edge"}, default="vertex"
Defines if the the labels-group will correspond
to vertices or edges.
purpose : str, valid_values={"dictionary", "adjacency", "any"},
default="dictionary"
Defines if the labels-group will correspond to "dictionary"
(symbols), to "adjacency" (indexes) or to "any" valid format
(if "all" the result is for "adjacency").
Returns
-------
label_group : dict
Returns the inverse label group.
"""
if not bool(self.label_group):
self.label_group = dict()
if not (label_type, purpose) in self.label_group:
self.label_group[(label_type, purpose)] = dict()
if not bool(self.label_group[(label_type, purpose)]):
# calculate the label_group
self.label_group[(label_type, purpose)] = \
inv_dict(self.get_labels(label_type, purpose))
return self.label_group[(label_type, purpose)]
def neighbors(self, vertex, purpose="any", with_weights=False):
"""Find all neighbors of a vertex.
Parameters
----------
vertex : hashable
The vertex, which neighbors we are searching for.
purpose : str, valid_values={"adjacency", "dictionary", "any"},
default="any"
Defines if the vertex is given for the "dictionary" format of
the graph (symbol) to the "adjacency" (index) or to "any"
existing format (if "all" the expected type is for "adjacency").
with_weights : bool, default=False
Defines if the neighbours will be outputed with weights.
Returns
-------
neighbors : list or dict
The neighbors of the given vertex.
+ with_weights=False: list of neighbor vertices
+ with_weights=True: dictionary between neighbor vertices and
edge labels
"""
if purpose in ['adjacency', 'dictionary', "any"]:
if purpose == 'dictionary':
self.desired_format('dictionary')
case = True
if purpose == 'adjacency':
self.desired_format('adjacency')
case = False
if purpose == "any":
if self._format in ['all', 'adjacency']:
case = False
else:
case = True
else:
raise ValueError('purpose is either "adjacency", ' +
'"dictionary" or "any"')
if case:
if vertex in self.edge_dictionary:
if not with_weights:
return list(self.edge_dictionary[vertex].keys())
else:
return self.edge_dictionary[vertex]
else:
raise ValueError('vertex not inside edge dictionary')
else:
idx = int(vertex)
if 0 <= idx < self.n:
out_idx = np.where(self.adjacency_matrix[idx, :] > 0)
ns = out_idx[0].tolist()
if not with_weights:
return ns
else:
return dict(zip(ns,
self.adjacency[idx, out_idx].tolist())[0])
else:
raise ValueError("item with index "+str(idx)+" does not exist")
def _make_edsamic(self, vertices):
"""Produce the `edsamic`.
`edsamic` is *edge-symbols-adjacency-matrix-correspondance-dictionary*.
Parameters
----------
vertices : list-able, sort-able
The vertices of the graph.
all_out : bool, default=True
A variable that defines, if everything valuable will be outputed.
Returns
-------
edsamic : dict
The produced edsamic
n : int
The number of vertieces
lov_sorted : list
Sorted list of vertices.
"""
if bool(vertices):
# vertices to list
lov = list(vertices)
# length is adjacency matrix size
n = len(lov)
# sort lov indexes for labeling
lov_sorted = sorted(lov)
# edge_labels_adjacency_matrix_correspondance_dictionary
edsamic = dict(zip(lov_sorted, range(n)))
return edsamic, n, lov_sorted
else:
warnings.warn('wrong format: edge dictionary must exist')
return None
def _import_adjacency(self, adjacency_matrix=None, init=True):
"""Create a graph object representation, given its adjacency matrix.
Parameters
----------
adjacency_matrix : array-like, default=None
If given a array the input can be as follows:
+ array-like lists of lists
+ np.array
+ sparse matrix (scipy.sparse.csr.csr_matrix)
If None, imports the existing array, inside self.adjacency_matrix
init : bool, default=True
A parameter used to defined initialization.
Returns
-------
None.
"""
if adjacency_matrix is not None:
# calculate graph size
is_adj, adjacency_matrix = is_adjacency(adjacency_matrix, True)
if not is_adj:
raise ValueError('unsupported format type '
'for adjacency matrix')
n = adjacency_matrix.shape[0]
if n != adjacency_matrix.shape[1]:
raise ValueError('input matrix must be squared')
# import_adjacency
if (self._format == "all" or self._format == "adjacency"):
self.adjacency_matrix = adjacency_matrix
self.n = n
else:
n = self.n
adjacency_matrix = self.adjacency_matrix
# construct a dictionary out of the adjacency
if self._format in ["all", "dictionary"]:
vertices = set(range(n))
self.vertices = vertices
self.edge_dictionary = {i: dict() for i in range(n)}
idx_i, idx_j = np.where(adjacency_matrix > 0)
for (i, j) in zip(idx_i, idx_j):
self.edge_dictionary[i][j] = adjacency_matrix[i, j]
# Add labels
self.convert_labels(target_format="dictionary",
purpose="all",
init=init)
# Prune not interesting format
if self._format == "dictionary":
self.n = -1
self.adjacency_matrix = None
self.edsamic = None
self.index_node_labels = None
self.index_edge_labels = None
else:
self.edsamic = dict(zip(range(n), range(n)))
def _import_dictionary(self, edge_dictionary=None, init=True):
"""Create a graph object representation given its edge dictionary.
Parameters
----------
edge_dictionary : dict-like, default=None
The edge_dictionary the input can be as follows:
+ 2-level nested dictionaries from edge symbols to weights
+ Dictionary of symbols to list of symbols (unweighted)
+ Dictionary of tuples to weights (weighted)
+ Iterable of tuples of len 2 (unweighted)
+ Iterable of tuples of len 3 (vertex, vertex, weight)
If None, imports from the existing edge_dictionary
init : bool, default=True
A parameter used to defined initialization.
Returns
-------
None.
"""
if edge_dictionary is not None:
# find vertices, refine dictionary
vertices = set()
edge_dictionary_refined = dict()
is_edge_dict, vertices, edge_dictionary_refined =\
is_edge_dictionary(edge_dictionary, True)
if not is_edge_dict:
raise ValueError('unsupported edge_dictionary format')
# Save dictionary, vertices @self if needed
if (self._format == "all" or self._format == "dictionary"):
self.edge_dictionary = edge_dictionary_refined
self.vertices = vertices
else:
vertices = self.vertices
edge_dictionary_refined = self.edge_dictionary
# Create and store the adjacency matrix
if self._format in ["adjacency", "all"]:
edsamic, self.n, lov_sorted = self._make_edsamic(vertices)
# Initialize adjacency_matrix
adjacency_matrix = np.zeros(shape=(self.n, self.n))
# Produce and save adjacency matrix
for k in edge_dictionary_refined.keys():
for l in edge_dictionary_refined[k].keys():
adjacency_matrix[edsamic[k], edsamic[l]] = \
edge_dictionary_refined[k][l]
self.adjacency_matrix = adjacency_matrix
if self._format == "all":
self.edsamic = edsamic
# Add labels
self.convert_labels(target_format="adjacency",
purpose="all",
init=init)
# Prune for a certain format
if self._format == "adjacency":
self.edge_dictionary = None
self.vertices = None
self.node_labels = None
self.edge_labels = None
def laplacian(self, save=True):
"""Calculate the laplacian of the given graph.
Parameters
----------
save : bool, default=True
Optional parameter to store the matrix.
Returns
-------
laplacian : array-like
Returns the graph laplacian
"""
if self.laplacian_graph is not None:
laplacian_graph = self.laplacian_graph
else:
self.desired_format("adjacency", warn=True)
laplacian_graph = laplacian(self.adjacency_matrix)
if save:
self.laplacian_graph = laplacian_graph
return laplacian_graph
def get_vertices(self, purpose="adjacency"):
"""Create an iterable of vertices.
Parameters
----------
purpose : str, valid_values={"adjacency", "dictionary", "any"},
default="adjacency"
Defines if the vertices will correspond given for the "dictionary"
format of the graph (symbol) to the "adjacency" (index) or to "any"
existing format (if "all" the expected type is for "adjacency").
Returns
-------
vertices : iterable
Returns an iterable on vertices
"""
if purpose not in ["adjacency", "dictionary", "any"]:
raise ValueError('purpose is either "adjacency" of "dictionary"')
if purpose == "any":
if self._format in ['all', 'adjacency']:
purpose = "adjacency"
else:
purpose = "dictionary"
if purpose == "adjacency":
self.desired_format("adjacency", warn=True)
return range(0, self.n)
if purpose == "dictionary":
self.desired_format("dictionary", warn=True)
return self.vertices
def get_edges(self, purpose="adjacency", with_weights=False):
"""Create an iterable of edges as tuples.
Parameters
----------
purpose : str, valid_values={"adjacency", "dictionary"}, default="adjacency"
Defines if the edges is given for the "dictionary" format of the
graph (symbol) to the "adjacency" (index).
Returns
-------
vertices : list
Returns a list of tuples for edges.
"""
if purpose not in ["adjacency", "dictionary"]:
raise ValueError('purpose is either "adjacency" of "dictionary"')
if purpose == "adjacency":
self.desired_format("adjacency", warn=True)
idx_i, idx_j = np.where(self.adjacency_matrix > 0)
edges = zip(idx_i, idx_j)
if with_weights:
return list(zip(edges, self.adjacency_matrix[idx_i, idx_j]))
else:
return list(edges)
if purpose == "dictionary":
self.desired_format("dictionary", warn=True)
if with_weights:
return [(i, j) for i in self.edge_dictionary.keys()
for j in self.edge_dictionary[i].keys()]
else:
return [((i, j), self.edge_dictionary[i][j])
for i in self.edge_dictionary.keys()
for j in self.edge_dictionary[i].keys()]
def get_adjacency_matrix(self):
"""Get the adjacency matrix.
Format agnostic method.
Parameters
----------
None.
Returns
-------
adjacency_matrix : np.array
Returns the adjacency matrix of the current graph.
"""
if self._format == "dictionary":
A = np.zeros(shape=(len(self.vertices), len(self.vertices)))
v_map = {v: i for (i, v) in enumerate(sorted(list(self.vertices)))}
for (k, v) in iteritems(self.edge_dictionary):
i = v_map[k]
for (kv, w) in iteritems(v):
A[i, v_map[kv]] = w
return A
else:
return self.adjacency_matrix
def get_edge_dictionary(self):
"""Get the edge dictionary.
Format agnostic method.
Parameters
----------
None.
Returns
-------
edge_dictionary : dict
Returns the edge_dictionary of the current graph.
"""
if self._format != "dictionary":
idx_i, idx_j = np.where(self.adjacency_matrix > 0)
edge_dictionary = {i: dict() for i in range(0, self.n)}
for (i, j) in zip(idx_i, idx_j):
edge_dictionary[i][j] = self.adjacency_matrix[i, j]
return edge_dictionary
else:
return self.edge_dictionary
def nv(self):
"""Get the number of vertices for any existing format.
Parameters
----------
None.
Returns
-------
num_of_vertices : int
Returns the number of vertices.
"""
if self._format in ['all', 'adjacency']:
return self.n
else:
return len(self.vertices)
def produce_neighborhoods(
self,
r=3,
purpose="adjacency",
with_distances=False,
d=-1,
sort_neighbors=True):
"""Calculate neighborhoods for each node of a Graph, up to a depth.
Parameters
----------
r : int
The neighborhood depth (radius).
purpose : str, valid_values={"adjacency", "dictionary"},
default="adjacency"
Defines if the neighborhood symbols will correspond given
for the "dictionary" format of the graph (symbol) to the
"adjacency" (index).
with_distances : bool, default=False
Defines if we need to calculate BFS distances for each pair.
d : int, default=-1
Maximum distance considered. If -1 is provided the distance is as
max as the radius r.
Returns
-------
N : dict
A level, vertex nested dictionary of lists, corresponding to the
neighbors of level :math:`l` for a certain vertex :math:`v`.
D : dict
For each level, set of tuples of nodes connected in that level.
Appears only if with_distances is *True*.
Dist_pair : dict
A dictionary of all pairs and their distances.
"""
N = dict()
if purpose == "any":
if self._format == "all":
purpose = "adjacency"
else:
purpose = self._format
if purpose not in ["adjacency", "dictionary"]:
raise ValueError('purpose is either "adjacency" or "dictionary"')
if r < 0:
raise ValueError('r must be positive or equal to zero')
if with_distances and d < 0:
d = r
warnings.warn('negative d as input - d set to r')
# chain neighbors
if sort_neighbors:
def chain(n):
return sorted(n)
else:
def chain(n):
return n
# Initialize neighborhoods
vertices = list(self.get_vertices(purpose))
N[0] = {i: {i} for i in vertices}
# and distances
if with_distances:
D = dict()
D[0] = set(zip(vertices, vertices))
Dist_pair = {(v, v): 0 for v in vertices}
if r > 0:
N[1] = dict()
if with_distances and d >= 1:
D[1] = set()
for i in vertices:
ns = list(self.neighbors(i, purpose))
N[1][i] = chain([i]+ns)
if with_distances and d >= 1:
dset = {(i, n) for n in ns}
Dist_pair.update(zip(dset, len(dset)*[1]))
D[1] |= dset
# calculate neighborhoods by a recursive formula
# for all levels from 1 to r
for level in range(1, max(r, d)):
N[level+1] = dict()
if with_distances and level <= d-1:
D[level+1] = set()
for i in vertices:
neighbors = set()
for w in N[level][i]:
neighbors |= set(N[level][w])
N[level+1][i] = chain(list(neighbors))
if with_distances and level <= d-1:
dset = {(i, j)
for j in (neighbors - set(N[level][i]))}
Dist_pair.update(zip(dset, len(dset)*[level+1]))
D[level+1] |= dset
if with_distances:
for level in range(r+1, d):
N.pop(level, None)
if with_distances:
return N, D, Dist_pair
else:
return N
def get_graph_object(self):
"""Return the graph object corresponding to 'any'.
Format agnostic method.
Parameters
----------
None.
Returns
-------
g : dict or np.array
The graph Object.
"""
if self._format in ["adjacency", "all"]:
return self.adjacency_matrix
else:
return self.edge_dictionary
def get_subgraph(self, vertices):
"""Calculate the subgraph of object in the same format as the original.
Parameters
----------
vertices : iterable
An iterbale vertices extracted from the original graph.
Returns
-------
subgraph : graph
The induced subgraph, from the input vertices.
"""
if vertices is None:
raise ValueError('vertices must not be null')
if type(vertices) in [str, int]:
vertices = {vertices}
else:
vertices = set(vertices).copy()
subgraph = Graph(graph_format=self._format)
if self._format == 'adjacency':
for v in vertices:
if v < 0 or v >= self.n:
raise ValueError('vertices are not valid '
'for the original graph format')
recipe = [('enum_vertices', 'default'),
('add_adjacency',)]
elif self._format == 'dictionary':
for v in vertices:
if v not in self.edge_dictionary:
raise ValueError('vertices are not valid '
'for the original graph format')
recipe = [('add_edge_dict',)]
else:
fv = False
fa = False
for v in vertices:
if not fv and v not in self.edge_dictionary:
fv = True
if not fa and (v < 0 or v >= self.n):
fa = True
if fa and fv:
raise ValueError('vertices are not valid for '
'the original graph format')
if not fa and fv:
recipe = [('enum_vertices', 'default'),
('get_correct', 'edsamic'),
('add_adjacency',),
('add_edge_dict',)]
elif not fa:
recipe = [('enum_vertices', 'default'),
('get_correct', 'edsamic'),
('add_adjacency',),
('idxs_to_nodes',),
('add_edge_dict',)]
elif not fv:
recipe = [('enum_vertices', 'edsamic'),
('get_correct', 'edsamic'),
('add_adjacency',),
('add_edge_dict',)]
def get_correct(i):
return i
for ingredient in recipe:
if ingredient[0] == 'idxs_to_nodes':
vertices = {self.edsamic[i] for i in vertices}
elif ingredient[0] == 'enum_vertices':
if ingredient[1] == 'edsamic':
inverse_edsamic = inv_dict(self.edsamic)
lov_sorted = sorted([int(inverse_edsamic[v][0])
for v in vertices])
new_indexes, inv_new_indexes = {}, {}
for (i, l) in enumerate(lov_sorted):
new_indexes[i], inv_new_indexes[l] = l, i
subgraph.edsamic = {self.edsamic[inv_new_indexes[nik]]:
nik for nik in new_indexes.keys()}
else:
lov_sorted = sorted(list(vertices))
new_indexes = {l: i for (i, l) in enumerate(lov_sorted)}
elif ingredient[0] == 'add_adjacency':
subgraph.adjacency_matrix = self.adjacency_matrix[lov_sorted, :][:, lov_sorted]
subgraph.n = len(new_indexes.keys())
if bool(self.index_node_labels):
subgraph.index_node_labels = {
new_indexes[k]: self.index_node_labels[k]
for k in self.index_node_labels.keys()
if k in vertices}
if bool(self.index_edge_labels):
subgraph.index_edge_labels = {
(new_indexes[i], new_indexes[j]):
self.index_edge_labels[(i, j)]
for (i, j) in self.index_edge_labels
if (i in vertices and j in vertices)}
elif ingredient[0] == 'add_edge_dict':
subgraph.edge_dictionary = {
get_correct(i): {
get_correct(j): self.edge_dictionary[i][j]
for j in self.edge_dictionary[i] if j in vertices}
for i in self.edge_dictionary if i in vertices}
subgraph.vertices = vertices
if bool(self.node_labels):
subgraph.node_labels = {
get_correct(k): self.node_labels[k]
for k in self.node_labels.keys() if k in vertices}
if bool(self.edge_labels):
subgraph.index_edge_labels = {
(get_correct(i), get_correct(j)):
self.edge_labels[(i, j)]
for (i, j) in self.edge_labels
if (i in vertices and j in vertices)}
elif ingredient[0] == 'get_correct':
if ingredient[1] == 'edsamic':
def get_correct(i):
return self.edsamic[new_indexes[i]]
return subgraph
[docs]def is_adjacency(g, transform=False):
"""Define if input is in a valid adjacency matrix format.
Parameters
----------
g : Object
The input object.
transform : bool, default=False
Defines if the input will be transformed to the internal adjacency
matrix support format.
Returns
-------
is_adjacency : bool
A variable that determines if the input is a valid adjacency matrix.
g_transformed : np.array
Holds the transformed object to an np.array.
This output appears **only** if transform parameter is True.
"""
if type(g) in [np.array, np.ndarray] and len(g.shape) == 2:
if transform:
return True, g
else:
return True
elif isspmatrix(g):
if transform:
return True, g.todense()
else:
return True
elif type(g) is list and all(
isinstance(l, list) and
all(isinstance(i, numbers.Number) for i in l) for l in g):
if transform:
return True, np.array(g)
else:
return True
else:
if transform:
return False, None
else:
return False
[docs]def is_edge_dictionary(g, transform=False):
"""Define if input is in a valid edge dictionary format.
Parameters
----------
g : Object
The input object.
transform : bool, default=False
Defines if the input will be transformed to the internal
edge dictionary support format.
Returns
-------
is_edge_dictionary : bool
A variable that determines if the input is a valid edge dictionary.
g_transformed_vertices : set
Holds the transformed object vertices of the edge_dictionary.
g_transformed_edge_dict : dict
Holds the transformed object as a 2-level edge dict.
This output appears **only** if transform parameter is True.
"""
if type(g) is dict:
if all(
type(k) is tuple and len(k) == 2 and
isinstance(n, numbers.Number) for (k, n) in iteritems(g)):
if transform:
vertices_key = set()
vertices_val = set()
edge_dict = dict()
for (k, v) in iteritems(g):
vertices_key.add(k[0])
vertices_val.add(k[1])
nested_dict_add(edge_dict, v, k[0], k[1])
# intialise empty edges
v_dif = vertices_val - vertices_key
if len(v_dif) > 0:
for v in v_dif:
edge_dict[v] = dict()
return True, vertices_key | vertices_val, edge_dict
else:
return True
if all(isinstance(d, list) for d in itervalues(g)):
if transform:
vertices_key = set()
vertices_val = set()
edge_dict = dict()
for (k, v) in iteritems(g):
vertices_key.add(k)
vertices_val |= set(v)
for kp in v:
nested_dict_add(edge_dict, 1., k, kp)
# intialise empty edges
v_dif = vertices_val - vertices_key
if len(v_dif) > 0:
for v in v_dif:
edge_dict[v] = dict()
return True, vertices_key | vertices_val, edge_dict
else:
return True
if all(
isinstance(d, dict) and
all(isinstance(n, numbers.Number)
for n in itervalues(d)) for d in itervalues(g)):
if transform:
vertices_key = set(g.keys())
vertices_val = {kp for k in g.keys() for kp in g[k].keys()}
v_dif = vertices_val - vertices_key
# intialise empty edges
if len(v_dif) > 0:
for v in v_dif:
g[v] = dict()
return True, vertices_key | vertices_val, g
else:
return True
if isinstance(g, collections.Iterable):
if all(type(t) is tuple and len(t) == 2 for t in g):
if transform:
vertices_key = set()
vertices_val = set()
edge_dict = dict()
for (v, u) in g:
vertices_key.add(v)
vertices_val.add(u)
nested_dict_add(edge_dict, 1., v, u)
v_dif = vertices_val - vertices_key
# intialise empty edges
if len(v_dif) > 0:
for v in v_dif:
edge_dict[v] = dict()
return True, vertices_key | vertices_val, edge_dict
else:
return True
elif all(type(t) is tuple and len(t) == 3 for t in g):
if transform:
vertices_key = set()
vertices_val = set()
edge_dict = dict()
for (v, u, w) in g:
vertices_key.add(v)
vertices_val.add(u)
nested_dict_add(edge_dict, w, v, u)
v_dif = vertices_val - vertices_key
# intialise empty edges
if len(v_dif) > 0:
for v in v_dif:
edge_dict[v] = dict()
return True, vertices_key | vertices_val, edge_dict
else:
return True
if transform:
return False, None
else:
return False
def dijkstra(edge_dictionary, start_vertex, end_vertex=None):
"""Calculate the dijkstra algorithm `see`_.
Parameters
----------
edge_dictionary: dict
A 2-level nested dictionary of symbols, with value corresponding to
the weight.
start_vertex: hashable
The start vertex symbol
(should exists as a key inside edge_dictionary).
end_vertex: hashable
The end vertex symbol (should exists as a key inside edge_dictionary).
Returns
-------
dict_fd : dict
The dictionary of final distances.
dict_pred : dict
The dictionary of predecessors.
.. note::
The majority of this function code came from :ref:`here`_.
.. _here: http://code.activestate.com/recipes/
119466-dijkstras-algorithm-for-shortest-paths/
"""
dict_fd = {} # dictionary of final distances
dict_pred = {} # dictionary of predecessors
queue = priority_dict() # est.dist. of non-final vert.
queue[start_vertex] = 0
for v in queue:
dict_fd[v] = queue[v]
if v == end_vertex:
break
for w in edge_dictionary[v]:
vwLength = dict_fd[v] + edge_dictionary[v][w]
if w in dict_fd:
if vwLength < dict_fd[w]:
raise ValueError("Dijkstra: found better path to "
"already-final vertex")
elif w not in queue or vwLength < queue[w]:
queue[w] = vwLength
dict_pred[w] = v
return dict_fd, dict_pred
[docs]def floyd_warshall(adjacency_matrix):
"""Calculate the Floyd Warshall, shortest path matrix.
Parameters
----------
adjacency_matrix : np.array, square
The adjacency matrix of the graph, on which the distances are being
calculated.
Returns
-------
dist : np.array
The shortest path matrix as produced by floyd warshall
"""
n = adjacency_matrix.shape[0]
# Initialization
dist = np.array(adjacency_matrix, copy=True).astype(float)
dist[dist == 0] = float("Inf")
np.fill_diagonal(dist, 0)
# Calculation
for k in range(n):
for i in range(n):
dist[i, :] = np.minimum(dist[i, :], dist[i, k] + dist[k, :])
return dist