We are running a shortest-path search with the Pregel API on a graph of 3 lakh (300,000) vertices. Each vertex must be used as the source vertex once, and we need the shortest path across all of these runs. My code is below:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx._

def shortestPath(sc: SparkContext,
                 mainGraph: Graph[(String, String, Double), Double],
                 singleSourceVertexFlag: Boolean): Unit = {
  // Vertex IDs are not guaranteed to be 0..n-1, so collect the real IDs once
  // (300k Longs fit easily on the driver). Sorted, so single-source mode uses the smallest ID.
  val allIds = mainGraph.vertices.keys.collect().sorted
  // If the single-source flag is set, run only one iteration; otherwise use every vertex
  val sourceIds = if (singleSourceVertexFlag) allIds.take(1) else allIds
  // Upper bound on supersteps, computed once instead of counting inside the loop
  val maxIterations = allIds.length

  for (sourceId <- sourceIds) {
    // Initial distances: 0 for the source, infinity for every other vertex
    val modGraph = mainGraph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = modGraph.pregel(Double.PositiveInfinity, maxIterations, EdgeDirection.Out)(
      (id, dist, newDist) => math.min(dist, newDist), // Vertex program: keep the shorter distance
      triplet => {                                      // Send message: relax the edge if it helps
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else
          Iterator.empty
      },
      (a, b) => math.min(a, b)                          // Merge messages: keep the minimum
    )

    // The distances in sssp are discarded here; they must be read before unpersisting
    sssp.unpersist(blocking = true)
    modGraph.unpersist(blocking = true)
    println("****Shortest Path End**** SourceId " + sourceId)
  }
}
```
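To make the three Pregel functions concrete, here is a Spark-free sketch that simulates the same loop on an in-memory edge list. It is a hypothetical local helper (`LocalPregelSssp` and `WeightedEdge` are names introduced here, not GraphX classes), assumes non-negative edge weights, and only roughly mirrors GraphX's superstep accounting: each pass of the `while` loop plays the role of one superstep.

```scala
// Hypothetical, Spark-free simulation of the Pregel SSSP loop (not GraphX code).
// Assumes non-negative weights and that every edge endpoint appears in `vertices`.
object LocalPregelSssp {
  final case class WeightedEdge(src: Long, dst: Long, weight: Double)

  def sssp(vertices: Seq[Long], edges: Seq[WeightedEdge], source: Long,
           maxIterations: Int = Int.MaxValue): Map[Long, Double] = {
    // Initial state, as in mapVertices: 0 for the source, infinity elsewhere
    var dist: Map[Long, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    // At first only the source has a finite distance, so only it can send messages
    var active: Set[Long] = Set(source)
    var iteration = 0
    while (active.nonEmpty && iteration < maxIterations) {
      // sendMsg on out-edges of active vertices (EdgeDirection.Out),
      // only when the edge shortens the destination's current distance
      val sent = edges.filter(e => active.contains(e.src) && dist(e.src) + e.weight < dist(e.dst))
      // mergeMsg: keep the minimum message per destination vertex
      val messages: Map[Long, Double] =
        sent.groupBy(_.dst).map { case (v, es) => v -> es.map(e => dist(e.src) + e.weight).min }
      // vprog: keep the smaller of the current distance and the merged message
      dist = dist ++ messages.map { case (v, d) => v -> math.min(dist(v), d) }
      // Vertices that received a message are the ones active in the next superstep
      active = messages.keySet
      iteration += 1
    }
    dist
  }
}
```

For example, with edges 1→2 (1.0), 2→3 (2.0), 1→3 (5.0) and 3→1 (1.0), running from vertex 1 gives distance 3.0 to vertex 3 (via 2), and an isolated vertex 4 stays at infinity. The loop stops as soon as no edge improves any distance, which is also what ends the GraphX run when no messages are sent.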
From these runs I still need to read the shortest-path result of each iteration and take the minimum of them as the final output (that is the next step; I have not written the code for it yet).
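For that next step, one possible reading of "the minimum value" is the smallest finite distance between two distinct vertices over all runs. A minimal plain-Scala sketch of that reduction, assuming each run's result has already been turned into a `Map` from vertex ID to distance (`MinOverSources` and `overallMinimum` are hypothetical names):

```scala
// Hypothetical helper: reduce per-source distance maps to the overall shortest path.
// Assumption: "minimum" means the smallest finite distance between two distinct vertices.
object MinOverSources {
  /** Returns (source, target, distance) of the smallest finite distance, ignoring self-distances. */
  def overallMinimum(results: Seq[(Long, Map[Long, Double])]): Option[(Long, Long, Double)] = {
    val candidates = for {
      (source, dists) <- results
      (target, d) <- dists
      // Skip the source's own 0.0 and vertices the source cannot reach
      if target != source && d != Double.PositiveInfinity
    } yield (source, target, d)
    if (candidates.isEmpty) None else Some(candidates.minBy(_._3))
  }
}
```

Collecting 300,000 distances per source to the driver would be expensive. On the cluster, each run could instead be reduced to a single number before `sssp` is unpersisted, for example `sssp.vertices.filter { case (id, d) => id != sourceId && d != Double.PositiveInfinity }.map(_._2).fold(Double.PositiveInfinity)((a, b) => math.min(a, b))`, and only that value kept from each iteration.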
We tested with graphs of 15 nodes and 1112 nodes. With a 22k-node graph, the job runs out of memory after 55 source vertices. Cluster: 1 node with 64 GB RAM and 2 nodes with 32 GB RAM.
1. Is a `for` loop like this (one Pregel run per source vertex) a reasonable approach in Spark?
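2. I call `unpersist`, but the RDD memory does not seem to be released, and the job still fails after 55 sources. How can I free it?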
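One commonly used mitigation for long iterative GraphX jobs is periodic checkpointing, which truncates the RDD lineage that builds up across supersteps. The sketch below only shows the setup and is not verified against this workload; it assumes Spark 2.2 or later, where GraphX's Pregel reads `spark.graphx.pregel.checkpointInterval` (default -1, disabled). The object name, app name, interval and checkpoint path are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job setup; names and values are illustrative, not tuned.
// Assumption: Spark 2.2+, where Pregel honours spark.graphx.pregel.checkpointInterval.
object SsspJobSetup {
  def createContext(checkpointDir: String): SparkContext = {
    val conf = new SparkConf()
      .setAppName("all-sources-sssp")
      // Checkpoint the Pregel graph every 10 supersteps to cut the lineage
      .set("spark.graphx.pregel.checkpointInterval", "10")
    val sc = new SparkContext(conf)
    // Checkpointing needs a directory; on a cluster this should be shared storage such as HDFS
    sc.setCheckpointDir(checkpointDir)
    sc
  }
}
```

Separately, calling `mainGraph.cache()` once before the loop avoids recomputing the input graph from its lineage for every source; `modGraph` is never cached in the question's code, so `modGraph.unpersist(...)` on its own frees little.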