Here is a simple test using groupByKey:
    val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
    val groupby_RDD = test_RDD.groupByKey()
    val result_RDD = groupby_RDD.map { v =>
      // Copy the grouped values into a List; prepending reverses the iteration order.
      var result_list: List[Int] = Nil
      for (i <- v._2) {
        result_list ::= i
      }
      (v._1, result_list)
    }
The result:

    result_RDD.take(3)
    >> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))
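Note the element order in the output above: the loop prepends each value, and groupByKey makes no promise about the order of values within a group, so the lists may come out differently on another run. A shorter sketch of the same idea follows. It assumes a local context obtained through SparkContext.getOrCreate (in spark-shell this should simply return the existing sc); the app name and the names sparkCtx, pairs, grouped and collected are made up for illustration, and the sort is there only to make the output deterministic.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode. In spark-shell, getOrCreate returns the running context.
val conf = new SparkConf().setAppName("group-to-list-sketch").setMaster("local[2]")
val sparkCtx = SparkContext.getOrCreate(conf)

val pairs = sparkCtx.parallelize(
  List(("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)))

// mapValues turns each group's Iterable[Int] into a List[Int];
// sorted is optional and only fixes the order inside each list.
val grouped = pairs.groupByKey().mapValues(_.toList.sorted)

// Sort by key on the driver so the collected array has a stable order.
val collected = grouped.collect().sortBy(_._1)
// collected: Array((A,List(1, 2, 3)), (B,List(4, 5)), (C,List(6)))
```

mapValues also keeps the partitioner of the grouped RDD, which a plain map does not.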
Or you can use aggregateByKey, which builds the lists directly:
    val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
    val nil_list: List[Int] = Nil
    val result2 = test_RDD.aggregateByKey(nil_list)(
      (acc, value) => value :: acc,   // within a partition: prepend each value
      (acc1, acc2) => acc1 ::: acc2   // across partitions: concatenate the partial lists
    )
The result:

    result2.take(3)
    >> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))
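To see what the two functions passed to aggregateByKey do, here is a rough plain-Scala model of it that needs no Spark. It is only a sketch of the semantics, not how Spark implements the operation. The split of the data into two partitions is a hypothetical one chosen for illustration, and the names partitions, seqOp, combOp, perPartition and merged are mine.

```scala
// Hypothetical layout: the six pairs spread over two partitions.
val partitions: List[List[(String, Int)]] = List(
  List(("A", 1), ("A", 2), ("B", 4)),
  List(("A", 3), ("B", 5), ("C", 6))
)

val zero: List[Int] = Nil
// Same two functions as in the aggregateByKey call above.
val seqOp: (List[Int], Int) => List[Int] = (acc, value) => value :: acc
val combOp: (List[Int], List[Int]) => List[Int] = (acc1, acc2) => acc1 ::: acc2

// Step 1: inside each partition, fold the values of every key with seqOp,
// starting from the zero value.
val perPartition: List[Map[String, List[Int]]] = partitions.map { part =>
  part.foldLeft(Map.empty[String, List[Int]]) { case (m, (k, v)) =>
    m.updated(k, seqOp(m.getOrElse(k, zero), v))
  }
}

// Step 2: merge the per-partition accumulators of each key with combOp.
val merged: Map[String, List[Int]] = perPartition.reduce { (m1, m2) =>
  m2.foldLeft(m1) { case (m, (k, acc2)) =>
    m.updated(k, m.get(k).map(acc1 => combOp(acc1, acc2)).getOrElse(acc2))
  }
}
// merged: Map(A -> List(2, 1, 3), B -> List(4, 5), C -> List(6))
```

With this particular split, A comes out as List(2, 1, 3) rather than List(3, 2, 1), which shows that the order inside each list depends on how the data is partitioned. Sort the lists afterwards if you need a fixed order.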