I have an RDD of (String, String, Int) tuples.
- I want to reduce it by the first two fields (summing the counts)
- Then, keyed by the first field, I want to group the resulting (String, Int) pairs and sort them
- After sorting, I need to split each sorted group into small groups of n elements each.
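To make the intent concrete, here is a sketch of the same three steps on a plain Scala collection (the names `sample`, `reduced`, and `chunked` are just for illustration, with n = 2):

    import scala.collection.immutable.Seq

    val sample = Seq(("c1","a1",1), ("c1","b1",1), ("c1","a1",1), ("c2","a1",1))
    // step 1: sum the counts per (field1, field2)
    val reduced = sample
      .groupBy(t => (t._1, t._2))
      .toSeq
      .map { case ((f1, f2), ts) => (f1, (f2, ts.map(_._3).sum)) }
    // steps 2 and 3: per field1, sort the (field2, count) pairs
    // and chunk them into groups of n = 2
    val chunked = reduced
      .groupBy(_._1)
      .map { case (f1, vs) => (f1, vs.map(_._2).toList.sorted.grouped(2).toList) }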
I wrote the code below. The problem is that in step 2 the number of elements per key is very large, and reduceByKey(x ++ y) takes a lot of time.
import scala.collection.mutable.ArrayBuffer

val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1), ("c1","a2",1), ("c1","b2",1),
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
// key by (field1, field2), value = count
val r1 = rdd.map(x => ((x._1, x._2), x._3))
// sum the counts per (field1, field2), then re-key by field1
val r2 = r1.reduceByKey(_ + _).map(x => (x._1._1, (x._1._2, x._2)))
// collect all (field2, count) pairs for each field1
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey(_ ++ _)
// sort each group and split it into chunks of 2
val r4 = r3.map(x => (x._1, x._2.toList.sorted.grouped(2).toList))
The problem is that "c1" has millions of unique second fields (b1, b2, ...), and the reduceByKey in r3 is the bottleneck, because all values for a key end up on a single node. Is there a way to achieve this more efficiently?
Expected output:

Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
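For context: as far as I can tell, the ArrayBuffer concatenation in r3 is essentially a hand-rolled groupByKey, so it has the same shuffle behavior, namely that every value for a key is pulled onto one executor. A sketch of the equivalent form, assuming the same r2 as above (r3b/r4b are my names):

    // Equivalent to r3/r4 above: groupByKey also collects every value
    // for a key on a single executor, which is why "c1" is a hot spot.
    val r3b = r2.groupByKey()
    val r4b = r3b.map { case (k, vs) => (k, vs.toList.sorted.grouped(2).toList) }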