One option is to perform a broadcast join by collecting rdd1 to the driver and broadcasting it to all of the mappers; done correctly, this lets us avoid an expensive shuffle of the large rdd2 RDD:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)
  } yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
The preservesPartitioning = true tells Spark that this map function does not modify the keys of rdd2; this allows Spark to avoid re-partitioning rdd2 for any subsequent operations that join based on the (t, w) key.
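To illustrate what that buys us, here is a small sketch (the four-way HashPartitioner and the partitionedRdd2 / joined2 names are purely illustrative, not part of the original example): if rdd2 has already been hash-partitioned by its (t, w) key, the mapPartitions output keeps that partitioner, so a later key-based operation on (t, w) can skip a shuffle.

import org.apache.spark.HashPartitioner

// Illustrative only: pre-partition rdd2 by its (t, w) key.
val partitionedRdd2 = rdd2.partitionBy(new HashPartitioner(4))

val joined2 = partitionedRdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)
  } yield ((t, w), (u, m(t)))
}, preservesPartitioning = true)

// With preservesPartitioning = true, joined2.partitioner is Some(HashPartitioner(4));
// without it, the partitioner would be None and a later join/reduceByKey on
// (t, w) would trigger another shuffle.
println(joined2.partitioner)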
This broadcast could be inefficient, since it involves a communications bottleneck through the driver. In principle, it is possible to broadcast one RDD to another without involving the driver; I have a prototype of this that I would like to generalize and add to Spark.
Another option is to re-key rdd2 and use Spark's join method; this will involve a full shuffle of rdd2 (and possibly rdd1 as well):
rdd1.join(rdd2.map {
  case ((t, w), u) => (t, (w, u))
}).map {
  case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
On my example inputs, both of these methods produce the same result:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
A third option would be to restructure rdd2 so that t is its key, then perform the join described above.
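As a rough sketch of that third option, assuming we control how rdd2 is produced upstream (rdd2KeyedByT and joinedByT are hypothetical names, and the literal data simply mirrors the example above):

// Build rdd2 keyed by t from the start, so the join itself needs no re-keying map.
val rdd2KeyedByT = sc.parallelize(Seq(
  (1, ("Z", 111)), (1, ("ZZ", 111)), (2, ("Y", 222)), (3, ("X", 333))))

// Join on t, then reshape back to the ((t, w), (u, v)) form used above.
val joinedByT = rdd1.join(rdd2KeyedByT)
  .map { case (t, (v, (w, u))) => ((t, w), (u, v)) }
joinedByT.collect()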