How to efficiently join an arbitrary number of RDDs?

Joining two RDDs is easy with RDD1.join(RDD2). However, if I store an arbitrary number of RDDs in a List<JavaRDD>, how can I join them efficiently?

+4
2 answers

First, note that you cannot join a JavaRDD. You will need to get a JavaPairRDD using one of:

  • groupBy() (or keyBy())
  • cartesian()
  • [flat]mapToPair()
  • zipWithIndex() (useful because it adds an index where none exists)
  • and so on.

Then, once you have your list of pair RDDs, you can join them as follows:

JavaPairRDD<Integer, String> linesA = sc.parallelizePairs(Arrays.asList(
                                            new Tuple2<>(1, "a1"),
                                            new Tuple2<>(2, "a2"),
                                            new Tuple2<>(3, "a3"),
                                            new Tuple2<>(4, "a4")));
JavaPairRDD<Integer, String> linesB = sc.parallelizePairs(Arrays.asList(
                                            new Tuple2<>(1, "b1"),
                                            new Tuple2<>(5, "b5"),
                                            new Tuple2<>(3, "b3")));
JavaPairRDD<Integer, String> linesC = sc.parallelizePairs(Arrays.asList(
                                            new Tuple2<>(1, "c1"),
                                            new Tuple2<>(5, "c6"),
                                            new Tuple2<>(6, "c3")));

// the list of RDDs
List<JavaPairRDD<Integer, String>> allLines = Arrays.asList(linesA, linesB, linesC);

// since we probably don't want to modify any of the datasets in the list, we will
// copy the first one in a separate variable to keep the result
JavaPairRDD<Integer, String> res = allLines.get(0);
for (int i = 1; i < allLines.size(); ++i) {  // note we skip position 0 !
    res = res.join(allLines.get(i))
    /*[1]*/  .mapValues(tuple -> tuple._1 + ':' + tuple._2);
}

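The line marked [1] is important because it maps the JavaPairRDD<Integer, Tuple2<String,String>> returned by join back to a JavaPairRDD<Integer,String>, so the result keeps the same type as the inputs and can be joined again on the next iteration.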
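To check what the loop yields on the sample data without a cluster, here is a minimal plain-Java sketch in which a Map stands in for each pair RDD. The class MapJoinSketch and its methods innerJoin, joinAll and sampleData are hypothetical names for illustration, not Spark API. It also assumes each key appears at most once per dataset, whereas a real join emits one row per matching pair of values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapJoinSketch {

    // Inner join on key: keep only keys present on both sides and
    // merge the two values the same way the mapValues step [1] does.
    static Map<Integer, String> innerJoin(Map<Integer, String> left, Map<Integer, String> right) {
        Map<Integer, String> out = new TreeMap<>();
        for (Map.Entry<Integer, String> e : left.entrySet()) {
            String other = right.get(e.getKey());
            if (other != null) {
                out.put(e.getKey(), e.getValue() + ':' + other);
            }
        }
        return out;
    }

    // Fold the list left to right, starting from the first element.
    static Map<Integer, String> joinAll(List<Map<Integer, String>> all) {
        Map<Integer, String> res = all.get(0);
        for (int i = 1; i < all.size(); ++i) {  // position 0 is the starting value
            res = innerJoin(res, all.get(i));
        }
        return res;
    }

    // The same three datasets as linesA, linesB and linesC.
    static List<Map<Integer, String>> sampleData() {
        Map<Integer, String> a = new TreeMap<>();
        a.put(1, "a1"); a.put(2, "a2"); a.put(3, "a3"); a.put(4, "a4");
        Map<Integer, String> b = new TreeMap<>();
        b.put(1, "b1"); b.put(5, "b5"); b.put(3, "b3");
        Map<Integer, String> c = new TreeMap<>();
        c.put(1, "c1"); c.put(5, "c6"); c.put(6, "c3");
        return Arrays.asList(a, b, c);
    }

    public static void main(String[] args) {
        System.out.println(joinAll(sampleData())); // prints {1=a1:b1:c1}
    }
}
```

Only key 1 survives, because an inner join drops every key that is missing from any one of the datasets. If that is not what you want, a different join type would be needed at each step.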

Following chrisw's suggestion, the same can be done with streams:

JavaPairRDD<Integer, String> res;
res = allLines.stream()
              .reduce((rdd1, rdd2) -> rdd1.join(rdd2).mapValues(tup -> tup._1 + ':' + tup._2))
              .get();  // get value from Optional<JavaPairRDD>

A note on the mapping step: with larger RDDs, the for loop version could keep the result as a JavaPairRDD<Integer, StringBuilder> res instead of concatenating strings.

+2

If you have a List or array of JavaRDD, you could (I have not tested this) use reduce from the Java 8 Stream API. See https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html

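final List<JavaPairRDD<Integer, String>> list = getList(); // where getList is your own method returning the pair RDDs

// JavaRDD has no join(), so the elements must be JavaPairRDDs. reduce returns an
// Optional, and mapValues flattens each joined Tuple2 so the types keep matching.
final JavaPairRDD<Integer, String> rdd = list.stream()
        .reduce((a, b) -> a.join(b).mapValues(t -> t._1 + ':' + t._2))
        .get();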

An example of reduce with Strings:

Stream.of("foo", "bar", "baz").reduce(String::concat).get();

which yields

foobarbaz
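One detail worth checking: the one-argument reduce returns an Optional, which is empty for an empty stream, so calling get() on it throws NoSuchElementException. A small plain-Java sketch follows, with strings standing in for RDDs. The class ReduceSketch and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class ReduceSketch {

    // One-argument reduce: no identity value, so the result is wrapped in an Optional.
    static Optional<String> concatAll(List<String> parts) {
        return parts.stream().reduce(String::concat);
    }

    // Two-argument reduce: the identity "" is returned for an empty list.
    static String concatAllOrEmpty(List<String> parts) {
        return parts.stream().reduce("", String::concat);
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("foo", "bar", "baz");
        List<String> none = Collections.emptyList();
        System.out.println(concatAll(parts).get());           // foobarbaz
        System.out.println(concatAll(none).isPresent());      // false
        System.out.println(concatAllOrEmpty(none).isEmpty()); // true
    }
}
```

There is no natural identity element for join, so with RDDs the Optional form is the one that applies. Checking that the list is non-empty first, or using orElseThrow with a clear message, avoids a bare NoSuchElementException.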

+1

Source: https://habr.com/ru/post/1675068/

