How to use reduceByKey on multiple keys in a Scala Spark job

I'm relatively new to Spark, and I'm trying to aggregate data by multiple keys at once.

I have some data that I map so that it looks like this:

((K1, K2, K3), (V1, V2))

My goal is to group by (K1, K2, K3) and sum V1 and V2 for each group, so that I end up with:

((K1, K2, K3), (SUM(V1), SUM(V2)))

Here is the code I have so far:

val filepath  = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)            
val data = sqlContext.read.avro(filepath)
val dataRDD = data.rdd

val mappedDataRDD = dataRDD.map{
   case (v, w, x, y, z) => ((v,w,x), (y, z))
}.reduceByKey((x,y)=> ???)

So I'm looking for how to write the reduceByKey call so that it groups by the key (v, w, x) and sums y and z.

2 answers

I think what you are looking for, and should use, is aggregateByKey.

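This method takes two parameter lists. The first takes the initial (zero) value of the accumulator. The second takes two functions:

  • a function that folds one value into the accumulator for its key, within a partition;
  • a function that merges two accumulators for the same key, across partitions.

You can use it like this: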

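// Zero value for the (sum of y, sum of z) accumulator; Long leaves room beyond Int.
val (accZeroY, accZeroZ): (Long, Long) = (0L, 0L)

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .aggregateByKey((accZeroY, accZeroZ))(
    // Fold one (y, z) value into the accumulator for its key.
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) },
    // Merge two accumulators for the same key.
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )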

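As you can see, the accumulator does not have to be of the same type as the values, which is useful when the type of the needed accumulation differs from the type of the values in your key-value RDD (PairRDD).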

With reduceByKey, unlike aggregateByKey, the result has to be of the same type as the values. You can use it like this:

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .reduceByKey(
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) }
  )

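But in my opinion you should NOT use reduceByKey here. The reason I suggest aggregateByKey is that summing values over a large dataset can produce a result that is outside the range of the value type.

For example, in your case I suspect that (y, z) is actually an (Int, Int), and you want to accumulate it using (v, w, x) as the key. But when you add up a large number of Int values, the result can end up larger than an Int can hold.

So you will want the accumulator type to have a larger range than (Int, Int), such as (Long, Long), and reduceByKey on its own does not let you change the type. That is why I would say that you are looking for, and should use, aggregateByKey.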
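To make the overflow concern concrete, here is a minimal sketch that runs both approaches on the same data. The sample rows, the object name, the helper names seqOp and combOp, and the local[*] master are all assumptions made for illustration; they are not part of the question's job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeySketch {
  // Folds one (Int, Int) value into the (Long, Long) accumulator for its key.
  def seqOp(acc: (Long, Long), v: (Int, Int)): (Long, Long) =
    (acc._1 + v._1, acc._2 + v._2)

  // Merges two accumulators that belong to the same key.
  def combOp(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is enough for this sketch.
    val conf = new SparkConf().setMaster("local[*]").setAppName("aggregateByKey-sketch")
    val sc = new SparkContext(conf)

    // Hypothetical rows (k1, k2, k3, v1, v2); one v1 sits at Int.MaxValue.
    val keyed = sc.parallelize(Seq(
      (1, 2, 1, Int.MaxValue, 1),
      (1, 2, 1, 1, 2),
      (1, 3, 2, 4, 4)
    )).map { case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1, v2)) }

    // reduceByKey keeps the value type (Int, Int), so the first sum wraps around.
    val reduced = keyed
      .reduceByKey { case ((y1, z1), (y2, z2)) => (y1 + y2, z1 + z2) }
      .collectAsMap()

    // aggregateByKey accumulates into (Long, Long), so the sum stays exact.
    val aggregated = keyed
      .aggregateByKey((0L, 0L))((acc, v) => seqOp(acc, v), (a, b) => combOp(a, b))
      .collectAsMap()

    println(reduced((1, 2, 1)))    // (-2147483648,3)
    println(aggregated((1, 2, 1))) // (2147483648,3)

    sc.stop()
  }
}
```

The only difference between the two calls is the accumulator type: the zero value (0L, 0L) fixes it to (Long, Long) for aggregateByKey, while reduceByKey is tied to the (Int, Int) values it receives.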


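You can also use reduceByKey, you just have to be careful about what you are reducing. I simplified the example, but it shows what you want.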

val rdd = sc.parallelize(List(
  (1, 2, 1, 1, 1), 
  (1, 2, 1, 2, 2),   
  (1, 3, 2, 4, 4)))

rdd.map {
  case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1, v2))
}.reduceByKey {
  // You receive two values which are actually tuples, so we treat them like that.
  case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)
}.collect()
//res0: Array[((Int, Int, Int), (Int, Int))] = Array(((1,2,1),(3,3)), ((1,3,2),(4,4)))
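If the sums might exceed the Int range, one possible variation is to widen the values to Long in the map step, so that reduceByKey already works on (Long, Long). The following is a sketch under that assumption; the object name, the sumPairs helper, and the local[*] master are hypothetical and only serve the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyWideningSketch {
  // Adds two (Long, Long) pairs component-wise.
  def sumPairs(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is enough for this sketch.
    val conf = new SparkConf().setMaster("local[*]").setAppName("reduceByKey-sketch")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(
      (1, 2, 1, 1, 1),
      (1, 2, 1, 2, 2),
      (1, 3, 2, 4, 4)))

    val sums = rdd
      // Widen the Int values to Long before reducing, so the sums cannot wrap at Int.MaxValue.
      .map { case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1.toLong, v2.toLong)) }
      .reduceByKey((a, b) => sumPairs(a, b))
      .collectAsMap()

    println(sums((1, 2, 1))) // (3,3)
    println(sums((1, 3, 2))) // (4,4)

    sc.stop()
  }
}
```

The trade-off is that every record is converted to the wider type up front, whereas aggregateByKey only widens inside the accumulator.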

Source: https://habr.com/ru/post/1655986/

