One option is to use window functions. First, define a window partitioned by _1 that spans all rows of each group:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
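In newer Spark versions the same unbounded frame can also be written with the named boundary constants instead of the raw Long values:

// Same window as above, expressed with Window's named frame boundaries
val w = Window.partitionBy("_1")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)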
Using it, compute each value's probability within its group:
import org.apache.spark.sql.functions.sum
val p = $"_2" / sum($"_2").over(w)
val withP = df.withColumn("p", p)
Then group by _1 and aggregate with the Shannon entropy formula H = -sum(p * log2(p)):
import org.apache.spark.sql.functions.log2
withP.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
For example, with the following data (assuming spark.implicits._ is in scope, as in spark-shell):

val df = Seq(
  (0, 13), (0, 7), (0, 3), (0, 1), (0, 1), (1, 4), (1, 8), (1, 18), (1, 4)).toDF

the result is:
+---+------------------+
| _1| entropy|
+---+------------------+
| 1|1.7033848993102918|
| 0|1.7433726580786888|
+---+------------------+
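As a quick sanity check, the same formula can be evaluated in plain Scala for group 0 (values 13, 7, 3, 1, 1); this is only a sketch of the arithmetic behind the Spark job:

// Shannon entropy H = -sum(p * log2(p)) for group 0, with p = value / total
val xs = Seq(13.0, 7.0, 3.0, 1.0, 1.0)
val total = xs.sum
val entropy = -xs.map { x =>
  val p = x / total
  p * (math.log(p) / math.log(2)) // log2(p)
}.sum
// entropy ≈ 1.7434, matching the row for _1 = 0 above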
The same result can be obtained without window functions, using an aggregation followed by a join:
df.groupBy($"_1").agg(sum("_2").alias("total"))
.join(df, Seq("_1"), "inner")
.withColumn("p", $"_2" / $"total")
.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Here

df.groupBy($"_1").agg(sum("_2").alias("total"))

computes the total of _2 for each _1,

.join(df, Seq("_1"), "inner")

joins the totals back to the original rows,

.withColumn("p", $"_2" / $"total")

computes each value's share of its group total, and

.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))

aggregates the entropy.
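If the column names are not fixed, the join-based variant can be wrapped in a small helper; the name entropyPerGroup and its parameters are just an illustration, not part of the original code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, log2, sum}

// Hypothetical helper generalizing the join-based computation to arbitrary columns
def entropyPerGroup(data: DataFrame, keyCol: String, valueCol: String): DataFrame =
  data.groupBy(col(keyCol)).agg(sum(col(valueCol)).alias("total"))
    .join(data, Seq(keyCol), "inner")
    .withColumn("p", col(valueCol) / col("total"))
    .groupBy(col(keyCol))
    .agg((-sum(col("p") * log2(col("p")))).alias("entropy"))

// entropyPerGroup(df, "_1", "_2") gives the same result as above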