How to run a Pregel shortest path on a Spark cluster, using every vertex as the source vertex once

We are running a shortest path search with the Pregel API on a graph of 3 lakh (300,000) vertices. We must make each vertex the source vertex once and determine the shortest path among all these executions. My code is shown below.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

def shortestPath(sc: SparkContext, mainGraph: Graph[(String, String, Double), Double], singleSourceVertexFlag: Boolean): Unit = {

  // Count the vertices once and reuse the value
  val vertexCount = mainGraph.vertices.count()
  // If the single source vertex flag is true, run one iteration only;
  // otherwise loop through the complete list of vertices
  val noOfIterations = if (singleSourceVertexFlag) 1L else vertexCount
  // Upper bound on the number of Pregel iterations per source
  val loopItrCount = vertexCount.toInt

  // Note: this loop assumes the vertex ids are exactly 0 .. vertexCount - 1
  for (i <- 0L until noOfIterations) {
    val sourceId: VertexId = i
    // Distance 0 at the source, infinity everywhere else
    val modGraph = mainGraph.mapVertices((id, attr) =>
      if (id == sourceId) 0.0
      else Double.PositiveInfinity)

    val sssp = modGraph.pregel(Double.PositiveInfinity, loopItrCount, EdgeDirection.Out)(

      // Vertex Program: keep the smaller of the current and the received distance
      (id, dist, newDist) => math.min(dist, newDist),

      triplet => { // Send Message: offer a shorter distance to the destination
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },

      // Merge Message: keep the smallest incoming distance
      (a, b) => math.min(a, b))

    sssp.unpersist(true)
    modGraph.unpersist(true)

    println("****Shortest Path End**** SourceId " + sourceId)

  }

}
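To make the roles of the three functions passed to pregel concrete, here is a minimal plain-Scala sketch (no Spark involved) that simulates the same supersteps on a tiny graph. The object name LocalPregelSssp, the helper sssp and the sample edges are assumptions made up for illustration; GraphX distributes this work across the cluster, but the relaxation logic follows the same idea.

```scala
object LocalPregelSssp {
  // Directed weighted edge: (source id, destination id, weight).
  type Edge = (Long, Long, Double)

  /** Single-source shortest distances computed with Pregel-style supersteps. */
  def sssp(vertexIds: Seq[Long], edges: Seq[Edge], sourceId: Long): Map[Long, Double] = {
    // Initial state: 0.0 at the source, +infinity everywhere else.
    var dist: Map[Long, Double] =
      vertexIds.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap
    var changed = true
    var superstep = 0
    // At most |V| supersteps, mirroring the iteration cap used in the question.
    while (changed && superstep < vertexIds.size) {
      // Send message: offer a shorter distance along each edge that improves the destination.
      val messages: Seq[(Long, Double)] = edges.collect {
        case (src, dst, w) if dist(src) + w < dist(dst) => dst -> (dist(src) + w)
      }
      // Merge message: keep the smallest offer per destination vertex.
      val merged: Map[Long, Double] =
        messages.groupBy(_._1).map { case (dst, offers) => dst -> offers.map(_._2).min }
      // Vertex program: keep the smaller of the current and the offered distance.
      dist = dist ++ merged.map { case (id, d) => id -> math.min(dist(id), d) }
      changed = merged.nonEmpty
      superstep += 1
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical 4-vertex graph used only for this illustration.
    val vertexIds = Seq(0L, 1L, 2L, 3L)
    val edges = Seq((0L, 1L, 4.0), (0L, 2L, 1.0), (2L, 1L, 2.0), (1L, 3L, 5.0))
    println(sssp(vertexIds, edges, 0L))
  }
}
```

The loop stops as soon as a superstep produces no messages, which is also when Pregel terminates before reaching the iteration cap.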

From this code I need to read the shortest path produced by each run and take the minimum of those values as the final output (that part is future work; I have not written the code for it yet).
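One possible way to collect a result per run is to reduce each run's distances to a single value on the driver before unpersisting the graph. The sketch below assumes that "the shortest path" means the smallest finite distance from a source to any other vertex, taken over all runs; that is only one reading of the requirement. The names SsspSummary, ssspFrom, shortestFiniteDistance and minOverSources are hypothetical, and the Pregel call simply repeats the one from the question.

```scala
import org.apache.spark.graphx._

object SsspSummary {
  /** Runs the Pregel SSSP from the question for one source vertex. */
  def ssspFrom(graph: Graph[(String, String, Double), Double],
               sourceId: VertexId,
               maxIterations: Int): Graph[Double, Double] = {
    val init = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    init.pregel(Double.PositiveInfinity, maxIterations, EdgeDirection.Out)(
      (id, dist, newDist) => math.min(dist, newDist),
      triplet =>
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b))
  }

  /** Smallest finite distance to any vertex other than the source, if one exists. */
  def shortestFiniteDistance(sssp: Graph[Double, Double], sourceId: VertexId): Option[Double] = {
    val reachable = sssp.vertices
      .filter { case (id, d) => id != sourceId && d != Double.PositiveInfinity }
      .map(_._2)
    if (reachable.isEmpty()) None else Some(reachable.min())
  }

  /** Keeps at most one Double per run on the driver and returns the overall minimum. */
  def minOverSources(graph: Graph[(String, String, Double), Double],
                     sourceIds: Seq[VertexId],
                     maxIterations: Int): Option[Double] = {
    val perRun: Seq[Double] = sourceIds.flatMap { sourceId =>
      val sssp = ssspFrom(graph, sourceId, maxIterations)
      val best = shortestFiniteDistance(sssp, sourceId)
      sssp.unpersist(blocking = false) // release the cached result once the value is extracted
      best
    }
    if (perRun.isEmpty) None else Some(perRun.min)
  }
}
```

With this shape only a single number per source vertex reaches the driver, so the per-run graphs do not need to be kept around after their value has been read.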

Graph sizes tried: 15 nodes, 1112 nodes, and 22k nodes. On the 22k-node graph the job hits Out of memory at 55 source vertices. Cluster nodes: 1 node with 64 GB RAM and 2 nodes with 32 GB RAM.

,
1. for Spark-? -, , ?
2. unpersist, RDD , . , 55 . ?


Source: https://habr.com/ru/post/1617957/

