How do I find out which count query is the fastest?

I have been studying query optimization in recent builds of Spark SQL (2.3.0-SNAPSHOT) and noticed that semantically identical queries can end up with different physical plans.

Suppose I need to count the number of rows in the following dataset:

val q = spark.range(1)

I could count the number of rows as follows:

  • q.count
  • q.collect.size
  • q.rdd.count
  • q.queryExecution.toRdd.count
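All four can be typed straight into spark-shell; a minimal sketch (the only assumption is the spark session that spark-shell provides):

// Four ways to count the same one-row Dataset.
val q = spark.range(1)

q.count                       // Dataset API: planned and executed by Spark SQL
q.collect.size                // materialize all rows on the driver, then count the array
q.rdd.count                   // convert to RDD[java.lang.Long] first, then count
q.queryExecution.toRdd.count  // count the underlying RDD[InternalRow] directly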

My initial thought was that this would be an almost constant-time operation (after all, the dataset is local and has a single row), that Spark SQL would somehow optimize it and return the result immediately, especially in the 1st case, where Spark SQL is in full control of query execution.

Looking at the physical query plans, I thought that the most efficient query would be the last:

q.queryExecution.toRdd.count

The reasons being:

  • it avoids deserializing rows from their binary InternalRow format
  • the query is code-generated (whole-stage codegen)

[Spark web UI screenshot: "Details for the job"]

Am I right? If so, would the answer be different if the dataset were read from an external data source (e.g. files, JDBC, Kafka)?

The main question is: what factors should be taken into account when deciding whether one count query is more efficient than the others?

For completeness, here are the other queries I compared (the snippet after the list shows how to print their physical plans):

  • q.count
  • q.collect.size
  • q.rdd.count
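If you want to look at the plans yourself, they can be printed directly in spark-shell; a minimal sketch (explain, queryExecution and toRdd are standard Dataset/QueryExecution APIs, nothing specific to 2.3.0-SNAPSHOT):

val q = spark.range(1)

// Physical plan of q itself -- this is what collect, rdd and queryExecution.toRdd execute.
q.explain(true)
println(q.queryExecution.executedPlan)

// q.count runs a global aggregate on top; the equivalent DataFrame query shows that plan.
q.groupBy().count().explain(true)

// The two RDD-based variants: rdd deserializes to the element type,
// toRdd exposes the raw RDD[InternalRow].
println(q.rdd.toDebugString)
println(q.queryExecution.toRdd.toDebugString)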


Timings for val q = spark.range(100000000):

  • q.count: ~ 50 ms
  • q.collect.size: ...
  • q.rdd.count: ~ 1100 ms
  • q.queryExecution.toRdd.count: ~ 600 ms

Some explanation:

1. q.count is the fastest because it uses both partial aggregation and whole-stage code generation; the generated code lets the JVM get really clever and apply some drastic optimizations (see https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).

2. q.collect.size is just slow: it materializes every row on the driver, which is generally a bad idea.

3. q.rdd.count is like option 4, but it first converts each internal row into a regular Row, and that conversion is quite expensive.

4. q.queryExecution.toRdd.count is about as fast as you can get without code generation.
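A minimal way to reproduce measurements like these yourself, assuming spark-shell on Spark 2.1+ (which added SparkSession.time); the absolute numbers will of course depend on your machine:

val q = spark.range(100000000L)

spark.time(q.count)                       // whole-stage-codegen'd aggregate
spark.time(q.rdd.count)                   // pays for the InternalRow -> java.lang.Long conversion
spark.time(q.queryExecution.toRdd.count)  // counts the raw RDD[InternalRow]
// q.collect.size is deliberately left out: it would pull 100 million rows onto the driver.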


/*note: I'm using spark 1.5.2, so some of these might be different from what you would find in a newer version
 *note: These were all done on a 25 node cluster, 40 cores/node, and started with --num-executors 64 --executor-cores 1 --executor-memory 4g
 *note: the displayed values are the mean of 10 runs
 *note: the spark-shell was restarted every time I noticed any spikes intra-run
 *
 *million/billion = sc.parallelize(1 to 1000000 / 1 to 1000000000).toDF("col1")
 *
 *val s0 = sc.parallelize(1 to 1000000000)
 *//had to use this to get around maxInt constraints for Seq
 *billion10 = sc.union(s0,s1,s2,s3,s4,s5,s6,s7,s8,s9).toDF("col1")
 *
 *for parquet files
 *compression=uncompressed
 *written with:    million/billion/billion10.write.parquet
 *read with:       sqlContext.read.parquet
 *
 *for text files
 *written with:    million/billion/billion10.map(x => x.mkString(",")).saveAsTextFile
 *read with:       sc.textFile.toDF("col1")
 *
 *excluded the collect() because that would have murdered my machine
 *made them all DataFrames for consistency
 */
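For concreteness, here is roughly what that setup looks like as runnable 1.5.x code; the /tmp paths are illustrative, and sqlContext/sc are the ones spark-shell provides:

import sqlContext.implicits._

val million = sc.parallelize(1 to 1000000).toDF("col1")

// parquet: written uncompressed, then read back
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
million.write.parquet("/tmp/million_parquet")
val millionParquet = sqlContext.read.parquet("/tmp/million_parquet")

// text: one value per line, read back as a single-column DataFrame
million.map(x => x.mkString(",")).saveAsTextFile("/tmp/million_text")
val millionText = sc.textFile("/tmp/million_text").toDF("col1")

// the three timed queries, run against each of the DataFrames above
millionParquet.count()
millionParquet.queryExecution.toRdd.count()
millionParquet.rdd.count()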


size       type     query              time (s)
billion10  text     count              81.594582
                    queryExecution     81.949047
                    rdd.count         119.710021
           Seq      count              18.768544
                    queryExecution     14.257751
                    rdd.count          36.404834
           parquet  count              12.016753
                    queryExecution     24.305452
                    rdd.count          41.932466
billion    text     count              14.120583
                    queryExecution     14.346528
                    rdd.count          22.240026
           Seq      count               2.191781
                    queryExecution      1.655651
                    rdd.count           2.831840
           parquet  count               2.004464
                    queryExecution      5.010546
                    rdd.count           7.815010
million    text     count               0.975095
                    queryExecution      0.113718
                    rdd.count           0.184904
           Seq      count               0.192044
                    queryExecution      0.029069
                    rdd.count           0.036061
           parquet  count               0.963874
                    queryExecution      0.217661
                    rdd.count           0.262279

A few takeaways:

  • The Seq-backed (in-memory) DataFrames are, unsurprisingly, among the cheapest to count.
  • At the larger sizes, text is by far the slowest format, since the whole file has to be read before anything can be counted; at a million rows the format barely matters.
  • Both count and queryExecution.toRdd.count beat rdd.count almost everywhere (rdd.count pays for the extra InternalRow-to-Row conversion); the only exceptions are the million-row text and parquet datasets, where count is the slowest of the three.
  • Comparing count with queryExecution.toRdd.count:
    • queryExecution.toRdd.count wins on the Seq-backed data and on the million-row datasets.
    • count wins on parquet at the billion and ten-billion sizes.
    • on text the two are roughly tied.

In short, which count is fastest depends on the data size, the storage format, and the Spark version, so measure on your own data and cluster.


Source: https://habr.com/ru/post/1676638/

