Why does the Spark UI show an extra stage for a persisted RDD?

In the spark-shell, I ran the following:

val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect

The Spark UI shows three stages. Stages 4 and 5 correspond to the computation of d, and stage 6 corresponds to the computation of the collect action. Since d is persisted, I expected only two stages. However, stage 5 is present but not connected to any other stage.

Spark UI DAG

So I tried running the same computation without persist, and the DAG looks identical, except without the green dots indicating that the RDD is persisted.

Spark UI DAG without persist

I expected the output of stage 11 to be connected to the input of stage 12, but that is not the case.

Looking at the stage descriptions, they do seem to indicate that d is persisted, because stage 5 has input, but I am still confused as to why stage 5 exists at all.

Spark UI stages

Spark UI stages without persist

2 answers
  • The input RDD is cached, and the cached part is not recomputed.

    This can be checked with a simple test:

     import org.apache.spark.SparkContext

     def f(sc: SparkContext) = {
       val counter = sc.longAccumulator("counter")
       val rdd = sc.parallelize(0 until 100).map(i => {
         counter.add(1L)
         (i % 10, i)
       }).persist
       rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
       counter.value
     }

     assert(f(spark.sparkContext) == 100)
  • Caching does not remove stages from the DAG.

    If the data is cached, the corresponding stages can be marked as skipped, but they are still part of the DAG. Lineage can be truncated with checkpoints, but that is not the same thing, and it does not remove stages from the visualization (see the checkpoint sketch at the end of this answer).

  • Input stages contain more than just the cached computation.

    Spark stages group together operations that can be chained without a shuffle.

    While part of the input stage is cached, that does not cover all of the operations needed to prepare the shuffle files. This is why you do not see skipped tasks.

  • The rest (the disconnected stage) is just a limitation of the graph visualization.

  • If you repartition the data first:

     import org.apache.spark.HashPartitioner

     val d = sc.parallelize(0 until 1000000)
       .map(i => (i % 100000, i))
       .partitionBy(new HashPartitioner(20))

     d.join(d.reduceByKey(_ + _)).collect

    you will get the DAG that you are most likely looking for:

    Spark UI DAG after repartitioning
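
As a footnote to the checkpoint remark above, here is a minimal sketch (my own, not part of the original answer; the checkpoint directory is a placeholder path) showing that checkpointing, unlike persist, really does truncate the lineage:

    sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder path

    val c = sc.parallelize(0 until 1000).map(i => (i % 10, i))
    c.checkpoint()   // mark for checkpointing; data is written when the first job runs
    c.count()        // action that materializes the checkpoint

    // The lineage is now cut at the checkpoint: toDebugString shows a
    // ReliableCheckpointRDD instead of the full parallelize/map chain.
    println(c.toDebugString)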


Adding to user6910411's detailed answer: the RDD is not stored in memory until the first action runs, because the whole DAG is evaluated lazily. So when you run collect() for the first time, the RDD d is stored in memory for the first time, but nothing is read from memory. If you run collect() a second time, the cached RDD is read.
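
As a quick illustration of this (a sketch of my own, adapted from the accumulator test in the other answer), you can count in the spark-shell how many times the map function runs across two collect() calls:

  val counter = sc.longAccumulator("map-calls")

  val d = sc.parallelize(0 until 1000)
    .map { i => counter.add(1L); (i % 10, i) }
    .persist

  d.collect()     // first action: computes every element and fills the cache
  counter.value   // 1000

  d.collect()     // second action: read from the cache, map does not run again
  counter.value   // still 1000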

Also, if you call toDebugString on the final RDD, it shows the following output:

  scala> d.join(d.reduceByKey(_ + _)).toDebugString
  res5: String =
  (4) MapPartitionsRDD[19] at join at <console>:27 []
   |  MapPartitionsRDD[18] at join at <console>:27 []
   |  CoGroupedRDD[17] at join at <console>:27 []
   +-(4) MapPartitionsRDD[15] at map at <console>:24 []
   |  |  ParallelCollectionRDD[14] at parallelize at <console>:24 []
   |  ShuffledRDD[16] at reduceByKey at <console>:27 []
   +-(4) MapPartitionsRDD[15] at map at <console>:24 []
      |  ParallelCollectionRDD[14] at parallelize at <console>:24 []

A rough graphical representation of the above:

RDD Stages


Source: https://habr.com/ru/post/1259845/

