How does Spark manage stages?

I am trying to understand how tasks and steps are defined in sparks, and for this I am now using the code I found here and the spark interface. To see it on the spark UI, I had to copy and paste the text into the files several times, so it takes more time to process.

Here is the result of the spark interface:

Jobs

Now I understand that there are three tasks, because there are three actions, and also that the steps are generated due to random actions, but I don’t understand why the steps 1, 4, 5 and 6 are the same as steps 0, 1, and 2 of task 0, and the same thing happens for task 2.

How can I find out which steps will be performed more than the task, only using Java code (before doing anything)? And also, why are stages 4 and 9 missing, and how can I know that this will happen before execution?

+5
source share
1 answer

I understand that there are three tasks, because there are three actions

I would even say that there could be more Spark jobs, but the minimum number is 3 . It all depends on the implementation of the transformations and the action used.

I don’t understand why at stages Job 1 4, 5 and 6 coincide with stages 0, 1 and 2 of task 0, and the same thing happens for Job 2.

Task 1 is the result of an action that was performed on RDD, finalRdd . This RDD was created using (backward): join , textFile , map and distinct .

 val people = sc.textFile("people.csv").map { line => val tokens = line.split(",") val key = tokens(2) (key, (tokens(0), tokens(1))) }.distinct val cities = sc.textFile("cities.csv").map { line => val tokens = line.split(",") (tokens(0), tokens(1)) } val finalRdd = people.join(cities) 

Run the above and you will see the same DAG.

Work 6

Now when you perform the leftOuterJoin or rightOuterJoin , you will get two other DAG groups. You use the previously used RDDs to start new Spark jobs, and you will see the same steps.

why are steps 4 and 9 skipped

Often Spark skips some steps. Segmented steps are those that have already been calculated, so Spark will reuse them and thus increase productivity.

Work 7

How can I find out which steps will be performed more than the task, only using Java code (before doing anything)?

What the line (graph) RDD offers.

 scala> people.leftOuterJoin(cities).toDebugString res15: String = (3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 [] | MapPartitionsRDD[98] at leftOuterJoin at <console>:28 [] | CoGroupedRDD[97] at leftOuterJoin at <console>:28 [] +-(2) MapPartitionsRDD[81] at distinct at <console>:27 [] | | ShuffledRDD[80] at distinct at <console>:27 [] | +-(2) MapPartitionsRDD[79] at distinct at <console>:27 [] | | MapPartitionsRDD[78] at map at <console>:24 [] | | people.csv MapPartitionsRDD[77] at textFile at <console>:24 [] | | people.csv HadoopRDD[76] at textFile at <console>:24 [] +-(3) MapPartitionsRDD[84] at map at <console>:29 [] | cities.csv MapPartitionsRDD[83] at textFile at <console>:29 [] | cities.csv HadoopRDD[82] at textFile at <console>:29 [] 

As you can see for yourself, you will end up with 4 stages, since there are 3 dependencies in random order (edges with section numbers).

The number in parentheses is the number of sections that DAGScheduler will ultimately use to create task sets with the exact number of tasks. One TaskSet per stage.

+7
source

Source: https://habr.com/ru/post/1262122/


All Articles