Compute PySpark DataFrame column mode?

Ultimately, I want the mode of every column in a DataFrame. For other summary statistics I see a couple of options: use DataFrame aggregation, or map the DataFrame's columns to an RDD of vectors (something I am also having trouble with) and use colStats from MLlib. But I do not see mode as an option.

4 answers

The problem with mode is pretty much the same as with the median: while it is easy to compute, the computation is rather expensive. It can be done either with a sort followed by local and global aggregations, or with just another word-count and a filter:

import numpy as np
from pyspark.sql.functions import col, max  # Spark's max(), not Python's builtin

np.random.seed(1)

# Sample data: 10,000 random integers in [0, 50)
df = sc.parallelize([
    (int(x), ) for x in np.random.randint(50, size=10000)
]).toDF(["x"])

# Count each value, then keep a row whose count equals the global maximum
cnts = df.groupBy("x").count()
mode = cnts.join(
    cnts.agg(max("count").alias("max_")), col("count") == col("max_")
).limit(1).select("x")
mode.first()[0]
## 0

In either case, a complete shuffle may be required for each column.
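
Since the question asks for the mode of every column, the same groupBy/count approach can be applied column by column. This is only a minimal sketch (the helper name is illustrative, and it assumes plain, groupable column types); as noted above, it may trigger a full shuffle per column:

from pyspark.sql import functions as F

def mode_of(df, c):
    # Count each distinct value and keep one of the most frequent (ties broken arbitrarily)
    cnts = df.groupBy(c).count()
    return cnts.join(
        cnts.agg(F.max("count").alias("max_")), F.col("count") == F.col("max_")
    ).limit(1).select(c).first()[0]

modes = {c: mode_of(df, c) for c in df.columns}
## e.g. {'x': 0} for the example above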


You can calculate the column mode using Java code as follows:

            // Inside a switch over the requested statistic: compute the mode of
            // `column` and use it to replace null values in that column.
            case MODE:
                Dataset<Row> cnts = ds.groupBy(column).count();
                Dataset<Row> dsMode = cnts.join(
                        cnts.agg(functions.max("count").alias("max_")),
                        functions.col("count").equalTo(functions.col("max_")));
                Dataset<Row> mode = dsMode.limit(1).select(column);
                replaceValue = ((GenericRowWithSchema) mode.first()).values()[0];
                ds = replaceWithValue(ds, column, replaceValue);
                break;

// Replace nulls in the given column with the supplied value.
private static Dataset<Row> replaceWithValue(Dataset<Row> ds, String column, Object replaceValue) {
    return ds.withColumn(column,
            functions.coalesce(functions.col(column), functions.lit(replaceValue)));
}
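
For comparison, here is a rough PySpark sketch of the same null-filling logic as the Java snippet above; `ds` and `column` are placeholders, and the helper name is just illustrative:

from pyspark.sql import functions as F

def replace_nulls_with_mode(ds, column):
    # Mode of `column`: the most frequent value (ties broken arbitrarily by limit(1))
    cnts = ds.groupBy(column).count()
    mode_value = cnts.join(
        cnts.agg(F.max("count").alias("max_")),
        F.col("count") == F.col("max_")
    ).limit(1).select(column).first()[0]
    # coalesce() keeps existing values and fills nulls with the mode
    return ds.withColumn(column, F.coalesce(F.col(column), F.lit(mode_value)))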

>>> df = newdata.groupBy('columnName').count()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]

See my result:

>>> newdata.groupBy('var210').count().show()
+------+-----+
|var210|count|
+------+-----+
|  3av_|   64|
|  7A3j|  509|
|  g5HH| 1489|
|  oT7d|  109|
|  DM_V|  149|
|  uKAI|44883|
+------+-----+

# store the above result in df
>>> df=newdata.groupBy('var210').count()
>>> df.orderBy(df['count'].desc()).collect()
[Row(var210='uKAI', count=44883),
Row(var210='g5HH', count=1489),
Row(var210='7A3j', count=509),
Row(var210='DM_V', count=149),
Row(var210='oT7d', count=109),
Row(var210='3av_', count=64)]

# get the first value using collect()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
>>> mode
'uKAI'

groupBy() gives the count of each category in the column, so df here has two columns, var210 and count. orderBy() on count in descending order puts the most frequent value in the first row, and collect()[0][0] pulls out that single value, the mode.
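
The same idea can be wrapped in a small helper. Using first() instead of collect()[0][0] brings only the top row back to the driver rather than every group count (a sketch; the function name is just for illustration):

def column_mode(sdf, column):
    counts = sdf.groupBy(column).count()
    # Most frequent value first; first() fetches only that top row
    return counts.orderBy(counts["count"].desc()).first()[0]

## column_mode(newdata, 'var210') returns 'uKAI'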


"col" spark df:

df.groupby("col"). count(). orderBy ("count", ascending = False).first() [0]

df :

[df.groupby(i).count(). orderBy ("count", ascending = False).first() [0] i df.columns]

, , , 2D-:

[[i, df.groupby(i).count(). orderBy ("count", ascending = False).first() [0]] i df.columns]


Source: https://habr.com/ru/post/1622877/

