This problem looks like a transitive closure of a graph, presented as a distributed list of edges.
Spark, unlike Hadoop MR, handles iterative algorithms well, so in Spark the closure can be computed with a recursive function:
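import org.apache.spark.rdd.RDD

def closure(rdd: RDD[(Int, Int)]): RDD[(Int, Int)] = {
  // Key each edge (x, a) by its target a and join with the edges (a, y) leaving a,
  // giving (a, (x, y)) for every two-step path x -> a -> y.
  val transitiveValues = rdd.map(_.swap).join(rdd).filter { case (_, (x, y)) => x != y }
  if (transitiveValues.isEmpty()) {
    rdd
  } else {
    // Edges consumed by a two-step path, and the shortcut edges that replace them.
    val usedTransitions = transitiveValues.flatMap { case (a, (x, y)) => Seq((x, a), (a, y)) }
    val newTransitions = transitiveValues.map { case (_, (x, y)) => (x, y) }
    closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct()
  }
}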
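Note that this collapses chains rather than keeping every pair: closure((1, 2),(2, 7)) = (1,7), not (1, 2), (1, 7), because the transitions that were used are subtracted at each step.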
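To check the behaviour without a cluster, the same recursion can be sketched on a plain Scala Set. This is only an illustration: closureLocal is a hypothetical name, the nested loop stands in for the swap-and-join step, and the input is assumed to be acyclic (on a cycle of three or more edges the recursion may not terminate).

```scala
// Local, Spark-free sketch of the same recursion on Set[(Int, Int)].
// closureLocal is a hypothetical helper, not part of the Spark code.
def closureLocal(edges: Set[(Int, Int)]): Set[(Int, Int)] = {
  // Stand-in for rdd.map(_.swap).join(rdd): pair every edge (x, a)
  // with every edge (a, y), skipping the degenerate case x == y.
  val transitiveValues = for {
    (x, a)  <- edges
    (a2, y) <- edges
    if a == a2 && x != y
  } yield (a, (x, y))

  if (transitiveValues.isEmpty) {
    edges
  } else {
    // Edges that took part in a two-step path are removed ...
    val usedTransitions = transitiveValues.flatMap { case (a, (x, y)) => Seq((x, a), (a, y)) }
    // ... and replaced by the shortcut edge x -> y.
    val newTransitions = transitiveValues.map { case (_, (x, y)) => (x, y) }
    closureLocal((edges -- usedTransitions) ++ newTransitions)
  }
}
```

For instance, closureLocal(Set((1, 2), (2, 7))) evaluates to Set((1, 7)), and an input with no two-step paths, such as Set((1, 2), (3, 4)), is returned unchanged.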