Splitting RDDs and aggregating on new RDDs

I have an RDD of (String, String, Int) tuples.

  • I want to reduce it by the first two fields.
  • Then, for each value of the first field, I want to group the (String, Int) pairs and sort them.
  • After sorting, I need to split them into small groups of n elements each.

I wrote the code below. The problem is that in step 2 the number of elements for a single key is very large, and reduceByKey(x ++ y) takes a lot of time.

import scala.collection.mutable.ArrayBuffer

// Input
val data = Array(
  ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
  ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))

val rdd = sc.parallelize(data)
// Key by (first, second) and sum the counts, then re-key by the first field.
val r1 = rdd.map(x => ((x._1, x._2), x._3))
val r2 = r1.reduceByKey((x, y) => x + y).map(x => (x._1._1, (x._1._2, x._2)))

// This is taking a long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)

// From the list I do the grouping: sort, then split into chunks of 2.
val r4 = r3.map(x => (x._1, x._2.toList.sorted.grouped(2).toList))

The problem is that "c1" has a huge number of unique entries (b1, b2, ... around a million of them), and this reduceByKey is what kills the running time, because all the values for a key end up on a single node. Is there a more efficient way to do this?

// output
 Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
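
To pin down the intended semantics independently of Spark, here is a small local sketch of the three steps using plain Scala collections. The names LocalPipeline, sample and pipeline are hypothetical, and this is not a distributed solution; it only reproduces the expected output above on small inputs, which makes it handy for checking a Spark version against.

```scala
object LocalPipeline {
  // Sample input from the question.
  val sample: Seq[(String, String, Int)] = Seq(
    ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
    ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))

  // Local (non-Spark) model of the pipeline; n is the chunk size from step 3.
  def pipeline(data: Seq[(String, String, Int)], n: Int): Map[String, List[List[(String, Int)]]] = {
    // Step 1: sum the Int per (first, second) pair, like reduceByKey on the RDD.
    val summed: Map[(String, String), Int] =
      data.groupBy { case (a, b, _) => (a, b) }
        .map { case (key, rows) => key -> rows.map(_._3).sum }

    // Steps 2 and 3: re-key by the first field, sort the (second, sum) pairs,
    // then split each sorted list into chunks of at most n elements.
    summed.toList
      .groupBy { case ((a, _), _) => a }
      .map { case (a, entries) =>
        val sorted = entries.map { case ((_, b), sum) => (b, sum) }.sorted
        a -> sorted.grouped(n).toList
      }
  }

  def main(args: Array[String]): Unit =
    println(pipeline(sample, 2))
}
```

The result is a Map, so key order in the printed output may differ from the Array shown above, but the per-key chunks are the same.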

Your code is inefficient in several ways.

 mapValues(x => ArrayBuffer(x))

wraps every single value in its own ArrayBuffer, creating a huge number of short-lived objects, and the merge in

reduceByKey((x, y) => x ++ y) 

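uses ++, which allocates a new collection and copies both operands on every merge. Since this merge only ever grows the buffers, reduceByKey gives you no real map-side reduction, and all that copying and allocation lands you in GC hell.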
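
To get a feel for why merging with ++ hurts, the sketch below counts how many elements are copied when n singleton buffers are merged one at a time with ++ (an assumed, simplified model of how one key's values are combined within a partition), versus appending into a single buffer. It is plain Scala, not Spark, and MergeCost, concatCopies and appendCopies are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

object MergeCost {
  // Left-fold n singleton buffers with ++. Each ++ allocates a new ArrayBuffer
  // holding both operands, so we add the size of every new buffer to the count.
  def concatCopies(n: Int): Long = {
    var acc = ArrayBuffer(0)
    var copied = 0L
    for (i <- 1 until n) {
      val next = ArrayBuffer(i)
      copied += acc.size + next.size
      acc = acc ++ next // a brand-new buffer on every step
    }
    copied
  }

  // Appending in place into one buffer writes each element exactly once.
  def appendCopies(n: Int): Long = {
    val acc = ArrayBuffer.empty[Int]
    for (i <- 0 until n) acc += i
    acc.size.toLong
  }

  def main(args: Array[String]): Unit = {
    val n = 10000
    println(s"++ merge copies: ${concatCopies(n)}, in-place appends: ${appendCopies(n)}")
  }
}
```

Under this model the ++ version copies n(n+1)/2 - 1 elements for n values, which is quadratic, while in-place appending is linear.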

What can you do?

First, since you gain nothing from map-side combining here, you can simply replace mapValues + reduceByKey with groupByKey:

val r3 = r2.groupByKey

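Next, you can avoid the shuffle that groupByKey would otherwise trigger: give reduceByKey a custom partitioner that partitions only on the first element of the key, and re-key with mapPartitions(..., preservesPartitioning = true) instead of map, so that Spark keeps the partitioner.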

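// Partitions by the first element of a tuple key; any other key is hashed as is,
// so a record keyed by (k1, k2) and the same record re-keyed by k1 land in the
// same partition (assuming k1 is not itself a Tuple2).
class FirstElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    val k = key match {
      case (first, _) => first
      case other      => other
    }
    val mod = k.## % numPartitions
    if (mod < 0) mod + numPartitions else mod // ## can be negative
  }
}

val r2 = r1
  .reduceByKey(new FirstElementPartitioner(8), (x, y) => x + y)
  // Re-key by the first field; preservesPartitioning = true keeps the partitioner.
  .mapPartitions(iter => iter.map { case ((k1, k2), v) => (k1, (k2, v)) },
    preservesPartitioning = true)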

// No shuffle required here.
val r3 = r2.groupByKey
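
This trick is only valid if every ((k1, k2), v) record already sits in the partition that k1 alone would be assigned to, and if partition indices are never negative (a plain ## % numPartitions can be negative). The plain-Scala sketch below, with no Spark dependency, shows a partitioning rule with both properties; FirstElementPartitioning and partitionOf are hypothetical names, and it assumes the first key element is not itself a tuple.

```scala
object FirstElementPartitioning {
  // Hash only the first element of a tuple key, or the key itself otherwise,
  // and map the hash to a non-negative index in [0, numPartitions).
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val k = key match {
      case (first, _) => first
      case other      => other
    }
    val mod = k.## % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(("c1", "a1"), ("c1", "b2"), ("c2", "a1"))
    for ((k1, k2) <- keys) {
      // Before re-keying the key is (k1, k2); afterwards it is just k1.
      val before = partitionOf((k1, k2), 8)
      val after  = partitionOf(k1, 8)
      println(s"($k1, $k2) -> $before, $k1 -> $after")
    }
  }
}
```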

Now groupByKey does not trigger a shuffle; the lineage contains only the ShuffledRDD from reduceByKey:

r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:26 []

Source: https://habr.com/ru/post/1686032/

