Spark: translate SQL window function to RDD for better performance

The function must be executed for several columns in the data frame.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def handleBias(df: DataFrame, colName: String, target: String = target): DataFrame = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      // "cnt_foo_eq_1" (total count of rows with target == 1) is joined in before this is called
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
}

This can be written nicely, as shown above, with Spark SQL window functions and a for loop / fold over the columns. However, it causes a lot of shuffling, since every column gets its own window specification, and it does not look like Spark applies the function to all the columns in parallel (see the explain() sketch after the minimal example below).

Minimal example:

  val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

  val columnsToDrop = Seq("col3TooMany")
  val columnsToCode = Seq("col1", "col2")
  val target = "TARGET"

  val targetCounts = df.filter(df(target) === 1).groupBy(target)
    .agg(count(target).as("cnt_foo_eq_1"))
  val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

  val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
    (currentDF, colName) => handleBias(currentDF, colName)
  }

  result.drop(columnsToDrop: _*).show
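
To see where the shuffles come from, one option (my sketch, not part of the original question) is to inspect the physical plan of the folded pipeline; each Window.partitionBy over a new column set normally needs its own Exchange:

  // Inspect the physical plan; each Window over a different partitioning
  // typically sits behind its own Exchange (shuffle) operator.
  result.explain()

  // Rough count of shuffle boundaries in the plan text (illustrative only).
  val plan = result.queryExecution.executedPlan.toString
  println("Exchange operators: " + plan.split("\n").count(_.trim.startsWith("Exchange")))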

How can I formulate this more efficiently using the RDD API? aggregateByKey should be a good fit, but I still don't really understand how to apply it here to replace the window functions.

(A bit more context / a fuller example is available at https://github.com/geoHeil/sparkContrastCoding.)

Edit

The DAG of the Spark job / the query plan gets very big and complex here. In the real data set roughly 300 columns have to be "contrast coded" this way, and the job becomes unreasonably slow.

handleBiasOriginal("col1", df)
    .join(handleBiasOriginal("col2", df), df.columns)
    .join(handleBiasOriginal("col3TooMany", df), df.columns)
    .drop(columnsToDrop: _*).show

  def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
    val pre1_1 = df
      .filter(df(target) === 1)
      .groupBy(col, target)
      .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
      .drop(target)

    val pre2_1 = df
      .groupBy(col)
      .agg(mean(target).alias("pre2_" + col))

    df
      .join(pre1_1, Seq(col), "left")
      .join(pre2_1, Seq(col), "left")
      .na.fill(0)
  }

[Images: the Spark DAG on 2.1.0 and the "toocomplexDAG" on 2.0.2.]

And this is the DAG when df.cache is applied before the handleBiasOriginal("col1", df).... chain:
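
For reference, a minimal sketch of what caching before the per-column calls looks like (the exact placement of cache is my assumption):

  // Cache the source once so the three per-column passes reuse it instead of
  // recomputing the input lineage for every handleBiasOriginal call.
  val cached = df.cache()

  handleBiasOriginal("col1", cached)
    .join(handleBiasOriginal("col2", cached), cached.columns)
    .join(handleBiasOriginal("col3TooMany", cached), cached.columns)
    .drop(columnsToDrop: _*)
    .show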

So, is there a better way to express this than with SQL window functions, or is SQL itself the problem here?

[Image: DAG with caching]

+4

You do not really need window functions for this; the same statistics can be computed with plain aggregations, which are much cheaper to combine over many columns.

Since target takes only the values {0, 1} and the columns you encode are of StringType, all you need per (column, value) pair is a count and the mean of the target; everything below builds on that.

With the RDD API

  • First convert the data to long format, one row per (column name, value) pair:

    import org.apache.spark.sql.functions._
    
    val exploded = explode(array(
      (columnsToDrop ++ columnsToCode).map(c => 
        struct(lit(c).alias("k"), col(c).alias("v"))): _*
    )).alias("level")
    
    val long = df.select(exploded, $"TARGET")
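    
    // Roughly, the shape of the long data (sketch, not output from the original answer):
    long.printSchema
    // root
    //  |-- level: struct
    //  |    |-- k: string   (source column name)
    //  |    |-- v: string   (value in that column)
    //  |-- TARGET: integer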
    
  • Then aggregate per key with aggregateByKey, using StatCounter as the accumulator:

    import org.apache.spark.util.StatCounter
    
    val lookup = long.as[((String, String), Int)].rdd
      // You can use prefix partitioner (one that depends only on _._1)
      // to avoid reshuffling for groupByKey
      .aggregateByKey(StatCounter())(_ merge _, _ merge _)
      .map { case ((c, v), s) => (c, (v, s)) }
      .groupByKey
      .mapValues(_.toMap)
      .collectAsMap
    
  • lookup now contains the statistics for every (column, value) pair. For example:

    lookup("col1")("A")
    
    org.apache.spark.util.StatCounter = 
      (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)
    

    These are the statistics for value A of col1. Since target is 0/1, mean is the fraction of rows with target = 1 and count is the group size, so you can derive whatever you need from it (count / fraction).

    If you want this back in SQL, you can broadcast the map and read it from a udf.
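
    A minimal sketch of that last step (the helper name meanFor and the choice of statistic are my own, not from the original answer), assuming spark is the active SparkSession:

    // Broadcast the collected map and add a "pre2_<col>" column per encoded column.
    val lookupB = spark.sparkContext.broadcast(lookup)

    def meanFor(colName: String) = udf { v: String =>
      lookupB.value
        .get(colName)
        .flatMap(_.get(v))
        .map(_.mean)        // fraction of TARGET == 1 for this level
        .getOrElse(0.0)
    }

    val encoded = columnsToCode.foldLeft(df) { (current, c) =>
      current.withColumn("pre2_" + c, meanFor(c)(col(c)))
    }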

With the DataFrame API

  • Convert the data to long format exactly as for the RDD API.
  • Then compute the aggregates:

    val stats = long
      .groupBy($"level.k", $"level.v")
      .agg(mean($"TARGET"), sum($"TARGET"))
    
  • Collect the result and use it the same way as with the RDD version (see the sketch below).
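
    A sketch of one way to do that (the aliases and the join back onto the original frame are my own, not from the original answer):

    // Flatten the struct, aggregate once per (column, value) pair,
    // then join the per-level mean back for a single column.
    val namedStats = long
      .select($"level.k".alias("k"), $"level.v".alias("v"), $"TARGET")
      .groupBy("k", "v")
      .agg(mean($"TARGET").alias("mean_target"), sum($"TARGET").alias("sum_target"))

    df.join(
      namedStats
        .filter($"k" === "col1")
        .select($"v".alias("col1"), $"mean_target".alias("pre2_col1")),
      Seq("col1"),
      "left")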

+1

aggregateByKey itself is not hard to use. The basic idea: you supply a neutral "zero" value, a function that folds a single record into the accumulator (applied inside each partition), and a function that merges two accumulators (applied across partitions).

With the data keyed by (column name, value) as above, a single pass over the RDD then gives you one aggregate per level of every column.
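
As an illustration, here is the same aggregation with the zero value, seqOp and combOp spelled out explicitly (a sketch equivalent to the _ merge _ shorthand in the other answer; it assumes the same long Dataset and spark.implicits._ in scope):

    import org.apache.spark.util.StatCounter

    val perLevelStats = long.as[((String, String), Int)].rdd
      .aggregateByKey(new StatCounter())(    // zero value: an empty accumulator per key
        (acc, target) => acc.merge(target),  // seqOp: fold one record in, within a partition
        (acc1, acc2) => acc1.merge(acc2)     // combOp: merge partial results across partitions
      )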

That said, think twice before dropping down to the RDD level for this: once you leave SQL / the DataFrame API you also give up what the engine does for you (the Catalyst optimizer, Tungsten's memory layout, code generation, and so on).

SQL

You do not necessarily need window functions to express this in SQL either. Have you tried a plain groupBy (as in the long-format variant above), which computes the statistics for all columns and levels in one pass?


Source: https://habr.com/ru/post/1665621/

