Spark: Efficient Mass Lookup in Pairs of RDDs

In Apache Spark, I have two RDDs. The first, data : RDD[(K,V)], contains data in key-value form. The second, pairs : RDD[(K,K)], contains a set of interesting key pairs over that data.

How can I efficiently build an RDD pairsWithData : RDD[((K,K),(V,V))] so that it contains all the elements of pairs as key tuples, with their corresponding values (from data) as value tuples?

Some data properties:

  • The keys of data are unique
  • All entries in pairs are unique
  • For all pairs (k1,k2) in pairs , it is guaranteed that k1 <= k2
  • The size of pairs is only a constant factor of the data size: |pairs| = O(|data|)
  • Current data sizes (growth is expected): |data| ~ 10^8, |pairs| ~ 10^10
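
To make the shapes concrete, here is a tiny illustrative example (assuming a SparkContext named sc, as in the sample code further down; the keys and values here are made up):

    // Illustrative only: tiny instances of the two inputs and the desired result.
    val data  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))   // RDD[(Int, String)]
    val pairs = sc.parallelize(Seq((1, 2), (1, 3)))                 // RDD[(Int, Int)]
    // Desired pairsWithData: RDD[((Int, Int), (String, String))] containing
    //   ((1,2),("a","b")) and ((1,3),("a","c"))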

Current attempts

Here is some sample code in Scala:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._

    // This kind of shows the idea, but fails at runtime.
    def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      keyPairs map {case (k1,k2) =>
        val v1 : String = data lookup k1 head;
        val v2 : String = data lookup k2 head;
        ((k1, k2), (v1,v2))
      }
    }

    // Works but is O(|data|^2)
    def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      // Construct all possible pairs of values
      val cartesianData = data cartesian data map {case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))}
      // Select only the values whose keys are in keyPairs
      keyPairs map {(_,0)} join cartesianData mapValues {_._2}
    }

    // Example function that finds pairs of keys
    // Runs in O(|data|) in real life, but cannot maintain the values
    def relevantPairs(data : RDD[(Int, String)]) = {
      val keys = data map (_._1)
      keys cartesian keys filter {case (x,y) => x*y == 12 && x < y}
    }

    // Example run
    val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
    val pairs = relevantPairs(data)
    val pairsWithData = massPairLookup2(pairs, data)

    // Prints:
    // ((1,12),(Number 1,Number 12))
    // ((2,6),(Number 2,Number 6))
    // ((3,4),(Number 3,Number 4))
    pairsWithData.foreach(println)

Attempt 1

At first I tried just using the lookup function on data , but that fails at runtime. It seems that self is null inside PairRDDFunctions .

Also, I'm not sure about the performance of lookup. The documentation says this operation is done efficiently if the RDD has a known partitioner, by only searching the partition that the key maps to. Even so, looking up n keys seems to take at best O(n * |partition|) time, which I suspect could be optimized.
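
For reference, a minimal sketch of what the documentation means (reusing the data RDD from the sample code above): lookup is a driver-side action, and it only narrows the search to a single partition when the RDD carries a known partitioner.

    import org.apache.spark.HashPartitioner

    // Driver-side lookups; only efficient once the RDD has a partitioner.
    val partitioned = data.partitionBy(new HashPartitioner(16)).cache()
    val vs: Seq[String] = partitioned.lookup(7)   // scans only the partition owning key 7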

Attempt 2

This attempt works, but it creates |data|^2 pairs, which kills performance. I do not expect Spark to be able to optimize that away.

1 answer

Your lookup 1 (attempt 1) does not work because you cannot invoke RDD transformations or actions inside workers, i.e. inside another transformation.
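
As a minimal sketch of that rule (the exact error varies by Spark version, so treat the comments as approximate):

    // RDD methods can only be called from driver code. Referencing `data`
    // inside another RDD's closure ships the RDD without a live SparkContext,
    // so the nested call fails at runtime (an NPE or a SparkException,
    // depending on the Spark version).
    val broken = pairs.map { case (k1, k2) => (data.lookup(k1), data.lookup(k2)) } // fails when evaluated
    val fine   = data.lookup(1)                                                    // driver-side call: OK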

As for lookup 2 (attempt 2), I don't think it is necessary to compute the full cartesian product...

You can do it as follows:

    val firstjoin = pairs.map({case (k1,k2) => (k1, (k1,k2))})
      .join(data)
      .map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)})

    val result = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))})
      .join(data)
      .map({case(_, (((k1,k2), v1), v2)) => ((k1, k2), (v1, v2))})

Or in a denser form:

    val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
    val result = firstjoin.map({case (x,y) => (x._2, (x,y))})
      .join(data)
      .map({case(x, (y, z)) => (y._1, (y._2, z))})
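
Wired up against the sample data from the question, here is a sketch of how the two joins could be packaged as a drop-in replacement for massPairLookup2 (the name massPairLookup3 is just illustrative):

    // Illustrative wrapper around the double-join approach above.
    def massPairLookup3(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      val firstjoin = keyPairs
        .map { case (k1, k2) => (k1, (k1, k2)) }
        .join(data)
        .map { case (_, ((k1, k2), v1)) => (k2, ((k1, k2), v1)) }
      firstjoin
        .join(data)
        .map { case (_, (((k1, k2), v1), v2)) => ((k1, k2), (v1, v2)) }
    }

    // massPairLookup3(pairs, data).foreach(println) should print the same result:
    // ((1,12),(Number 1,Number 12))
    // ((2,6),(Number 2,Number 6))
    // ((3,4),(Number 3,Number 4))

Since each step is an ordinary key-based join, the work stays roughly proportional to |pairs| and |data| (plus shuffle cost) instead of the |data|^2 blow-up of the cartesian approach.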

I don’t think you can do it more efficiently, but I could be wrong ...


Source: https://habr.com/ru/post/981104/

