Simplest approach (requires Spark 2.0.1+, not an exact median)
As noted in the comments on the first question Find the median in Spark SQL for double datatype columns, we can use percentile_approx to calculate the median for Spark 2.0.1+. Applied to grouped data in Apache Spark, the query looks like this:
val df = Seq(("A", 0.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num") df.createOrReplaceTempView("df") spark.sql("select id, percentile_approx(num, 0.5) as median from df group by id order by id").show()
with output:
```
+---+------+
| id|median|
+---+------+
|  A|   1.0|
|  B|   1.0|
+---+------+
```
That said, this is an approximate value (as opposed to the exact median asked for in the question).
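If you would rather stay in the DataFrame API than register a temp view, the same percentile_approx call can be wrapped in expr. A minimal sketch, reusing the df defined above:

```scala
import org.apache.spark.sql.functions.expr

// Same aggregation as the SQL query, expressed through the DataFrame API
df.groupBy("id")
  .agg(expr("percentile_approx(num, 0.5)").as("median"))
  .orderBy("id")
  .show()
```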
Calculate the exact median for grouped data
There are several approaches, and I am sure others on SO can provide better or more efficient examples. But here is a code snippet that calculates the median for grouped data in Spark (tested in Spark 1.6 and Spark 2.1):
```scala
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Double)] = sc.parallelize(Seq(
  ("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0),
  ("B", 0.0), ("B", 1.0), ("B", 1.0)
))

// Scala median function (expects a sorted list)
def median(inputList: List[Double]): Double = {
  val count = inputList.size
  if (count % 2 == 0) {
    val l = count / 2 - 1
    val r = l + 1
    (inputList(l) + inputList(r)) / 2
  } else inputList(count / 2)
}

// Group and sort the values per key
val setRDD = rdd.groupByKey()
val sortedListRDD = setRDD.mapValues(_.toList.sorted)

// Output DataFrame of id and median
sortedListRDD.map(m => (m._1, median(m._2))).toDF("id", "median_of_num").show()
```
with output:
```
+---+-------------+
| id|median_of_num|
+---+-------------+
|  A|          1.0|
|  B|          1.0|
+---+-------------+
```
There are some caveats worth calling out, as this is probably not the most efficient implementation:

- It currently uses groupByKey, which is not very efficient. You may want to change this to reduceByKey instead (more information in Avoid GroupByKey); a sketch of that change follows this list.
- It uses a Scala function to calculate the median, which means all values for a key are pulled into a single in-memory list on one executor.
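As a sketch of the first caveat (assuming the rdd and median function from the snippet above), groupByKey could be swapped for reduceByKey over per-key lists. Note that the full list of values for a key still has to be assembled on one executor before the median can be computed, so this mainly changes how the shuffle is performed:

```scala
// Build the per-key value lists with reduceByKey instead of groupByKey,
// then sort and reuse the median function from the snippet above.
val sortedListRDD = rdd
  .mapValues(List(_))
  .reduceByKey(_ ++ _)
  .mapValues(_.sorted)

sortedListRDD
  .map { case (id, nums) => (id, median(nums)) }
  .toDF("id", "median_of_num")
  .show()
```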
This approach should work fine for smaller amounts of data, but if you have millions of rows per key, it is recommended that you use Spark 2.0.1+ with the percentile_approx approach.
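If the approximation then turns out to be too coarse, percentile_approx also accepts an optional accuracy argument (the default is 10000); larger values use more memory but give a tighter approximation. For example, against the same temp view:

```scala
// Third argument = accuracy; higher values cost more memory but reduce
// the relative error of the approximate percentile.
spark.sql(
  "select id, percentile_approx(num, 0.5, 100000) as median from df group by id order by id"
).show()
```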