OK, I got this idea from the above examples here and here . I gave below a simple example that I wrote.
val x = Seq(("a", 36), ("b", 33), ("c", 40), ("a", 38), ("c", 39)).toDS
x: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
val g = x.groupByKey(_._1)
g: org.apache.spark.sql.KeyValueGroupedDataset[String,(String, Int)] = ...
val z = g.mapGroups{case(k, iter) => (k, iter.map(x => x._2).toArray)}
z: org.apache.spark.sql.Dataset[(String, Array[Int])] = [_1: string, _2: array<int>]
z.show
+---+--------+
| _1| _2|
+---+--------+
| c|[40, 39]|
| b| [33]|
| a|[36, 38]|
+---+--------+
source
share