I have an RDD of (String, String, Int) tuples.
- I want to reduce it by the first two fields (summing the Int counts)
- Then, keyed by the first field, I want to collect the (String, Int) pairs and sort them
- After sorting, I need to split each key's sorted list into small groups of n elements each.
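Spelled out with the intermediate types (the step names are mine):

```scala
// input        : RDD[(String, String, Int)]                e.g. ("c1", "a1", 1)
// after reduce : RDD[((String, String), Int)]              e.g. (("c1", "a1"), 2)
// after group  : RDD[(String, List[(String, Int)])]        sorted pairs per key
// final        : RDD[(String, List[List[(String, Int)]])]  chunks of n per key
```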
I wrote the code below. The problem is that in step 2 the number of elements for a single key is very large, and reduceByKey(x ++ y) takes a lot of time.
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1), ("c1","a2",1), ("c1","b2",1),
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
import scala.collection.mutable.ArrayBuffer
val r1 = rdd.map(x => ((x._1, x._2), x._3))
val r2 = r1.reduceByKey((x, y) => x + y).map(x => (x._1._1, (x._1._2, x._2)))
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)
val r4 = r3.map(x => (x._1, x._2.toList.sorted.grouped(2).toList))
The problem is that "c1" has many unique second fields (b1, b2, ... into the millions), and this reduceByKey is the killer, because all the values for a key end up on a single node. Is there a way to achieve this more efficiently?
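For reference, this is the shape of an aggregateByKey variant of r3 that I understand mutates buffers in place instead of allocating a new ArrayBuffer on every merge (a sketch assuming r2 as above; it should cut the copying, but the values for a hot key still land on one node):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: same result as r3/r4 above, but each merge appends in place.
// Assumes r2: RDD[(String, (String, Int))] as defined earlier.
val r3b = r2.aggregateByKey(ArrayBuffer.empty[(String, Int)])(
  (buf, v) => buf += v,   // fold one value into a partition-local buffer
  (b1, b2) => b1 ++= b2   // merge buffers coming from different partitions
)
val r4b = r3b.map { case (k, buf) => (k, buf.toList.sorted.grouped(2).toList) }
```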
The expected output is:
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))