Shortest Path Speed in GraphX with Spark

I am creating a graph from gz-compressed JSON files of edges and vertices.

I put the files in the Dropbox folder here

I load and map these JSON records to create the vertices and edges types that GraphX requires, as follows:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
// (vertex id parsed from "toid", "index" attribute)
val verticesRDD: RDD[(VertexId, Long)] = vertices_raw.rdd.map(row => (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[Long]("index")))
val edges_raw = sqlContext.read.json("path/edges.json.gz")
// positiveNode -> negativeNode, weighted by "length"
val edgesRDD: RDD[Edge[Double]] = edges_raw.rdd.map(row => Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length")))
val my_graph: Graph[Long, Double] = Graph(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)

Then I use this dijkstra implementation, which I found, to calculate the shortest path between two vertices:

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
  // Vertex state: (visited, best known distance from origin, path so far)
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0.0 else Double.MaxValue, List[VertexId]())
  )

  // One iteration per vertex; every iteration launches Spark jobs
  for (i <- 1L to g.vertices.count - 1) {
    // Pick the unvisited vertex with the smallest distance
    val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
        (a, b) => if (a._2._2 < b._2._2) a else b)
      ._1

    // Offer its neighbours a distance through the current vertex
    val newDistances: VertexRDD[(Double, List[VertexId])] =
      g2.aggregateMessages[(Double, List[VertexId])](
        ctx => if (ctx.srcId == currentVertexId) {
          ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
        },
        (a, b) => if (a._1 < b._1) a else b
      )

    // Mark the current vertex visited and keep the shorter distance and path
    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
      (
        vd._1 || vid == currentVertexId,
        math.min(vd._2, newSumVal._1),
        if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
      )
    })
  }

  // Attach (distance, path) to the original vertex attributes
  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
      .productIterator.toList.tail
    ))
}
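For comparison with the distributed version above, here is a minimal single-machine sketch of the same algorithm in plain Scala, with no Spark involved. It uses a priority queue instead of scanning all unvisited vertices on every step. The object name `LocalDijkstra`, the function `localDijkstra`, and the edge-triple input format are hypothetical and exist only for this illustration. Edges are treated as directed, and each returned path includes the destination vertex, which the GraphX version above does not.

```scala
import scala.collection.mutable

object LocalDijkstra {
  type VertexId = Long

  // For every vertex reachable from origin, returns
  // (distance from origin, list of vertex ids from origin to that vertex).
  def localDijkstra(
      edges: Seq[(VertexId, VertexId, Double)],
      origin: VertexId): Map[VertexId, (Double, List[VertexId])] = {
    // Adjacency list: source -> outgoing (destination, length) pairs
    val adj: Map[VertexId, Seq[(VertexId, Double)]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(e => (e._2, e._3)) }

    val dist = mutable.Map[VertexId, Double](origin -> 0.0)
    val prev = mutable.Map.empty[VertexId, VertexId]
    val done = mutable.Set.empty[VertexId]
    // PriorityQueue is a max-heap, so reverse the ordering to pop the smallest distance
    val queue = mutable.PriorityQueue((0.0, origin))(
      Ordering.by[(Double, VertexId), Double](_._1).reverse)

    while (queue.nonEmpty) {
      val (d, u) = queue.dequeue()
      if (!done(u)) {            // skip stale queue entries
        done += u
        for ((v, w) <- adj.getOrElse(u, Seq.empty)) {
          val nd = d + w
          if (nd < dist.getOrElse(v, Double.MaxValue)) {
            dist(v) = nd
            prev(v) = u
            queue.enqueue((nd, v))
          }
        }
      }
    }

    // Rebuild a path by walking predecessors back to the origin
    def pathTo(v: VertexId): List[VertexId] = {
      var path = List(v)
      while (path.head != origin) path = prev(path.head) :: path
      path
    }
    dist.map { case (v, d) => v -> (d, pathTo(v)) }.toMap
  }
}
```

On a toy graph with edges 1→2 (length 1.0), 2→3 (length 2.0) and 1→3 (length 5.0), `LocalDijkstra.localDijkstra(edges, 1L)(3L)` gives `(3.0, List(1, 2, 3))`. This only helps when the edge list fits in one machine's memory, which is an assumption about the data, not something the question states.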

I take two random vertex identifiers:

val v1 = 4000000028222916L
val v2 = 4000000031019012L

and calculate the path between them:

val results = dijkstra(my_graph, v1).vertices.map(_._2).collect

, stackoverflow. , 3 4 . 10 igraph Python . ? , , ( stackoverflow), 30/40 .


python-igraph github

" ( ), , ."

, 4000 apache-spark, , ( Spark PMC Kay Ousterhout.), , , , :

... , - ... , , . , ...

, , , .


Source: https://habr.com/ru/post/1666398/

