Which version of Apache Spark are you using, out of curiosity? There were fixes in Apache Spark 2.0+ that included changes to `approxQuantile`.
If I run the PySpark code snippet below:
    rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
    df = rdd.toDF(['id', 'num'])
    df.createOrReplaceTempView("df")
with the median calculated using `approxQuantile` as:
    df.approxQuantile("num", [0.5], 0.25)
or
    spark.sql("select percentile_approx(num, 0.5) from df").show()
the results are:
- Spark 2.0.0: 0.25
- Spark 2.0.1: 1.0
- Spark 2.1.0: 1.0
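As an aside, the third argument to `approxQuantile` is the relative error (0.25 above); per the DataFrame API docs, setting it to 0.0 should compute the exact quantile, at the cost of a more expensive pass. A minimal sketch against the same `df`:

    # relativeError = 0.0 asks for the exact quantile rather than an approximation
    df.approxQuantile("num", [0.5], 0.0)   # should return [1.0] for this data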
Note that these are approximate numbers (via `approxQuantile`), although overall this should work well. If you need an exact median, one approach is to use `numpy.median`. Below is a snippet of code for this `df` example, based on gench's SO answer to How to find the median in Apache Spark with the Python Dataframe API?:
    from pyspark.sql.types import *
    import pyspark.sql.functions as F
    import numpy as np

    def find_median(values):
        try:
            median = np.median(values)  # exact median of the values collected per id
            return round(float(median), 2)
        except Exception:
            return None  # e.g. if the collected list is empty

    median_finder = F.udf(find_median, FloatType())

    df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums")) \
            .withColumn("median", median_finder("nums"))
    df2.show()
with output:
    +---+--------------------+------+
    | id|                nums|median|
    +---+--------------------+------+
    |  1|[0.0, 0.0, 1.0, 1...|   1.0|
    +---+--------------------+------+
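If you only need the scalar value rather than a DataFrame, a quick sketch against the `df2` produced above (the name comes from this snippet) would be something like:

    # pull the computed median out of the single-row result as a plain Python value
    median_value = df2.select("median").head()[0]   # 1.0 for this example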
Update: Spark 1.6 Scala version using RDD
If you are using Spark 1.6, you can calculate the median in Scala per Eugene Zhulenev's answer to How to calculate the exact median with Apache Spark. Below is modified code that works with our example.
    import org.apache.spark.SparkContext._

    val rdd: RDD[Double] = sc.parallelize(Seq((0.0), (0.0), (1.0), (1.0), (1.0), (1.0)))

    val sorted = rdd.sortBy(identity).zipWithIndex().map {
      case (v, idx) => (idx, v)
    }
    val count = sorted.count()

    val median: Double = if (count % 2 == 0) {
      val l = count / 2 - 1
      val r = l + 1
      (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
    } else sorted.lookup(count / 2).head.toDouble
with output:
    // output
    import org.apache.spark.SparkContext._
    rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
    sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
    count: Long = 6
    median: Double = 1.0
Note that this is an exact median calculation using RDDs; that is, you will need to convert the DataFrame column to an RDD to perform this calculation.
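For completeness, here is a rough PySpark sketch of that conversion plus the same sortBy / zipWithIndex / lookup idea, assuming the `df` from the example above (variable names are just for illustration):

    # extract the "num" column from the DataFrame as an RDD of plain values
    num_rdd = df.select("num").rdd.map(lambda row: row[0])

    # sort the values, pair each with its rank, and key by rank so we can look ranks up
    sorted_rdd = num_rdd.sortBy(lambda x: x).zipWithIndex().map(lambda vi: (vi[1], vi[0]))
    count = sorted_rdd.count()

    if count % 2 == 0:
        l = count // 2 - 1
        r = l + 1
        median = (sorted_rdd.lookup(l)[0] + sorted_rdd.lookup(r)[0]) / 2.0
    else:
        median = float(sorted_rdd.lookup(count // 2)[0])

    print(median)  # 1.0 for this example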