Replacing Apache Spark RDD

I am trying to solve a problem. I have a dataset like this:

(1, 3)
(1, 4)
(1, 7)
(1, 2)   <-
(2, 7)   <-
(6, 6)    
(3, 7)   <-
(7, 4)   <-
...

Since (1 -> 2) and (2 -> 7), I would like to replace the pair (2, 7) with (1, 7). Similarly, since (3 -> 7) and (7 -> 4), I would replace (7, 4) with (3, 4).

Therefore, my dataset becomes

(1, 3)
(1, 4)
(1, 7)
(1, 2)  
(1, 7)  
(6, 6)    
(3, 7)
(3, 4)
...

Any idea how to approach and solve this problem?

Thanks!

1 answer

This problem looks like a transitive closure of a graph, presented as a distributed list of edges.

This cannot be done in a single pass in Spark or Hadoop MR, but Spark supports iterative computations, so you can repeatedly join the dataset with itself until no two-step transitions remain. Something like this:

import org.apache.spark.rdd.RDD

def closure(rdd: RDD[(Int, Int)]): RDD[(Int, Int)] = {
  // Join the edge list with itself: for every pair of edges x -> a and
  // a -> y this yields (a, (x, y)); drop degenerate cases where x == y.
  val transitiveValues = rdd.map(_.swap).join(rdd).filter { case (_, (x, y)) => x != y }
  if (transitiveValues.isEmpty) {
    rdd // fixpoint reached: no two-step paths left
  } else {
    // Both edges that formed a two-step path are removed...
    val usedTransitions = transitiveValues.flatMap { case (a, (x, y)) => Seq((x, a), (a, y)) }
    // ...and replaced by the shortcut edge x -> y.
    val newTransitions = transitiveValues.map { case (a, (x, y)) => (x, y) }
    closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
  }
}
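To see what this fixpoint computes without spinning up Spark, here is a plain-Python sketch of the same logic (the set-based representation and the `closure` name are mine, for illustration only):

```python
def closure(edges):
    """Iteratively replace every two-step path a -> b -> c by a -> c,
    removing both edges that formed the path, until nothing changes.
    Mirrors the Scala/Spark version above on an in-memory set."""
    edges = set(edges)
    while True:
        # all two-step paths a -> b -> c with a != c
        transitive = {(a, b, c)
                      for (a, b) in edges
                      for (b2, c) in edges
                      if b == b2 and a != c}
        if not transitive:
            return edges  # fixpoint reached
        used = ({(a, b) for (a, b, c) in transitive}
                | {(b, c) for (a, b, c) in transitive})
        new = {(a, c) for (a, b, c) in transitive}
        edges = (edges - used) | new
```

On the question's sample data this returns {(1, 4), (1, 7), (2, 4), (3, 4), (6, 6)}: every pair that participates in a two-step path is consumed and replaced by the shortcut, which is not identical to the expected output in the question.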

Note that this is not exactly what you described: the pairs used to form a transition are consumed, so closure((1, 2), (2, 7)) = ((1, 7)), not ((1, 2), (1, 7)), i.e. (1, 2) is dropped. If you want to keep the original pairs, you would need to adjust the subtraction step accordingly.
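If the replacement shown in the question is literally what is needed, it appears to be order-dependent: each marked pair is rewritten against the pair immediately before it in the list. A plain-Python sketch of that reading (the adjacency assumption and the `rewrite_consecutive` name are mine, inferred from the arrows in the question):

```python
def rewrite_consecutive(pairs):
    """When a pair (a, y) immediately follows a pair (x, a) with x != a,
    replace it with (x, y); all other pairs are kept as-is."""
    out = list(pairs)
    for i in range(1, len(out)):
        x, a = out[i - 1]   # previous pair (possibly already rewritten)
        a2, y = out[i]
        if a == a2 and x != a:
            out[i] = (x, y)
    return out
```

On the sample data this reproduces the expected output exactly, including keeping (1, 2) and (3, 7) in place.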



Source: https://habr.com/ru/post/1661123/

