Median / quantiles within a PySpark groupBy

I would like to calculate group quantiles on a Spark DataFrame (using PySpark). Either an approximate or an exact result would be fine. I prefer a solution that I can use within a groupBy/agg context, so that I can mix it with other PySpark aggregate functions. If for some reason this is not possible, a different approach would also do.

This question is related, but does not indicate how to use approxQuantile as an aggregate function.

I also have access to the percentile_approx Hive UDF, but I don't know how to use it as an aggregate function.

Suppose I have the following data frame:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as f

sc = SparkContext()
sqlContext = SQLContext(sc)  # instantiating an SQLContext makes rdd.toDF() available

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

# 'magic_percentile' is a placeholder for the aggregate function I am looking for
df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()

Expected Result:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
+12

You can use percentile_approx as a SQL expression: evaluated over a window, it attaches each group's quantile to every record (and, as shown further below, the same expression also works as a plain aggregate).

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')

df.withColumn('med_val', magic_percentile.over(grp_window))
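
For the toy DataFrame from the question, this per-row version gives every record the median of its group (the output below is what I would expect; row order may vary):

df.withColumn('med_val', magic_percentile.over(grp_window)).show()
# +---+---+-------+
# |grp|val|med_val|
# +---+---+-------+
# |  A|  1|      2|
# |  A|  2|      2|
# |  A|  3|      2|
# |  B|  4|      5|
# |  B|  5|      5|
# |  B|  6|      5|
# +---+---+-------+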

If one row per group is enough, the same expression can be used directly as an aggregate, which yields exactly the expected result from the question:

df.groupBy('grp').agg(magic_percentile.alias('med_val'))

To compute several quantiles at once, pass an array of probabilities:

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

It will return a list.
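
That list can be unpacked into separate columns with getItem; a minimal sketch (the q25/q50/q75 column names are my own):

df_q = df.groupBy('grp').agg(quantiles.alias('qs'))
df_q.select(
    'grp',
    F.col('qs').getItem(0).alias('q25'),  # first quartile
    F.col('qs').getItem(1).alias('q50'),  # median
    F.col('qs').getItem(2).alias('q75'),  # third quartile
).show()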

+13

Since percentile_approx is available as a Hive UDF, a simple solution is to use it in a SQL query:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df.registerTempTable("df")
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")
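
Note that registerTempTable and SQLContext have been deprecated since Spark 2.0; assuming a Spark 2.x+ session is available, the same query can be written as follows (a sketch):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df.createOrReplaceTempView("df")  # modern replacement for registerTempTable
df2 = spark.sql(
    "select grp, percentile_approx(val, 0.5) as med_val from df group by grp"
)
df2.show()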
+9

Well, it is indeed not possible to compute quantiles as a "pure" PySpark aggregation (the SQL workaround by Shaido notwithstanding), and the reason is quite elementary: in contrast to other aggregate functions, such as mean, approxQuantile does not return a Column, but a list.

Here is a demonstration:

spark.version
# u'2.2.0'

import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc

# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+ 
# |grp|mean_val|
# +---+--------+
# |  B|     5.0|
# |  A|     2.0|
# +---+--------+

# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column

# mean aggregation is a Column, but median is a list:

type(func.mean(df['val']))
# pyspark.sql.column.Column

type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list

I doubt the window-based approach will make any difference, because, as I said, the underlying reason is very elementary.

See also my answer here for more details.
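
For completeness: Spark 3.1 later added pyspark.sql.functions.percentile_approx, which does return a Column and can therefore be used inside agg directly (a sketch, assuming Spark >= 3.1):

import pyspark.sql.functions as F

# Spark >= 3.1: percentile_approx is a genuine Column-returning aggregate
df.groupBy('grp').agg(
    F.percentile_approx('val', 0.5).alias('med_val')
).show()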

+7
