How to find the exact median for grouped data in Spark

I have a requirement to calculate the exact median of grouped data of a double data type in Spark using Scala.

It differs from a similar question, Find the median in Spark SQL for double datatype columns: this question is about grouped data, while that one is about the median at the RDD level.

Here is my sample data

    scala> sqlContext.sql("select * from test").show()
    +---+---+
    | id|num|
    +---+---+
    |  A|0.0|
    |  A|1.0|
    |  A|1.0|
    |  A|1.0|
    |  A|0.0|
    |  A|1.0|
    |  B|0.0|
    |  B|1.0|
    |  B|1.0|
    +---+---+

Expected Answer:

    +--------+
    | Median |
    +--------+
    |      1 |
    |      1 |
    +--------+

I tried the following options, but no luck:

1) The percentile function, but it works only for BigInt.

2) The Hive percentile_approx function, but it does not work properly (it returns 0.25 instead of 1).

    scala> sqlContext.sql("select percentile_approx(num, 0.5) from test group by id").show()
    +----+
    | _c0|
    +----+
    |0.25|
    |0.25|
    +----+
6 answers

Simplest approach (requires Spark 2.0.1+, not an exact median)

As noted in the comments on the question Find the median in Spark SQL for double datatype columns, we can use percentile_approx to calculate the median in Spark 2.0.1+. To apply this to grouped data in Apache Spark, the query looks like this:

    val df = Seq(
      ("A", 0.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 1.0),
      ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num")

    df.createOrReplaceTempView("df")

    spark.sql("select id, percentile_approx(num, 0.5) as median from df group by id order by id").show()

with output:

    +---+------+
    | id|median|
    +---+------+
    |  A|   1.0|
    |  B|   1.0|
    +---+------+

That said, this is an approximate value (as opposed to the exact median asked for in the question).
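The same approximate query can also be expressed through the DataFrame API. This is only a rough sketch assuming the df defined above; the third percentile_approx argument is the optional accuracy, and 10000 here is just an illustrative choice that trades memory for a tighter approximation:

    import org.apache.spark.sql.functions.expr

    // Same approximate median as the SQL above, via the DataFrame API.
    // The optional third argument is the accuracy of the approximation:
    // larger values use more memory but get closer to the exact median.
    val approxMedian = df.groupBy("id")
      .agg(expr("percentile_approx(num, 0.5, 10000)").as("median"))
      .orderBy("id")

    approxMedian.show()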

Calculate the exact median for grouped data

There are several possible approaches, and I am sure others on SO can provide better or more efficient examples, but here is a code snippet that calculates the median for grouped data in Spark (tested in Spark 1.6 and Spark 2.1):

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    val rdd: RDD[(String, Double)] = sc.parallelize(Seq(
      ("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0),
      ("B", 0.0), ("B", 1.0), ("B", 1.0)))

    // Scala median function: average the two middle values for an even count,
    // otherwise take the single middle value
    def median(inputList: List[Double]): Double = {
      val count = inputList.size
      if (count % 2 == 0) {
        val l = count / 2 - 1
        val r = l + 1
        (inputList(l) + inputList(r)) / 2
      } else {
        inputList(count / 2)
      }
    }

    // Group the values by key and sort each group
    val setRDD = rdd.groupByKey()
    val sortedListRDD = setRDD.mapValues(_.toList.sorted)

    // Output a DataFrame of id and median
    sortedListRDD.map(m => (m._1, median(m._2))).toDF("id", "median_of_num").show()

with output:

    +---+-------------+
    | id|median_of_num|
    +---+-------------+
    |  A|          1.0|
    |  B|          1.0|
    +---+-------------+

There are some caveats I should call out, since this is probably not the most efficient implementation:

  • It currently uses groupByKey, which is not very efficient. You could switch to reduceByKey or aggregateByKey instead (more information on Avoid GroupByKey); a rough sketch follows right after this list.
  • It uses a plain Scala function to calculate the median.
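As a sketch of the first caveat (reusing the rdd and median function defined above, untested), the grouping could be rewritten with aggregateByKey instead of groupByKey. Note that an exact median still needs every value for a key, so this mainly restructures the aggregation rather than reducing the amount of data shuffled:

    // Build the per-key list with aggregateByKey instead of groupByKey,
    // reusing the rdd and median function defined above.
    val medianByKey = rdd
      .aggregateByKey(List.empty[Double])(
        (acc, v) => v :: acc,   // add a value to the partition-local buffer
        (a, b) => a ::: b)      // merge buffers from different partitions
      .mapValues(values => median(values.sorted))

    medianByKey.toDF("id", "median_of_num").show()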

In any case, this RDD-based approach should work fine for smaller amounts of data, but if you have millions of rows per key, it is recommended to use Spark 2.0.1+ and the percentile_approx approach.


Here is my version of the PERCENTILE_CONT function for Spark. It can be used to find the median value for grouped data in a DataFrame. Hope this helps someone; feel free to suggest improvements.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val PERCENTILEFLOOR = udf((maxrank: Integer, percentile: Double) =>
      scala.math.floor(1 + (percentile * (maxrank - 1))))

    val PERCENTILECEIL = udf((maxrank: Integer, percentile: Double) =>
      scala.math.ceil(1 + (percentile * (maxrank - 1))))

    val PERCENTILECALC = udf((maxrank: Integer, percentile: Double, floorVal: Double,
                              ceilVal: Double, floorNum: Double, ceilNum: Double) => {
      if (ceilNum == floorNum) {
        floorVal
      } else {
        val RN = 1 + (percentile * (maxrank - 1))
        ((ceilNum - RN) * floorVal) + ((RN - floorNum) * ceilVal)
      }
    })

    /**
     * The result of PERCENTILE_CONT is computed by linear interpolation between values after ordering them.
     * Using the percentile value (P) and the number of rows (N) in the aggregation group,
     * we compute the row number we are interested in after ordering the rows with respect to the sort specification.
     * This row number (RN) is computed according to the formula RN = 1 + (P * (N - 1)).
     * The final result of the aggregate function is computed by linear interpolation between the values
     * from the rows at row numbers CRN = CEILING(RN) and FRN = FLOOR(RN).
     *
     * The final result will be:
     *
     * If (CRN = FRN = RN) then the result is
     *   (value of expression from row at RN)
     * Otherwise the result is
     *   (CRN - RN) * (value of expression for row at FRN) +
     *   (RN - FRN) * (value of expression for row at CRN)
     *
     * @param inputDF    DataFrame for the computation
     * @param medianCol  column for which the percentile is to be calculated
     * @param groupList  grouping columns for the DataFrame before sorting
     * @param percentile numeric value between 0 and 1 expressing the percentile to be calculated
     */
    def percentile_count(inputDF: DataFrame, medianCol: String, groupList: List[String], percentile: Double): DataFrame = {
      val orderList = List(medianCol)
      val wSpec3 = Window.partitionBy(groupList.head, groupList.tail: _*).orderBy(orderList.head, orderList.tail: _*)

      // Group, sort and rank the DF
      val rankedDF = inputDF.withColumn("rank", row_number().over(wSpec3))

      // Find the maximum rank for each group
      val groupedMaxDF = rankedDF.groupBy(groupList.head, groupList.tail: _*).agg(max("rank").as("maxval"))

      // CRN calculation
      val ceilNumDF = groupedMaxDF.withColumn("rank", PERCENTILECEIL(groupedMaxDF("maxval"), lit(percentile))).drop("maxval")

      // FRN calculation
      val floorNumDF = groupedMaxDF.withColumn("rank", PERCENTILEFLOOR(groupedMaxDF("maxval"), lit(percentile)))

      val ntileGroup = "rank" :: groupList

      // Get the values at the CRN and FRN row numbers
      val floorDF = floorNumDF.join(rankedDF, ntileGroup).withColumnRenamed("rank", "floorNum").withColumnRenamed(medianCol, "floorVal")
      val ceilDF = ceilNumDF.join(rankedDF, ntileGroup).withColumnRenamed("rank", "ceilNum").withColumnRenamed(medianCol, "ceilVal")

      // Get both the CRN and FRN values into the same row
      val resultDF = floorDF.join(ceilDF, groupList)

      val finalList = "median_" + medianCol :: groupList

      // Calculate the median using the PERCENTILECALC UDF and return the DF
      resultDF.withColumn("median_" + medianCol,
        PERCENTILECALC(resultDF("maxval"), lit(percentile), resultDF("floorVal"), resultDF("ceilVal"),
          resultDF("floorNum"), resultDF("ceilNum"))).select(finalList.head, finalList.tail: _*)
    }
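A hypothetical call against the question's sample data could look like this (the column and group names below are only an example):

    // Exact median (percentile 0.5) of "num", grouped by "id".
    val sample = Seq(
      ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0),
      ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num")

    percentile_count(sample, "num", List("id"), 0.5).show()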

You can try this solution for the exact median. I have also described the SQL solution in a GitHub gist. To calculate the exact median, I use the row_number() and count() functions in combination with a window function.

    val data1 = Array(("a", 0), ("a", 1), ("a", 1), ("a", 1), ("a", 0), ("a", 1))
    val data2 = Array(("b", 0), ("b", 1), ("b", 1))
    val union = data1.union(data2)

    val df = sc.parallelize(union).toDF("key", "val")
    df.cache.createOrReplaceTempView("kvTable")

    spark.sql("SET spark.sql.shuffle.partitions=2")

    var ds = spark.sql("""
      SELECT key, avg(val) as median
      FROM (
        SELECT key, val, rN,
               (CASE WHEN cN % 2 = 0 THEN (cN DIV 2) ELSE (cN DIV 2) + 1 END) as m1,
               (cN DIV 2) + 1 as m2
        FROM (
          SELECT key, val,
                 row_number() OVER (PARTITION BY key ORDER BY val) as rN,
                 count(val)   OVER (PARTITION BY key) as cN
          FROM kvTable
        ) s
      ) r
      WHERE rN BETWEEN m1 AND m2
      GROUP BY key
    """)

Spark executes and optimizes this query efficiently, since both window functions use the same partitioning by key, so the data exchange is reused.

    scala> ds.show
    +---+------+
    |key|median|
    +---+------+
    |  a|   1.0|
    |  b|   1.0|
    +---+------+
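The same logic can be sketched with the DataFrame API instead of SQL; this is only my transcription of the query above (assuming the df registered as kvTable), ranking each value within its key, counting the rows per key, keeping the middle one or two rows and averaging them:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val byKey = Window.partitionBy("key").orderBy("val")

    val medians = df
      .withColumn("rN", row_number().over(byKey))                       // rank within the key
      .withColumn("cN", count("val").over(Window.partitionBy("key")))   // rows per key
      .withColumn("m1", when(col("cN") % 2 === 0, floor(col("cN") / 2))
                          .otherwise(floor(col("cN") / 2) + 1))
      .withColumn("m2", floor(col("cN") / 2) + 1)
      .where(col("rN").between(col("m1"), col("m2")))                   // keep the middle row(s)
      .groupBy("key")
      .agg(avg("val").as("median"))

    medians.show()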

With the element_at function added in Spark 2.4, we can use either a window function or groupBy followed by a join back.

Sample data

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    case class Salary(depName: String, empNo: Long, salary: Long)

    val empsalary = Seq(
      Salary("sales", 1, 5000),
      Salary("personnel", 2, 3900),
      Salary("sales", 3, 4800),
      Salary("sales", 4, 4800),
      Salary("personnel", 5, 3500),
      Salary("develop", 7, 4200),
      Salary("develop", 8, 6000),
      Salary("develop", 9, 4500),
      Salary("develop", 10, 5200),
      Salary("develop", 11, 5200)).toDS

with window function

    val byDepName = Window
      .partitionBy('depName)
      .orderBy('salary)
      // Without an explicit frame, an ordered window defaults to
      // "unbounded preceding to current row", which would make the collected
      // list cumulative; an unbounded frame collects the whole partition.
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    val df = empsalary
      .withColumn("salaries", collect_list('salary) over byDepName)
      .withColumn("median_salary", element_at('salaries, (size('salaries) / 2 + 1).cast("int")))

    df.show(false)

with groupBy then join back

    val dfMedian = empsalary
      .groupBy("depName")
      .agg(sort_array(collect_list('salary)).as("salaries"))
      .select(
        'depName,
        element_at('salaries, (size('salaries) / 2 + 1).cast("int")).as("median_salary"))

    empsalary.join(dfMedian, "depName").show(false)
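One caveat: element_at('salaries, size/2 + 1) returns the upper of the two middle values when a group has an even number of rows. If the classical exact median (the average of the two middle values) is needed, a variation along these lines should work, although it is only a sketch of the idea:

    val dfExactMedian = empsalary
      .groupBy("depName")
      .agg(sort_array(collect_list('salary)).as("salaries"))
      .select(
        'depName,
        // odd-sized group: take the single middle element
        when(size('salaries) % 2 === 1,
          element_at('salaries, (size('salaries) / 2 + 1).cast("int")).cast("double"))
        // even-sized group: average the two middle elements
        .otherwise(
          (element_at('salaries, (size('salaries) / 2).cast("int")) +
           element_at('salaries, (size('salaries) / 2 + 1).cast("int"))) / 2.0)
        .as("median_salary"))

    empsalary.join(dfExactMedian, "depName").show(false)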

If you do not want to use spark-sql (like me), you can use the cume_dist function.

See an example below:

    import org.apache.spark.sql.{functions => F}
    import org.apache.spark.sql.expressions.Window

    val df = (1 to 10).toSeq.toDF

    val win = Window.
      partitionBy(F.col("value")).
      orderBy(F.col("value")).
      rangeBetween(Window.unboundedPreceding, Window.currentRow)

    df.withColumn("c", F.cume_dist().over(win)).show

Results:

    +-----+---+
    |value|  c|
    +-----+---+
    |    1|0.1|
    |    2|0.2|
    |    3|0.3|
    |    4|0.4|
    |    5|0.5|
    |    6|0.6|
    |    7|0.7|
    |    8|0.8|
    |    9|0.9|
    |   10|1.0|
    +-----+---+

The median is the value for which df("c") is 0.5. Hope this helps. Elior


Just to add to Elior's answer (and to answer Erkan): the reason the output is 1.0 for every row is that partitionBy(F.col("value")) splits the data into one row per partition, so when the window computes cume_dist it does so over a single value and the result is 1.0.

Removing partitionBy(F.col("value")) from the window specification gives the expected quantiles.


With the window defined without partitionBy:

    val win = Window.
      orderBy(F.col("value")).
      rangeBetween(Window.unboundedPreceding, Window.currentRow)

    df.withColumn("c", F.cume_dist().over(win)).show
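To reduce the cume_dist column to a single median value, one possible follow-up (again only a sketch) is to keep the smallest value whose cumulative distribution reaches 0.5; for an even number of rows this is the lower of the two middle values rather than their average:

    // Smallest value whose cumulative distribution is at least 0.5.
    df.withColumn("c", F.cume_dist().over(win))
      .where(F.col("c") >= 0.5)
      .orderBy("value")
      .limit(1)
      .show()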

Source: https://habr.com/ru/post/1262107/

