In Apache Spark, I have two RDDs. The first, data : RDD[(K,V)], contains some data as key-value pairs. The second, pairs : RDD[(K,K)], contains a set of interesting key pairs over this data.
How can I efficiently build an RDD pairsWithData : RDD[((K,K),(V,V))] so that it contains all the elements of pairs as the key tuple and their corresponding values (from data) as the value tuple?
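To make the shape of the result concrete, this is roughly the signature I am after (with Int and String standing in for K and V, as in the sample code further down):

def massPairLookup(pairs : RDD[(Int, Int)], data : RDD[(Int, String)]) : RDD[((Int, Int), (String, String))]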
Some data properties:
- The keys of data are unique.
- All entries in pairs are unique.
- For all pairs (k1,k2) in pairs, it is guaranteed that k1 <= k2.
- The size of pairs is only a constant factor in the size of data, |pairs| = O(|data|).
- Current data sizes (growth is expected): |data| ~ 10^8, |pairs| ~ 10^10.
Current attempts
Here is some sample code in Scala:
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

// This kind of shows the idea, but fails at runtime.
def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
  keyPairs map {case (k1,k2) =>
    val v1 : String = data lookup k1 head;
    val v2 : String = data lookup k2 head;
    ((k1, k2), (v1,v2))
  }
}

// Works but is O(|data|^2)
def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
  // Construct all possible pairs of values
  val cartesianData = data cartesian data map {case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))}

  // Select only the values whose keys are in keyPairs
  keyPairs map {(_,0)} join cartesianData mapValues {_._2}
}

// Example function that finds pairs of keys
// Runs in O(|data|) in real life, but cannot maintain the values
def relevantPairs(data : RDD[(Int, String)]) = {
  val keys = data map (_._1)
  keys cartesian keys filter {case (x,y) => x*y == 12 && x < y}
}

// Example run
val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
val pairs = relevantPairs(data)
val pairsWithData = massPairLookup2(pairs, data)

// Prints:
// ((1,12),(Number 1,Number 12))
// ((2,6),(Number 2,Number 6))
// ((3,4),(Number 3,Number 4))
pairsWithData.foreach(println)
Attempt 1
At first I tried just using the lookup function on data, but that fails at runtime. It seems that self was null in PairRDDFunctions.
Also, I'm not sure about the performance of lookup. The documentation says the operation is done efficiently if the RDD has a known partitioner, by only searching the partition that the key maps to. Even then, each lookup seems to take at best O(|partition|) time, so looking up n keys takes O(n * |partition|), which I suspect can be done better.
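For reference, this is the kind of pre-partitioning I understand that note to mean; the HashPartitioner and the partition count here are just placeholders, and each lookup is still a separate job:

import org.apache.spark.HashPartitioner

// With a known partitioner, lookup only scans the partition the key hashes to,
// instead of the whole RDD. The partition count 100 is an arbitrary placeholder.
val partitionedData = data.partitionBy(new HashPartitioner(100)).cache()
partitionedData lookup 7   // still one action per key, but touches a single partition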
Attempt 2
This attempt works, but it creates |data|^2 pairs, which will kill performance. I do not expect Spark to be able to optimize that away.
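For comparison, the direction I am currently leaning towards (not yet tested at my data sizes) is to replace the cartesian product with two ordinary joins, re-keying by the second key in between; a rough sketch:

// Sketch: look up both values with two joins instead of a cartesian product.
// Shuffles roughly |pairs| + |data| records per join rather than building |data|^2 pairs.
def massPairLookup3(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
  keyPairs
    .join(data)                                      // (k1, (k2, v1))
    .map { case (k1, (k2, v1)) => (k2, (k1, v1)) }   // re-key by k2
    .join(data)                                      // (k2, ((k1, v1), v2))
    .map { case (k2, ((k1, v1), v2)) => ((k1, k2), (v1, v2)) }
}

I have not verified how the two shuffles behave at |pairs| ~ 10^10, so I am not sure this is the idiomatic way to do such a mass lookup.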