Spark: how to combine a List<RDD> into an RDD

I am new to Spark and the Scala language and would like to combine all the RDDs in a list into a single RDD, as shown below (List<RDD> to RDD):

    val data = for (item <- paths) yield {
      val ad_data_path = item._1
      val ad_data = SparkCommon.sc.textFile(ad_data_path).map { line =>
        {
          val ad_data = new AdData(line)
          (ad_data.ad_id, ad_data)
        }
      }.distinct()
    }
    val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _)

I run the code in IntelliJ and always get this error message:

    java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
        at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59)
        at org.apache.spark.rdd.RDD.union(RDD.scala:438)
        at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444)
        at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
        at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
        at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177)
        at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
        at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)
        at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)
        at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
        at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Does anyone know what the bug is? Thanks in advance :)

+6
2 answers

This may be the reason:

    val listA = 1 to 10
    for (i <- listA; if i % 2 == 0) yield { i }

will return Vector(2, 4, 6, 8, 10), whereas

    for (i <- listA; if i % 2 == 0) yield { val c = i }

will return Vector((), (), (), (), ()), because the last expression in the yield block is an assignment, which evaluates to Unit.

This is exactly what happens in your case. You initialize ad_data inside the yield block but never return it, so data is a collection of Unit values rather than a list of RDDs.
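
A minimal sketch of the fix, reusing SparkCommon.sc, AdData, and paths from your snippet: make the RDD the last expression of the yield block, and then reduce the resulting list on the driver rather than passing it to parallelize. RDDs cannot be nested inside another RDD, and reducing them inside a task is what triggers the NullPointerException in your stack trace:

    val data = for (item <- paths) yield {
      val ad_data_path = item._1
      SparkCommon.sc.textFile(ad_data_path).map { line =>
        val ad_data = new AdData(line)
        (ad_data.ad_id, ad_data)
      }.distinct() // the RDD is now the value of the yield block
    }
    // reduce on the driver; parallelize(data) would put RDDs inside an RDD
    val ret = data.reduce(_ union _)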

As for your question, i.e. converting a List[RDD] to an RDD,

here is the solution:

    val listA = sc.parallelize(1 to 10)
    val listB = sc.parallelize(10 to 1 by -1)

Create a list of the 2 RDDs:

    val listC = List(listA, listB)

Convert the List[RDD] to an RDD:

 val listD = listC.reduce(_ union _) 
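
Collecting listD confirms that the two RDDs were concatenated, the partitions of listA coming before those of listB:

    listD.collect()
    // Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1)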

Hope this answers your question.

+17

Another easy way to convert a list of RDDs into an RDD: SparkContext has two overloaded union methods, one that accepts two (or more) RDDs and one that accepts a sequence of RDDs:

    union(first: RDD[T], rest: RDD[T]*)
    union(rdds: Seq[RDD[T]])
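
For example, reusing listC from the first answer, the Seq overload builds a single UnionRDD instead of the chain of nested unions that reduce(_ union _) produces, which matters when the list is long:

    // one union over the whole Seq[RDD], evaluated on the driver
    val listD = sc.union(listC)
    listD.collect()
    // Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1)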

0

Source: https://habr.com/ru/post/987854/

