How to convert an RDD[(Key, Value)] to a Map[Key, RDD[Value]]

I have been looking for a solution for a long time, but I have not found a correct algorithm.

Using Spark RDDs in Scala, how do I convert an RDD[(Key, Value)] to a Map[Key, RDD[Value]], knowing that I cannot use collect or other methods that load the data into memory?

In fact, my final goal is to iterate over a Map[Key, RDD[Value]] by key and call saveAsNewAPIHadoopFile for each RDD[Value].

For example, if I get:

 RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] 

I would like to:

 Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])] 

I wonder whether it would cost too much to do this by calling filter on the RDD[(Key, Value)] once for each key A, B, C, but I don't know whether calling filter that many times, once per distinct key, would be efficient (probably not, but perhaps with cache?).

thanks

+6
3 answers

You should use code like this (Python):

    rdd = sc.parallelize([("A", 1), ("A", 2), ("A", 3),
                          ("B", 4), ("B", 5), ("C", 6)]).cache()
    keys = rdd.keys().distinct().collect()
    for key in keys:
        out = rdd.filter(lambda kv: kv[0] == key).map(lambda kv: kv[1])
        out.saveAsNewAPIHadoopFile(...)

One RDD cannot be nested inside another RDD, so you cannot simply collect the keys and turn the values associated with each one into a separate RDD. In my example you iterate over the cached RDD, which is fine and will run quickly.
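For completeness, here is the same per-key filter loop in Scala (a sketch; the per-key output path /tmp/out/<key> is my own placeholder, and you would substitute your real saveAsNewAPIHadoopFile call):

    val rdd = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3),
                                 ("B", 4), ("B", 5), ("C", 6))).cache()
    val keys = rdd.keys.distinct().collect()
    for (key <- keys) {
      // One filtered pass per distinct key; cache() keeps the base RDD in memory
      // so each pass avoids recomputing it from the source.
      val out = rdd.filter { case (k, _) => k == key }.values
      out.saveAsTextFile(s"/tmp/out/$key") // or saveAsNewAPIHadoopFile(...)
    }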

+2

It looks like what you really want is to save the key-value RDD to a separate file for each key. Instead of creating a Map[Key, RDD[Value]], consider using a MultipleTextOutputFormat, similar to the example here. The code is pretty much all present in that example.

The advantage of this approach is that you are guaranteed a single pass over the RDD after the shuffle, and you get the same result you need. If you did this by filtering to create multiple RDDs, as suggested in the other answer, you would need one pass over the dataset for each distinct key (unless your source supports predicate pushdown), which would be slower.
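A minimal sketch of that approach (the class name KeyBasedOutput is my own; note that MultipleTextOutputFormat belongs to the old mapred API, so it pairs with saveAsHadoopFile rather than saveAsNewAPIHadoopFile):

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    import org.apache.spark.HashPartitioner

    // Routes each record to a file named after its key, and drops the key
    // from the written record so that only the value is saved.
    class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.toString
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()
    }

    val rdd = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3),
                                 ("B", 4), ("B", 5), ("C", 6)))
    rdd.partitionBy(new HashPartitioner(3)) // one shuffle, then a single pass to write
       .saveAsHadoopFile("/tmp/out", classOf[String], classOf[Int], classOf[KeyBasedOutput])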

0

This is my simple test code.

    val test_RDD = sc.parallelize(List(("A", 1), ("A", 2), ("A", 3),
                                       ("B", 4), ("B", 5), ("C", 6)))
    val groupby_RDD = test_RDD.groupByKey()
    val result_RDD = groupby_RDD.map { v =>
      var result_list: List[Int] = Nil
      for (i <- v._2) {
        result_list ::= i
      }
      (v._1, result_list)
    }

Result below

    result_RDD.take(3)
    >> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))

Or you can do it like this:

    val test_RDD = sc.parallelize(List(("A", 1), ("A", 2), ("A", 3),
                                       ("B", 4), ("B", 5), ("C", 6)))
    val nil_list: List[Int] = Nil
    val result2 = test_RDD.aggregateByKey(nil_list)(
      (acc, value) => value :: acc,
      (acc1, acc2) => acc1 ::: acc2
    )

The result is

    result2.take(3)
    >> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))
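Note that both snippets produce an RDD[(String, List[Int])] rather than a Map[Key, RDD[Value]] (which, as the first answer explains, is not possible). If a driver-side map were acceptable, you could finish with collectAsMap, but that loads every value into memory, which is exactly what the question rules out:

    // Collects all values to the driver; only viable for small data.
    val local: Map[String, List[Int]] = result2.collectAsMap().toMap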
-1

Source: https://habr.com/ru/post/981446/
