We are trying to assign the same executor and the same partitioner to our RDDs in order to avoid network traffic, so that shuffle operations such as cogroup and join have no stage boundaries and all transformations complete in a single stage.
To this end, we wrap the RDD in our own custom RDD class (ExtendRDD.class) in Java, which overrides the getPreferredLocations function of RDD.class (in Scala) as:
public Seq<String> getPreferredLocations(Partition split) {
    List<String> listString = new ArrayList<String>();
    listString.add("11.113.57.142");
    listString.add("11.113.57.163");
    listString.add("11.113.57.150");
    List<String> finalList = new ArrayList<String>();
    finalList.add(listString.get(split.index() % listString.size()));
    Seq<String> toReturnListString =
        scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq();
    return toReturnListString;
}
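The round-robin host selection above (partition index modulo the size of the host list) can be sanity-checked in plain Java, independently of Spark; the class name is hypothetical, the host IPs are the ones from the snippet:

```java
import java.util.Arrays;
import java.util.List;

public class PreferredLocationCheck {
    // Same hosts as in getPreferredLocations above.
    static final List<String> HOSTS = Arrays.asList(
            "11.113.57.142", "11.113.57.163", "11.113.57.150");

    // Mirrors listString.get(split.index() % listString.size()).
    static String hostFor(int partitionIndex) {
        return HOSTS.get(partitionIndex % HOSTS.size());
    }

    public static void main(String[] args) {
        // Partitions 0..5 cycle through the three hosts in order.
        for (int i = 0; i < 6; i++) {
            System.out.println(i + " -> " + hostFor(i));
        }
    }
}
```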
With this, we can control on which node Spark places each RDD partition in the cluster. But now the problem is the partitioner: since these RDDs are different from each other, Spark considers them to have shuffle dependencies and again creates several stages for those shuffle operations. We tried to override the partitioner method of the same RDD.class in our custom RDD as:
public Option<Partitioner> partitioner() {
    Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner());
    return optionPartitioner;
}
For Spark to put them in the same stage, it must consider these RDDs to come from the same partitioner. Our partitioner method does not work: Spark still sees different partitioners for the 2 RDDs and creates several stages for the shuffle operations.
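As far as we understand, Spark decides whether a cogroup or join needs a shuffle by comparing the partitioners of the input RDDs for equality; the dependency is only narrow when both RDDs report partitioners that compare equal. A minimal, Spark-free sketch of the equals/hashCode contract a custom partitioner would need (the class name and field are hypothetical stand-ins, not the real org.apache.spark.Partitioner):

```java
// Hypothetical stand-in for a custom Spark partitioner, shown only to
// illustrate the equality contract; it is not the real Spark class.
public class FixedHostPartitioner {
    private final int numPartitions;

    public FixedHostPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public int numPartitions() {
        return numPartitions;
    }

    public int getPartition(Object key) {
        // Non-negative modulo, as a hash partitioner would use.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Two instances configured identically must compare equal; otherwise
    // the framework sees "different" partitioners and plans a shuffle.
    @Override
    public boolean equals(Object other) {
        return other instanceof FixedHostPartitioner
                && ((FixedHostPartitioner) other).numPartitions == numPartitions;
    }

    @Override
    public int hashCode() {
        return numPartitions;
    }
}
```

If each ExtendRDD builds a fresh partitioner object without such an equals override, default reference equality would make Spark treat the two RDDs as differently partitioned even when the partitioning logic is identical.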
We have wrapped the Scala RDD with our custom RDD as:
ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt", 1);
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag);
We create another custom RDD in the same way and get a pair RDD (pairRDD2) from it. Then we try to apply the same partitioner as in the extendRDD object to the PairRDDFunctions object using the partitionBy function, and then apply cogroup to it:
RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl());
PairRDDFunctions<String, String> pair =
    new PairRDDFunctions<String, String>(pairRDD, tag, tag, null);
pair.partitionBy(extendRDD2.getPartitioner());
pair.cogroup(pairRDD2);
None of this works: Spark still creates several stages when it encounters the cogroup transformation.
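One detail worth double-checking in the snippet above: PairRDDFunctions.partitionBy is a transformation that returns a new RDD rather than modifying `pair` in place, so its result has to be captured and cogroup called on that result. A Spark-free sketch of this return-a-new-object pattern (the FakePairRdd class is a hypothetical stand-in, not a Spark type):

```java
import java.util.Optional;

// Hypothetical immutable pair-RDD stand-in; partitionBy returns a NEW
// object carrying the partitioner, the way a Spark transformation does.
public class FakePairRdd {
    private final Optional<String> partitionerName;

    public FakePairRdd() {
        this.partitionerName = Optional.empty();
    }

    private FakePairRdd(String partitionerName) {
        this.partitionerName = Optional.of(partitionerName);
    }

    public FakePairRdd partitionBy(String partitionerName) {
        return new FakePairRdd(partitionerName); // does not mutate `this`
    }

    public Optional<String> partitioner() {
        return partitionerName;
    }

    public static void main(String[] args) {
        FakePairRdd pair = new FakePairRdd();
        pair.partitionBy("custom"); // result discarded: pair stays unpartitioned
        FakePairRdd partitioned = pair.partitionBy("custom"); // result captured

        System.out.println(pair.partitioner().isPresent());        // still empty
        System.out.println(partitioned.partitioner().isPresent()); // present
    }
}
```

Calling cogroup on the original `pair` after discarding the partitionBy result would therefore cogroup the unpartitioned RDD, which could explain the extra shuffle stages regardless of the partitioner override.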
Any suggestions on how we can apply the same partitioner to these RDDs so that cogroup runs in a single stage?