Spark: how to combine a List[RDD] into an RDD
I am new to Spark and the Scala language and would like to combine all the RDDs in a list as shown below (List[RDD] to RDD):
    val data = for (item <- paths) yield {
        val ad_data_path = item._1
        val ad_data = SparkCommon.sc.textFile(ad_data_path).map { line =>
            {
                val ad_data = new AdData(line)
                (ad_data.ad_id, ad_data)
            }
        }.distinct()
    }

    val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _)

I run the code in IntelliJ, always getting an error message:
    java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
        at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59)
        at org.apache.spark.rdd.RDD.union(RDD.scala:438)
        at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444)
        at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
        at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
        at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177)
        at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
        at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)
        at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)
        at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
        at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Does anyone know what the bug is? Thanks in advance :)
2 answers
This may be the reason:

    val listA = 1 to 10
    for (i <- listA; if i % 2 == 0) yield { i }

will return Vector(2, 4, 6, 8, 10), whereas

    for (i <- listA; if i % 2 == 0) yield { val c = i }

will return Vector((), (), (), (), ()).
This is exactly what happens in your case: you initialize ad_data but never return it, so each iteration of the yield produces Unit instead of the RDD.
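For completeness, here is a minimal sketch of the questioner's loop with the RDD returned from each iteration and the list combined on the driver. SparkCommon.sc, AdData, and paths are taken from the question and assumed to be defined elsewhere, so this is an illustration rather than tested code:

    // Each iteration now ends with the RDD expression, so the yield returns it
    // and `data` becomes a collection of RDD[(id, AdData)] instead of a collection of Unit.
    val data = for (item <- paths) yield {
        val ad_data_path = item._1
        SparkCommon.sc.textFile(ad_data_path).map { line =>
            val ad_data = new AdData(line)
            (ad_data.ad_id, ad_data)
        }.distinct()
    }

    // Combine the per-path RDDs on the driver. Wrapping them in
    // sc.parallelize(...).reduce(_ ++ _) ships RDD objects to executors, where
    // nested RDD operations are not supported and fail with a NullPointerException.
    val ret = data.reduce(_ union _)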
As for your question, i.e. converting a List[RDD] to an RDD, here is the solution:
    val listA = sc.parallelize(1 to 10)
    val listB = sc.parallelize(10 to 1 by -1)

    // create a list of 2 RDDs
    val listC = List(listA, listB)

    // convert List[RDD] to RDD
    val listD = listC.reduce(_ union _)

Hope this answers your question.
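As a side note beyond the original answer, SparkContext also has a union method that accepts a whole sequence of RDDs at once, which avoids building a deeply nested chain of UnionRDDs when the list is long:

    // Same result in a single call; sc.union takes a Seq of RDDs.
    val listE = sc.union(listC)

Either form gives you one RDD containing the elements of every RDD in the list.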