Join multiple RDDs

I have 4 RDDs of type RDD[((Int, Int, Int), value)], and they look like this:

rdd1: ((a,b,c),value)
rdd2: ((a,d,e),valueA)
rdd3: ((f,b,g),valueB)
rdd4: ((h,i,c),valueC)

How can I join these RDDs in Scala, for example rdd1 join rdd2 on "a", rdd1 join rdd3 on "b", and rdd1 join rdd4 on "c", so that the output is finalRdd: ((a,b,c),valueA,valueB,valueC,value)?

I tried to do this with collectAsMap, but it did not work well and throws an exception.

Code that joins rdd1 with rdd2 only:

val newrdd2=rdd2.map{case( (a,b,c),d)=>(a,d)}.collectAsMap
val joined=rdd1.map{case( (a,b,c),d)=>(newrdd2.get(a).get,b,c,d)} 

Example:

rdd1: ((1,2,3),animals)
rdd2: ((1,anyInt,anyInt),cat)
rdd3: ((anyInt,2,anyInt),cow)
rdd4: ((anyInt,anyInt,3),parrot)

The result should be ((1,2,3),animals,cat,cow,parrot)

1 answer

RDD has a convenient join method, but the RDD must be keyed by your specific join key, because that key is what Spark uses for partitioning and shuffling.

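From the documentation:

join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.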

So, in your case, you need to re-key each RDD by the join key before joining, something like this:

val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3, x._2))) // RDD(a, (b, c, value))
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2)) // RDD(a, valueA)
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a, ((b, c, value), valueA))

val rdd3KeyB = rdd3.map(x => (x._1._2, x._2)) // RDD(b, valueB)
// Re-key the first join result by b so it can be joined with rdd3
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3, x._2._2))) // RDD(b, (a, c, value, valueA))
val joined2 = joined1KeyB.join(rdd3KeyB) // RDD(b, ((a, c, value, valueA), valueB))

... ..
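The same re-key-and-join pattern can be carried through to rdd4 and the key c. Below is a minimal sketch of the whole chain, not a definitive implementation. It assumes the values are String, uses pattern matching instead of the `._1._2` accessors for readability, and puts the output in the order of the question's example (value, valueA, valueB, valueC). The names `JoinFourRdds` and `joinAll`, the local master, and the numbers standing in for "anyInt" are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinFourRdds {
  type Row = ((Int, Int, Int), String) // assumption: values are Strings

  // Hypothetical helper: joins rdd1 with rdd2 on a, rdd3 on b, rdd4 on c.
  def joinAll(rdd1: RDD[Row], rdd2: RDD[Row], rdd3: RDD[Row], rdd4: RDD[Row])
      : RDD[((Int, Int, Int), String, String, String, String)] = {
    // Key rdd1 by a and join with rdd2 keyed by its first field.
    val byA = rdd1.map { case ((a, b, c), v) => (a, (b, c, v)) }
    val withA = byA.join(rdd2.map { case ((a, _, _), vA) => (a, vA) })
    // withA: RDD(a, ((b, c, value), valueA))

    // Re-key by b and join with rdd3 keyed by its second field.
    val byB = withA.map { case (a, ((b, c, v), vA)) => (b, (a, c, v, vA)) }
    val withB = byB.join(rdd3.map { case ((_, b, _), vB) => (b, vB) })
    // withB: RDD(b, ((a, c, value, valueA), valueB))

    // Re-key by c and join with rdd4 keyed by its third field.
    val byC = withB.map { case (b, ((a, c, v, vA), vB)) => (c, (a, b, v, vA, vB)) }
    val withC = byC.join(rdd4.map { case ((_, _, c), vC) => (c, vC) })

    // Restore the original (a, b, c) key and flatten the values.
    withC.map { case (c, ((a, b, v, vA, vB), vC)) => ((a, b, c), v, vA, vB, vC) }
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholders for this sketch.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("join-four-rdds"))
    // Sample rows from the question; 7, 8 and 9 stand in for "anyInt".
    val rdd1 = sc.parallelize(Seq(((1, 2, 3), "animals")))
    val rdd2 = sc.parallelize(Seq(((1, 7, 8), "cat")))
    val rdd3 = sc.parallelize(Seq(((7, 2, 9), "cow")))
    val rdd4 = sc.parallelize(Seq(((8, 9, 3), "parrot")))

    joinAll(rdd1, rdd2, rdd3, rdd4).collect().foreach(println)
    // prints ((1,2,3),animals,cat,cow,parrot)
    sc.stop()
  }
}
```

Each join is an inner join, so a row of rdd1 with no match in any one of the other three RDDs is dropped from the result.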

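Also, avoid the collect* functions where possible: they do not use the distributed nature of your data, because they load the entire RDD onto a single node, and everything that follows then runs on that node.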
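The question does not show the exception, so the following is only a guess at one possible cause: in plain Scala, `newrdd2.get(a).get` throws a NoSuchElementException whenever the map has no entry for `a`. If that is what happened, a leftOuterJoin keeps the data distributed and returns the missing side as an Option instead. A minimal sketch, assuming String values; `OuterJoinInsteadOfCollect` and `attachA` are hypothetical names.

```scala
import org.apache.spark.rdd.RDD

object OuterJoinInsteadOfCollect {
  type Row = ((Int, Int, Int), String) // assumption: values are Strings

  // Attach rdd2's value to every rdd1 row by key a.
  // Rows of rdd1 with no match in rdd2 are kept, with None as the joined value.
  def attachA(rdd1: RDD[Row], rdd2: RDD[Row])
      : RDD[((Int, Int, Int), String, Option[String])] = {
    val left = rdd1.map { case ((a, b, c), v) => (a, (b, c, v)) }
    val right = rdd2.map { case ((a, _, _), vA) => (a, vA) }
    // leftOuterJoin yields RDD(a, ((b, c, value), Option[valueA]))
    left.leftOuterJoin(right).map { case (a, ((b, c, v), vA)) => ((a, b, c), v, vA) }
  }
}
```

Whether to keep unmatched rows (leftOuterJoin) or drop them (join) depends on what the final result should contain; the question's example only covers the case where every key matches.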


Source: https://habr.com/ru/post/1620748/

