Spark: what's the best strategy for combining 2-key RDD with one-time RDD?

I have two RDDs that I want to join, and they look like this:

val rdd1:RDD[(T,U)] val rdd2:RDD[((T,W), V)] 

It happens that the key values โ€‹โ€‹of rdd1 unique, and also that the values โ€‹โ€‹of tuple-key rdd2 unique. I would like to join two datasets to get the following rdd:

 val rdd_joined:RDD[((T,W), (U,V))] 

What is the most effective way to achieve this? Here are a few ideas I was thinking about.

Option 1:

 val m = rdd1.collectAsMap val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))}) 

Option 2:

 val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2) 

Option 1 will collect all mastering data, right? Thus, this does not seem like a good option if rdd1 is large (it is relatively large in my case, although it is an order of magnitude smaller than rdd2). Option 2 makes an ugly distinct and Cartesian product that also seems very inefficient. Another possibility that came to my mind (but I havenโ€™t tried it yet) was to make option 1 and map translation, although it would be better to broadcast it in a โ€œsmartโ€ way so that the card keys are located along with the rdd2 keys.

Has anyone encountered a similar situation before? I would be happy to receive your thoughts.

Thank!

+45
scala apache-spark
Jul 12 '13 at 18:25
source share
2 answers

One option is to make a broadcast connection by collecting rdd1 into the driver and passing it to all the rdd1 ; done correctly, this will allow us to avoid the expensive shuffling of a large rdd2 RDD:

 val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) val joined = rdd2.mapPartitions({ iter => val m = rdd1Broadcast.value for { ((t, w), u) <- iter if m.contains(t) } yield ((t, w), (u, m.get(t).get)) }, preservesPartitioning = true) 

preservesPartitioning = true tells Spark that this map function does not change the rdd2 keys; this will allow Spark to avoid re-partitioning rdd2 for any subsequent operations that are connected based on the key (t, w) .

This transmission may be inefficient because it is associated with a communication bottleneck in the driver. In principle, it is possible to transfer one RDD to another without involving a driver; I have a prototype of this that I would like to generalize and add to Spark.

Another option is to re-map the rdd2 keys and use the Spark join method; this will include a complete shuffle of rdd2 (and possibly rdd1 ):

 rdd1.join(rdd2.map { case ((t, w), u) => (t, (w, u)) }).map { case (t, (v, (w, u))) => ((t, w), (u, v)) }.collect() 

In my input example, both of these methods give the same result:

 res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C))) 

The third option is to restructure rdd2 so that t its key, and then perform the above connection.

+56
Jul 17 '13 at 2:48 on
source share

Another way to do this is to create a custom separator, and then use zipPartitions to join your RDD.

 import org.apache.spark.HashPartitioner class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { override def getPartition(key: Any): Int = key match { case k: Tuple2[Int, String] => super.getPartition(k._1) case _ => super.getPartition(key) } } val numSplits = 8 val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) val result = rdd2.zipPartitions(rdd1)( (iter2, iter1) => { val m = iter1.toMap for { ((t: Int, w), u) <- iter2 if m.contains(t) } yield ((t, w), (u, m.get(t).get)) } ).partitionBy(new HashPartitioner(numSplits)) result.glom.collect 
+12
Apr 18 '14 at 0:25
source share



All Articles