Apache Spark: why does the reduceByKey transformation execute the DAG?

I have run into a strange problem. As I understand it, the DAG in Spark is only executed when an action is performed. However, I see that reduceByKey(), which is a transformation, starts executing the DAG.

Steps to reproduce. Try the following piece of code:

SparkConf conf = new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context = new JavaSparkContext(conf);

JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist

JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>(x, 1));

Note: the file path must not point to an existing file.

If you execute this code, nothing happens, as expected. However, if you add the following line to the program and run it:

mapToPair.reduceByKey((x, y) -> x + y);

This gives the following exception:

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

This means that Spark has started to execute the DAG. Since reduceByKey() is a transformation, this should not happen until an action such as collect() or take() is performed.

Spark version: 2.0.0.


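Short answer: reduceByKey does not execute the DAG (it only evaluates the partitions: no data is processed).

The reason is that reduceByKey, when called without an explicit Partitioner, needs a default one. To build it, Spark looks at the partitions of the upstream RDDs. The documentation of the "default partitioner" says: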

/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If any of the RDDs already has a partitioner, choose that one.
*
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
* spark.default.parallelism is set, then we'll use the value from SparkContext
* defaultParallelism, otherwise we'll use the max number of upstream partitions.
*
* Unless spark.default.parallelism is set, the number of partitions will be the
* same as the number of partitions in the largest upstream RDD, as this should
* be least likely to cause out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/

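So it needs the number of partitions of the upstream RDD. "Computing the partitions" of a source RDD (Hadoop, text file, ...) means querying the underlying storage (for a Hadoop file, for example, the InputFormat lists the input paths and computes the splits), and that is where the missing path is detected.

Note that this does not execute the DAG itself (that is, your map/flatMap/aggregate, ... functions are not run).

To avoid this, you can pass the partitioner explicitly: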

 reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
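The difference between the two reduceByKey variants can be imitated with a small, self-contained Java model. This is only a sketch: ToyRdd, textFile, map and both reduceByKey methods below are hypothetical stand-ins written for illustration, not Spark classes or Spark's actual implementation. The model assumes that listing partitions is a metadata lookup that may touch storage, while compute() is the work that only an action would trigger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these names are Spark APIs.
class LazyPartitionDemo {

    // Event log, so the order in which things happen can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for an RDD.
    interface ToyRdd {
        int numPartitions();     // metadata lookup; may have to touch storage
        List<String> compute();  // the actual data processing; only an action calls it
    }

    // Stand-in for textFile(path): listing the splits fails when the path is missing.
    static ToyRdd textFile(final String path, final boolean exists) {
        return new ToyRdd() {
            @Override public int numPartitions() {
                LOG.add("list splits");
                if (!exists) {
                    throw new IllegalStateException("Input path does not exist: " + path);
                }
                return 2;
            }
            @Override public List<String> compute() {
                LOG.add("read file");
                return Arrays.asList("a b", "a");
            }
        };
    }

    // Stand-in for a narrow transformation (map, flatMap, ...): fully lazy.
    static ToyRdd map(final ToyRdd parent) {
        return new ToyRdd() {
            @Override public int numPartitions() { return parent.numPartitions(); }
            @Override public List<String> compute() {
                LOG.add("run user function");
                return parent.compute();
            }
        };
    }

    // reduceByKey without a partitioner: must ask the parent for its partition
    // count right away in order to build a default partitioner.
    static ToyRdd reduceByKey(ToyRdd parent) {
        return reduceByKey(parent, parent.numPartitions());
    }

    // reduceByKey with an explicit partition count: no lookup is needed.
    static ToyRdd reduceByKey(final ToyRdd parent, final int partitions) {
        return new ToyRdd() {
            @Override public int numPartitions() { return partitions; }
            @Override public List<String> compute() {
                LOG.add("shuffle");
                return parent.compute();
            }
        };
    }

    public static void main(String[] args) {
        ToyRdd mapped = map(textFile("any non-existing path", false));
        reduceByKey(mapped, 4);          // fine: nothing is looked up
        System.out.println(LOG);         // still empty
        try {
            reduceByKey(mapped);         // fails while the pipeline is being built
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " / events: " + LOG);
        }
    }
}
```

In the model, the only event recorded before the failure is the split listing; neither "read file" nor "run user function" appears, which matches the point that the DAG itself is not executed. In the Java code from the question, the corresponding call should be mapToPair.reduceByKey(new HashPartitioner(4), (x, y) -> x + y), with org.apache.spark.HashPartitioner and an arbitrarily chosen partition count of 4; the missing path would then only be reported once an action runs.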

Source: https://habr.com/ru/post/1669019/

