The Property Graph Model
A property graph is a collection of Nodes and Directed Arcs. Each node represents an entity and has a unique id as well as a Node Type. The Node Type defines the set of metadata (properties) that the node has. Each arc represents a unidirectional relationship between two entities and has an Arc Type. The Arc Type defines the set of metadata that the arc has.
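To make the model concrete, here is a minimal sketch of a property graph in Python. The class and field names (`PropertyGraph`, `node_type`, `from_id`, `to_id`) are my own choices for illustration and do not come from any particular graph library.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Node:
    id: str
    node_type: str                      # the Node Type says which properties to expect
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Arc:
    id: str
    arc_type: str                       # the Arc Type says which properties to expect
    from_id: str                        # id of the node the arc leaves
    to_id: str                          # id of the node the arc points to
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class PropertyGraph:
    nodes: Dict[str, Node] = field(default_factory=dict)
    arcs: List[Arc] = field(default_factory=list)

    def add_node(self, node: Node) -> None:
        if node.id in self.nodes:
            raise ValueError(f"duplicate node id: {node.id}")
        self.nodes[node.id] = node

    def add_arc(self, arc: Arc) -> None:
        # An arc may only connect nodes that are already in the graph.
        if arc.from_id not in self.nodes or arc.to_id not in self.nodes:
            raise KeyError("both endpoints must be added before the arc")
        self.arcs.append(arc)


graph = PropertyGraph()
graph.add_node(Node("p1", "Person", {"name": "Alice"}))
graph.add_node(Node("b1", "Book", {"title": "Graphs"}))
graph.add_arc(Arc("a1", "write", "p1", "b1", {"year": 2010}))
```

The arc is stored once and is directional: `a1` goes from `p1` to `b1`, not the other way round.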
General Graph Processing
I found that many graph algorithms follow a general processing pattern. There are multiple rounds of (sequential) processing iterations. Within each iteration, there is a set of active nodes that perform local processing in parallel. The local processing can modify the node's properties, add or remove links to other nodes, and send messages across links. All message passing is done after the local processing.
This model is similar to the Google Pregel model.
Notice that this model maps well onto a parallel computing environment, where the processing of the set of active nodes can be spread across multiple processors (or multiple machines in a cluster).
Notice that all messages from all incoming links arrive before the local processing changes any links. On the other hand, all messages are sent over the outgoing links after the local processing has changed the links. The term "local processing" means that a node cannot modify the properties of other nodes or other links.
Because two nodes can simultaneously modify the link between them, the following conflicts can happen:
- One node deletes a link while the other node modifies the link's properties.
- The nodes on both sides modify the properties of the link between them.
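The round-based pattern described above can be sketched in a few lines of Python. This is only an illustration under my own assumptions: the function name `run_rounds` is hypothetical, the "local processing" is the classic example of propagating the maximum value through the graph, and links are not modified, so the conflicts listed above cannot arise here.

```python
from collections import defaultdict


def run_rounds(links, values, max_rounds=50):
    """Propagate the maximum value through the graph in sequential rounds.

    links:  dict node -> list of nodes it points to
    values: dict node -> initial numeric property
    """
    values = dict(values)
    # Round 0: every node is active and announces its own value.
    inbox = defaultdict(list)
    for node, targets in links.items():
        for target in targets:
            inbox[target].append(values[node])

    rounds = 0
    while inbox and rounds < max_rounds:
        rounds += 1
        outbox = defaultdict(list)
        # The active set is the set of nodes that received a message.
        # Each pass of this loop touches only one node's own state,
        # so the passes could run in parallel.
        for node, messages in inbox.items():
            best = max(messages)
            if best > values[node]:
                values[node] = best              # local processing
                for target in links[node]:       # messages go out afterwards
                    outbox[target].append(best)
        inbox = outbox                           # delivered in the next round
    return values, rounds


links = {"a": ["b"], "b": ["c"], "c": ["a"], "d": ["a"]}
values, rounds = run_rounds(links, {"a": 3, "b": 6, "c": 2, "d": 1})
# values == {"a": 6, "b": 6, "c": 6, "d": 1}, reached after 3 rounds
```

Node `d` keeps its own value because nothing points to it, so it never receives a message and is never active after round 0.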
Neo4j provides a restricted, single-threaded graph traversal model:
- At each round, the set of active nodes is always a single node
- The set of active nodes of the next round is determined by the traversal policy (breadth-first or depth-first), but is still a single node
- It offers a callback function to determine whether this node should be included in the result set
- The node is expressed as Node(id, inE, outE, properties)
- The arc is expressed as Arc(id, type, inV, outV, properties)
./outE[@type='write']/inV/inE[@type='write']/outV
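To show how such a path expression can be read, here is a hand-expanded Python sketch that walks the same steps. I am assuming that `outV` is the node an arc leaves and `inV` is the node it arrives at; the helper names `connect` and `co_writers` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(eq=False)
class Node:
    id: str
    in_e: List["Arc"] = field(default_factory=list)    # arcs arriving at this node
    out_e: List["Arc"] = field(default_factory=list)   # arcs leaving this node


@dataclass(eq=False)
class Arc:
    id: str
    type: str
    out_v: Node   # node the arc leaves (assumed meaning of outV)
    in_v: Node    # node the arc arrives at (assumed meaning of inV)


def connect(arc_id, arc_type, source, target):
    arc = Arc(arc_id, arc_type, out_v=source, in_v=target)
    source.out_e.append(arc)
    target.in_e.append(arc)
    return arc


def co_writers(start):
    """Hand-expanded ./outE[@type='write']/inV/inE[@type='write']/outV"""
    result = []
    for written in (a for a in start.out_e if a.type == "write"):
        book = written.in_v
        for other in (a for a in book.in_e if a.type == "write"):
            result.append(other.out_v.id)
    return result


alice, bob = Node("alice"), Node("bob")
book1, book2 = Node("book1"), Node("book2")
connect("w1", "write", alice, book1)
connect("w2", "write", bob, book1)
connect("w3", "write", alice, book2)

writers = co_writers(alice)
# writers == ["alice", "bob", "alice"]
```

The raw result contains the start node itself and may contain duplicates, so a real query would usually filter and de-duplicate it.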
Path Algebra
Marko Rodriguez has described a set of matrix operations that apply when the graph is represented as an adjacency matrix. Graph algorithms can then be described in algebraic form.
The Traverse operation can be expressed as matrix multiplication. If A is the adjacency matrix of the graph, then A.A represents connections by paths of length 2.
Similarly, the Merge operation can be expressed as matrix addition. For example, (A + A.A + A.A.A) represents connectivity within 3 degrees of reach.
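A small worked example may help here. The sketch below uses plain Python lists rather than a matrix library, and the helper names `mat_mul` and `mat_add` are my own.

```python
def mat_mul(X, Y):
    """Plain matrix product."""
    return [[sum(X[i][k] * Y[k][j] for k in range(len(Y)))
             for j in range(len(Y[0]))]
            for i in range(len(X))]


def mat_add(X, Y):
    """Element-wise matrix sum."""
    return [[x + y for x, y in zip(row_x, row_y)] for row_x, row_y in zip(X, Y)]


# Directed chain 0 -> 1 -> 2 -> 3
A = [
    [0, 1, 0, 0],
    [0, 0, 1, 0],
    [0, 0, 0, 1],
    [0, 0, 0, 0],
]

A2 = mat_mul(A, A)     # entry [i][j] counts the paths of length 2 from i to j
A3 = mat_mul(A2, A)    # paths of length 3
within_3 = mat_add(mat_add(A, A2), A3)
# within_3 == [[0, 1, 1, 1],
#              [0, 0, 1, 1],
#              [0, 0, 0, 1],
#              [0, 0, 0, 0]]
```

A non-zero entry in `within_3` means node j is reachable from node i in at most 3 hops; the value itself is the number of such paths.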
A special case is when the graph represents a relationship between 2 types of entities. For example, if A represents an authoring relationship (a person writes a book), then A.(A.transpose()) represents the co-author relationship (a person co-authors with another person).
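Here is the co-author case as a minimal sketch, assuming the rows of A are persons and the columns are books. The data and the helper names are made up for illustration.

```python
def mat_mul(X, Y):
    """Plain matrix product."""
    return [[sum(X[i][k] * Y[k][j] for k in range(len(Y)))
             for j in range(len(Y[0]))]
            for i in range(len(X))]


def transpose(M):
    return [list(column) for column in zip(*M)]


# Rows: alice, bob, carol. Columns: book0, book1, book2.
A = [
    [1, 1, 0],   # alice wrote book0 and book1
    [1, 0, 0],   # bob wrote book0
    [0, 0, 1],   # carol wrote book2
]

co_author = mat_mul(A, transpose(A))
# co_author == [[2, 1, 0],
#               [1, 1, 0],
#               [0, 0, 1]]
```

The off-diagonal entry `co_author[0][1]` says alice and bob share one book, while the diagonal just counts how many books each person wrote.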
Marko also introduces a set of Filter operations (not filter / clip filter / column filter / row filter / vertex filter).
Map Reduce
Depending on the traversal strategy inherent in a graph algorithm, certain algorithms with higher sequential dependency don't fit well into parallel computing. For example, graph algorithms with a breadth-first search nature fit the parallel computing paradigm better than those with a depth-first search nature. On the other hand, performing a search from all nodes fits parallel computing better than performing a search from a particular start node.
There are different storage representations of a graph: incidence list, incidence matrix, adjacency list and adjacency matrix. For sparse graphs (which are the majority of cases), let's assume the adjacency list is used as the storage model in the subsequent discussion.
In other words, the graph is represented as a list of records, where each record is [node, connected_nodes].
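As a small illustration of that record layout, the sketch below turns a list of directed arcs into [node, connected_nodes] records. The function name `to_adjacency_records` is hypothetical.

```python
from collections import defaultdict


def to_adjacency_records(arcs):
    """arcs: iterable of (source, target) pairs for directed arcs."""
    connected = defaultdict(list)
    for source, target in arcs:
        connected[source].append(target)
        connected[target]          # touch the target so sink nodes also get a record
    return [[node, connected[node]] for node in sorted(connected)]


records = to_adjacency_records([("a", "b"), ("a", "c"), ("b", "c")])
# records == [["a", ["b", "c"]], ["b", ["c"]], ["c", []]]
```

Each record is independent of the others, which is what lets a Map/Reduce job hand different records to different mappers.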
There are many graph algorithms and it is not my intent to give an exhaustive list. Below are the ones that I have used in my previous projects and that can be translated into Map/Reduce form.
Topological Sort is commonly used to sort out a work schedule based on a dependency graph. It can be done as follows ...
# Topological Sort
# Input records
[node_id, prerequisite_ids]
# Output records
[node_id, prerequisite_ids, dependent_ids]
class BuildDependentsJob {
  map(node, prerequisite_ids) {
    for each prerequisite_id in prerequisite_ids {
      emit(prerequisite_id, node)
    }
  }
  reduce(node, dependent_ids) {
    emit(node, [node, prerequisite_ids, dependent_ids])
  }
}
class BuildReadyToRunJob {
  map(node_id, node) {
    if ! done?(node) and node.prerequisite_ids.empty? {
      result.append(node)
      done(node)
      for each dependent_id in node.dependent_ids {
        emit(dependent_id, node)
      }
    }
  }
  reduce(node, done_prerequisite_ids) {
    remove_prerequisites(node, done_prerequisite_ids)
  }
}
# Topological Sort main program
main() {
  JobClient.submit(BuildDependentsJob.new)
  Result result = []
  result_size_before_job = 0
  result_size_after_job = 1
  while (result_size_before_job < result_size_after_job) {
    result_size_before_job = result.size
    JobClient.submit(BuildReadyToRunJob.new)
    result_size_after_job = result.size
  }
  return result
}
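The two jobs above can be simulated in a single Python process to see how the rounds progress. This is a sketch under my own assumptions: every prerequisite has its own record, the map and reduce phases are ordinary loops rather than real Map/Reduce tasks, and ready nodes are sorted within a round only to make the output deterministic.

```python
from collections import defaultdict


def build_dependents(records):
    """Job 1: invert node -> prerequisites into node -> dependents."""
    dependents = defaultdict(list)
    for node, prerequisites in records.items():       # map phase
        for prerequisite in prerequisites:
            dependents[prerequisite].append(node)      # grouped by key for reduce
    return dependents


def topological_sort(records):
    """records: dict node -> list of prerequisite nodes."""
    dependents = build_dependents(records)
    remaining = {node: set(pre) for node, pre in records.items()}
    done = set()
    result = []
    while True:
        # Map phase: a node with no outstanding prerequisite is ready to run.
        ready = sorted(n for n in remaining if n not in done and not remaining[n])
        if not ready:
            break
        notifications = defaultdict(set)
        for node in ready:
            result.append(node)
            done.add(node)
            for dependent in dependents[node]:
                notifications[dependent].add(node)
        # Reduce phase: each dependent drops the prerequisites that finished.
        for dependent, finished in notifications.items():
            remaining[dependent] -= finished
    if len(result) != len(records):
        raise ValueError("some nodes never became ready (cycle in the input?)")
    return result


tasks = {"shop": [], "chop": ["shop"], "boil": ["shop"], "serve": ["chop", "boil"]}
order = topological_sort(tasks)
# order == ["shop", "boil", "chop", "serve"]
```

The loop stops when a round adds nothing to the result, which is the same termination test as the main program above.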
Minimum spanning tree is a pretty common algorithm. Prim's algorithm looks like the following.
# Minimum Spanning Tree (MST) using Prim's algorithm
Adjacency Matrix, W[i][j] represents weights
W[i][j] = infinity if nodes i, j are disconnected
MST has nodes in array N = [] and arcs A = []
E[i] = minimum weighted edge connecting node i to the skeleton
D[i] = weight of E[i]
Initially, pick a random node r into N[]
  N = [r] and A = []
  D[r] = 0; D[i] = W[i][r]; E[i] = r
Repeat until N[] contains all nodes
  Pick node k outside N[] where D[k] is minimum
  Add node k to N; Add E[k] to A
  for all node p outside N[] connected to node k
    if W[p][k] < D[p]
      D[p] = W[p][k]
      E[p] = k
    end
  end
end
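For readers who want to run it, here is a direct Python transcription of the pseudocode above. It is a sketch that assumes an undirected, connected graph given as a symmetric weight matrix; the function name `prim_mst` and the sample weights are my own.

```python
import math

INF = math.inf


def prim_mst(W, r=0):
    """W: symmetric weight matrix, INF where two nodes are not connected."""
    n = len(W)
    in_tree = [False] * n
    in_tree[r] = True
    D = [W[i][r] for i in range(n)]    # cheapest known link into the tree
    E = [r] * n                        # tree endpoint of that link
    arcs = []
    for _ in range(n - 1):
        # Pick the node outside the tree with the cheapest link.
        k = min((i for i in range(n) if not in_tree[i]), key=lambda i: D[i])
        if D[k] == INF:
            raise ValueError("graph is not connected")
        in_tree[k] = True
        arcs.append((E[k], k, D[k]))
        # Node k may offer cheaper links to the nodes still outside.
        for p in range(n):
            if not in_tree[p] and W[p][k] < D[p]:
                D[p] = W[p][k]
                E[p] = k
    return arcs


W = [
    [INF, 1, 4, INF],
    [1, INF, 2, 5],
    [4, 2, INF, 3],
    [INF, 5, 3, INF],
]
mst = prim_mst(W)
# mst == [(0, 1, 1), (1, 2, 2), (2, 3, 3)], total weight 6
```

Each tuple is (tree node, newly added node, weight of the arc that joined them).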
We skip the map/reduce form here because it is very similar to that of another popular algorithm, single source shortest path (SSSP). The Map/Reduce form of SSSP based on Dijkstra's algorithm is as follows ...
# Single Source Shortest Path (SSSP) based on Dijkstra
Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if node i, j is disconnected
SSSP has nodes in array N = []
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i
Initially, put the source node s into N[]
  N = [s]
  L[s] = 0; L[i] = W[s][i];
  Path[i] = arc[s][i] for all nodes directly connected
    from source.
Repeat until N[] contains all nodes
  Pick node k outside N[] where L[k] is minimum
  Add node k to N;
  for all node p connected from node k {
    if L[k] + W[k][p] < L[p] {
      L[p] = L[k] + W[k][p]
      Path[p] = Path[k].append(Arc[k][p])
    }
  }
end repeat
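The sequential algorithm above translates almost line by line into Python. In this sketch I assume non-negative weights, store each path as a list of node ids rather than a list of arcs, and let the loop stop early when the remaining nodes are unreachable; the function name `dijkstra` and the sample matrix are my own.

```python
import math

INF = math.inf


def dijkstra(W, s):
    """W[i][j]: weight of the arc from i to j, INF if there is none."""
    n = len(W)
    settled = [False] * n          # plays the role of the array N
    L = [INF] * n
    path = [None] * n
    L[s] = 0
    path[s] = [s]
    for _ in range(n):
        candidates = [i for i in range(n) if not settled[i] and L[i] < INF]
        if not candidates:
            break                  # the remaining nodes are unreachable
        k = min(candidates, key=lambda i: L[i])
        settled[k] = True
        for p in range(n):
            if L[k] + W[k][p] < L[p]:
                L[p] = L[k] + W[k][p]
                path[p] = path[k] + [p]
    return L, path


W = [
    [INF, 4, 1, INF],
    [INF, INF, INF, 1],
    [INF, 2, INF, 5],
    [INF, INF, INF, INF],
]
lengths, paths = dijkstra(W, 0)
# lengths == [0, 3, 1, 4]
# paths[3] == [0, 2, 1, 3]
```

Only one node is settled per iteration, which is why this formulation needs as many Map/Reduce rounds as there are nodes.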
# Here is what the map/reduce pseudo code would look like
class FindMinimumJob {
  map(node_id, path_length) {
    if not N.contains(node_id) {
      emit(1, [path_length, node_id])
    }
  }
  reduce(k, v) {
    min_length, min_node = minimum(v)
    N.add(min_node)
    for each node in min_node.connected_nodes {
      emit(node, min_node)
    }
  }
}
class UpdateMinPathJob {
  map(node, min_node) {
    if L[min_node] + W[min_node][node] < L[node] {
      L[node] = L[min_node] + W[min_node][node]
      Path[node] =
        Path[min_node].append(arc(min_node, node))
    }
  }
}
# Single Source Shortest Path main program
main() {
  init()
  while (not N.contains(V)) {
    JobClient.submit(FindMinimumJob.new)
    JobClient.submit(UpdateMinPathJob.new)
  }
  return Path
}
The same SSSP problem can also be solved using breadth-first search. The intuition is to grow a frontier from the source at each iteration and update the shortest distance from the source.
# Single Source Shortest Path (SSSP) using BFS
Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if node i, j is disconnected
Frontier nodes in array F
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i
Initially,
  F = [s]
  L[s] = 0; L[i] = infinity for every other node i
  Path[s] = [] (the empty path); the first round then fills in
    the nodes directly connected from source.
# input is all nodes in the frontier F
# output is frontier of next round FF
class GrowFrontierJob {
  map(node) {
    for each to_node in node.connected_nodes {
      emit(to_node, [node, L[node] + W[node][to_node]])
    }
  }
  reduce(node, from_list) {
    for each from in from_list {
      from_node = from[0]
      length_via_from_node = from[1]
      if (length_via_from_node < L[node]) {
        L[node] = length_via_from_node
        Path[node] =
          Path[from_node].append(arc(from_node, node))
        FF.add(node)
      }
    }
  }
}
# Single Source Shortest Path BFS main program
main() {
  init()
  while (F is non-empty) {
    JobClient.set_input(F)
    JobClient.submit(GrowFrontierJob.new)
    copy FF to F
    clear FF
  }
  return Path
}
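The frontier-growing idea can be tried out with a single-process Python simulation. This is a sketch under my own assumptions: the graph is a dict of dicts instead of a matrix, weights are non-negative, every node other than the source starts at infinity, and each message carries the candidate path so that the reduce step never reads another node's state. The names `grow_frontier` and `sssp_bfs` are hypothetical.

```python
import math
from collections import defaultdict


def grow_frontier(W, frontier, L, path):
    """One map/reduce round; returns the frontier of the next round."""
    # Map: every frontier node offers a candidate length to each neighbour.
    offers = defaultdict(list)
    for node in frontier:
        for to_node, weight in W[node].items():
            offers[to_node].append((L[node] + weight, path[node] + [to_node]))
    # Reduce: a node keeps the best offer if it beats what it already has.
    next_frontier = set()
    for node, candidates in offers.items():
        length, via = min(candidates, key=lambda candidate: candidate[0])
        if length < L[node]:
            L[node], path[node] = length, via
            next_frontier.add(node)
    return next_frontier


def sssp_bfs(W, s):
    """W: dict node -> {neighbour: weight}; every node must be a key of W."""
    L = {node: math.inf for node in W}
    path = {node: None for node in W}
    L[s], path[s] = 0, [s]
    frontier = {s}
    while frontier:
        frontier = grow_frontier(W, frontier, L, path)
    return L, path


W = {0: {1: 4, 2: 1}, 1: {3: 1}, 2: {1: 2, 3: 5}, 3: {}}
lengths, paths = sssp_bfs(W, 0)
# lengths == {0: 0, 1: 3, 2: 1, 3: 4}
# paths[3] == [0, 2, 1, 3]
```

Unlike the Dijkstra form, a node can enter the frontier more than once (node 3 is improved twice here), but every node in the frontier is processed in the same round, so the number of rounds is usually far smaller than the number of nodes.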