Apache Spark: union operation gives unexpected results

I know that Spark uses lazy evaluation.

But is this the expected behavior? In the program below, the final output is 20.

But if the print statement

    System.out.println("/////////////////// After " + MainRDD.count());

is uncommented, the output is 40.

This is not what I actually do in my application; I wrote this program just to demonstrate the issue.

    SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaSparkSQL");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    JavaRDD<Integer> MainRDD;
    ArrayList<Integer> list = new ArrayList<>();
    JavaRDD<Integer> tmp;

    // Fill the list with 0..19 and build the initial RDD from it
    for (int i = 0; i < 20; i++) {
        list.add(i);
    }
    MainRDD = sc.parallelize(list); // MainRDD.union(tmp);
    System.out.println("//////////////////////First " + MainRDD.count());
    list.clear();

    // Repeatedly refill the same list, parallelize it, and union it into MainRDD
    for (int i = 20; i < 25; i++) {
        for (int j = 1; j < 5; j++) {
            list.add(i * j);
        }
        tmp = sc.parallelize(list);
        // System.out.println("/////////////////// Before " + MainRDD.count());
        MainRDD = MainRDD.union(tmp);
        // System.out.println("/////////////////// After " + MainRDD.count());
        list.clear();
    }
    System.out.println("/////////////////// last " + MainRDD.count());
1 answer

The source of the problem is the mutable data structure that you use to populate the RDD. When you call sc.parallelize(list), it does not capture the state of the ArrayList. Since you call clear before the data is actually evaluated, there is no data left at all by then.
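One way to sidestep this (a minimal sketch against the Java code from the question; the copy-before-parallelize idea is my suggestion, not something the answer above prescribes) is to hand parallelize an independent copy of the list, so that clearing the list afterwards cannot change what the RDDs eventually read:

    // Sketch: give parallelize an independent copy of the current contents,
    // so later list.clear() calls cannot affect the data the RDDs will read.
    MainRDD = sc.parallelize(new ArrayList<>(list)); // copy of 0..19

    for (int i = 20; i < 25; i++) {
        for (int j = 1; j < 5; j++) {
            list.add(i * j);
        }
        tmp = sc.parallelize(new ArrayList<>(list)); // copy of the 4 values just added
        MainRDD = MainRDD.union(tmp);
        list.clear();
    }
    System.out.println("/////////////////// last " + MainRDD.count()); // 20 + 5 * 4 = 40

With the copies, each RDD owns its own data, so the final count no longer depends on when Spark happens to evaluate it.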

To be honest, I don't know why this behavior changes when you call the count method. Since the RDD is not cached, my hunch is that it comes down to Spark or JVM internals, but I won't even try to guess what is actually happening. Maybe someone smarter can figure out what exactly causes this behavior. (A sketch of a defensive workaround is shown after the examples below.)

Just to illustrate what happens:

    val arr = Array(1, 2, 3)
    val rdd = sc.parallelize(arr)
    (0 until 3).foreach(arr(_) = 99)
    val tmp = sc.parallelize(arr)
    tmp.union(rdd).collect // Array[Int] = Array(99, 99, 99, 99, 99, 99)

vs.

    val arr = Array(1, 2, 3)
    val rdd = sc.parallelize(arr)
    rdd.count() // Long = 3
    (0 until 3).foreach(arr(_) = 99)
    val tmp = sc.parallelize(arr)
    tmp.union(rdd).collect // Array[Int] = Array(99, 99, 99, 1, 2, 3)
    sc.getRDDStorageInfo // Array[org.apache.spark.storage.RDDInfo] = Array()
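Back in the Java program from the question, if you need the counts to be deterministic regardless of these internals, another option (my own hedged sketch, not something the answer above confirms as the intended fix) is to cache and force evaluation of each small RDD while the list still holds its data, before clearing it:

    tmp = sc.parallelize(list);
    tmp.cache();   // mark the RDD for in-memory storage
    tmp.count();   // force evaluation now, while `list` still contains this batch
    MainRDD = MainRDD.union(tmp);
    list.clear();  // later mutations of `list` no longer reach the cached data

Caching plus an eager action pins the evaluated partitions in memory, so the union no longer depends on when Spark happens to read the mutable list. The copy-based approach shown earlier avoids the extra action and is usually simpler.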


