Adding an index to an RDD using shared mutable state

Take this simple RDD as an example to explain the problem:

val testRDD=sc.parallelize(List((1, 2), (3, 4), (3, 6)))

I have this function that will help me implement indexing:

 var sum = 0

 def inc(l: Int): Int = {
   sum += l
   sum
 }

Now I want to create an identifier for each tuple:

val indexedRDD = testRDD.map(x => (x._1, x._2, inc(1)))

The output RDD should be ((1,2,1), (3,4,2), (3,6,3)).

But all the index values turn out to be the same; every tuple gets 1:

((1,2,1), (3,4,1), (3,6,1))

Where am I going wrong? Is there another way to achieve this?

1 answer

You are looking for:

def zipWithIndex(): RDD[(T, Long)]

However, pay attention to the docs:

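Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.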
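As for why the counter in the question yields 1 everywhere: Spark serializes the closure passed to map and ships a copy to each task, so each task most likely increments its own copy of sum, starting from 0. Below is a minimal sketch of the zipWithIndex approach applied to the RDD from the question. The object and helper names (ZipWithIndexSketch, withIndex, withStableIndex) and the local[2] master are assumptions made for illustration. Sorting on the whole tuple with sortBy is my own choice to break ties between equal keys; the docs only mention sortByKey().

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper object, not part of the original question or answer.
object ZipWithIndexSketch {

  // zipWithIndex starts at 0, so add 1 to get the 1-based ids from the question.
  def withIndex(rdd: RDD[(Int, Int)]): RDD[(Int, Int, Long)] =
    rdd.zipWithIndex().map { case ((a, b), i) => (a, b, i + 1) }

  // Sort on the whole tuple first so that re-evaluating the RDD
  // assigns the same index to the same element.
  def withStableIndex(rdd: RDD[(Int, Int)]): RDD[(Int, Int, Long)] =
    withIndex(rdd.sortBy(pair => pair))

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs without a cluster.
    val conf = new SparkConf().setMaster("local[2]").setAppName("zipWithIndex-sketch")
    val sc = new SparkContext(conf)
    try {
      val testRDD = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
      withIndex(testRDD).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

For the three tuples in the question this prints (1,2,1), (3,4,2) and (3,6,3). Note that the index is a Long, not an Int.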


Source: https://habr.com/ru/post/1622315/

