Count UDF calls in Spark

Using Spark 1.6.1, I want to count the number of times a UDF is called. I want to do this because I have a very expensive UDF (~1 second per call), and I suspect it is called more often than there are records in my DataFrame, which makes my Spark job slower than necessary.

Although I could not reproduce that exact situation, I came up with a simple example showing that the number of UDF calls can differ (here: be lower) from the number of rows. How can this be?

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo extends App {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._


  val callCounter = sc.accumulator(0)

  val df = sc.parallelize(1 to 10000, numSlices = 100).toDF("value")

  println(df.count) //  gives 10000

  val myudf = udf((d: Int) => { callCounter.add(1); d })

  val res = df.withColumn("result",myudf($"value")).cache

  println(res.select($"result").collect().size) // gives 10000
  println(callCounter.value) // gives 9941

}

If accumulators are not a suitable way to count UDF calls, how else can I do this?

Note: in my actual Spark job, I get a call count that is about 1.7 times the actual number of records.
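
One accumulator-free alternative would be to count calls per partition and return the counts as ordinary data, so the count takes the same execution path as the work itself. A sketch, where expensive is a hypothetical stand-in for the real UDF body:

def expensive(i: Int): Int = { Thread.sleep(1); i } // hypothetical stand-in for the real UDF body

val callsPerPartition = df.rdd.mapPartitions { iter =>
  var calls = 0L
  iter.foreach { row => expensive(row.getInt(0)); calls += 1 } // consume the whole partition
  Iterator(calls) // emit one count per partition
}
println(callsPerPartition.reduce(_ + _)) // total calls across all partitions

This is purely a diagnostic: it runs the expensive work once per row just to measure it and discards the results.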


The problem is how the Spark application is defined: Spark applications should define a main() method instead of extending scala.App, because subclasses of scala.App may not work correctly. Presumably this is because an App object's body (including the accumulator initialization) runs via delayedInit, which interacts badly with Spark's closure serialization.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // [...]
  }
}
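
Independently of the scala.App issue, call counts above the row count (like the ~1.7x mentioned in the question) usually mean the UDF is simply being evaluated more than once per row: without persisting, every action on the DataFrame recomputes the whole plan, including the UDF column. A minimal sketch, reusing df and myudf from the question:

val res = df.withColumn("result", myudf($"value")).cache()
res.count()                      // first action materializes the cache: one UDF call per row
res.select($"result").collect()  // answered from the cache, no further UDF calls

Also note that Spark only guarantees exactly-once accumulator updates inside actions; for updates made in transformations (as with a UDF), retried or speculatively executed tasks can apply updates more than once, which inflates accumulator-based counts.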


Source: https://habr.com/ru/post/1659246/

