I am trying to calculate the sum of the node values in a spark graph graph. In short, a graph is a tree, and the top node (root) should summarize all the children and their children. My graph is actually a tree that looks like this, and the expected total value should be 1850 :
+
+
| | | Value: 1000
+
+
| | | Value: +
| +
+
| | VertexId 20 +
| | Value:
+
|
| +
+
| | Value: 300
+
The first blow in this is as follows:
val vertices: RDD[(VertexId, Int)] =
sc.parallelize(Array((20L, 0)
, (11L, 0)
, (14L, 1000)
, (24L, 550)
, (911L, 300)
))
val edges: RDD[Edge[Int]] =
sc.parallelize(Array(
Edge(14L, 11L, 1),
Edge(24L, 11L, 1),
Edge(11L, 20L, 1),
Edge(911L, 20L, 1)
))
val dataItemGraph = Graph(vertices, edges)
val sum: VertexRDD[(Int, BigDecimal, Int)] = dataItemGraph.aggregateMessages[(Int, BigDecimal, Int)](
sendMsg = { triplet => triplet.sendToDst(1, triplet.srcAttr, 1) },
mergeMsg = { (a, b) => (a._1, a._2 * a._3 + b._2 * b._3, 1) }
)
sum.collect.foreach(println)
This returns the following:
(20,(1,300,1))
(11,(1,1550,1))
It does the sum for vertex 11, but does not collapse to the root of the node (vertex 20). What am I missing or is there a better way to do this? Of course, a tree can have an arbitrary size, and each vertex can have an arbitrary number of edges of children.