Merge multiple RDDs generated in a loop

I call a function in scala that gives RDD[(Long,Long,Double)] as the output.

def helperfunction(): RDD[(Long, Long, Double)]

I call this function in a loop in another part of the code, and I want to combine all the generated RDDs. The loop calling the function looks something like this:

    for (i <- 1 to n) {
      val tOp = helperfunction()
      // merge the generated tOp into an accumulated result
    }

What I want to do is something similar to what StringBuilder does for you in Java when you want to concatenate strings. I looked at methods for merging RDDs, which basically come down to using union, like this:

 RDD1.union(RDD2) 

But this requires both RDDs to exist before they are combined. I could initialize a var RDD1 outside the for loop to accumulate the results, but I am not sure how to initialize an empty RDD of type RDD[(Long, Long, Double)]. I am also new to Spark, so I'm not even sure this is the most elegant way to solve the problem.

2 answers

Instead of using vars, you can use functional programming paradigms to achieve the desired result:

    val rdd = (1 to n).map(_ => helperfunction()).reduce(_ union _)
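The map-then-reduce shape above can be exercised without a cluster by swapping RDDs for plain Scala collections; this is only an analogy sketch, with `helperfunction` stubbed out for illustration:

```scala
object MergeSketch {
  // Hypothetical stub standing in for the Spark helper: each call
  // produces one batch of (Long, Long, Double) tuples.
  def helperfunction(i: Int): Seq[(Long, Long, Double)] =
    Seq((i.toLong, (i * 10).toLong, i.toDouble))

  def main(args: Array[String]): Unit = {
    val n = 3
    // Same shape as (1 to n).map(_ => helperfunction()).reduce(_ union _),
    // with Seq concatenation (++) playing the role of RDD.union.
    val merged = (1 to n).map(helperfunction).reduce(_ ++ _)
    println(merged.size) // 3
  }
}
```

One caveat for the real Spark version: chaining many unions in a reduce builds a long lineage; for a large n it can be worth collecting the RDDs into a sequence and combining them in one step with SparkContext.union.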

Alternatively, if you still need to create an empty RDD, you can do this using:

    val empty = sc.emptyRDD[(Long, Long, Double)]
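The empty RDD is useful as the seed of a fold, which also handles the n = 0 case that reduce would reject. The shape can again be sketched with plain collections (an analogy only; in Spark the seed would be sc.emptyRDD[(Long, Long, Double)] and the combiner _ union _):

```scala
object FoldSketch {
  // Hypothetical stub standing in for the Spark helper.
  def helperfunction(i: Int): Seq[(Long, Long, Double)] =
    Seq((i.toLong, i.toLong, i.toDouble))

  def main(args: Array[String]): Unit = {
    // Analogue of sc.emptyRDD[(Long, Long, Double)] as a fold seed:
    // folding over zero batches simply yields the empty result.
    val empty = Seq.empty[(Long, Long, Double)]
    val merged = (1 to 4).map(helperfunction).fold(empty)(_ ++ _)
    println(merged.size) // 4
  }
}
```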

You are right that this may not be the best way to do this, but we need more information about what you are trying to accomplish by creating a new RDD with every call to your helper function.

You can define one RDD before the loop, assign it to a var, and then update it inside the loop. Here is an example:

    val rdd = sc.parallelize(1 to 100)
    val rdd_tuple = rdd.map(x => (x.toLong, (x * 10).toLong, x.toDouble))
    var new_rdd = rdd_tuple
    println("Initial RDD count: " + new_rdd.count())    // 100
    for (i <- 2 to 4) {
      new_rdd = new_rdd.union(rdd_tuple)
    }
    println("New count after loop: " + new_rdd.count()) // 400 after three unions

Source: https://habr.com/ru/post/1245052/
