Binding Spark closure arguments

I work with Apache Spark in Scala.

I have a problem trying to manipulate data in one RDD using a second RDD. I am trying to pass the second RDD as an argument to a function that is mapped over the first RDD, but it seems the closure created for this function binds an uninitialized version of that value.

Below is a simple code snippet that shows the kind of problem I am seeing. (My real example, where I ran into the problem, is larger and less clear.)

I don't really understand the argument-binding rules for Spark closures.

What I'm really looking for is a basic approach or an example of how to manipulate one RDD using the contents of another (which was previously built elsewhere).

In the following code, the call to Test1.process(sc) fails with a NullPointerException in findSquare (since the second argument bound in the closure is not initialized):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object Test1 {

  def process(sc: SparkContext) {
    val squaresMap = (1 to 10).map(n => (n, n * n))
    val squaresRDD = sc.parallelize(squaresMap)

    val primes = sc.parallelize(List(2, 3, 5, 7))

    // This is where it fails with a NullPointerException
    for (p <- primes) {
      println("%d: %d".format(p, findSquare(p, squaresRDD)))
    }
  }

  def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = {
    squaresRDD.filter(kv => kv._1 == n).first._1
  }
}
2 answers

The problem you are encountering has nothing to do with closures or RDDs, which, contrary to popular belief, are serializable.

It simply breaks a fundamental Spark rule, which states that you cannot trigger an action or a transformation from inside another action or transformation*; different variants of this question have been asked on SO multiple times.

To understand why this is the case, you have to think about the architecture:

  • The SparkContext is managed on the driver.
  • Everything that happens inside transformations is executed on the workers. Each worker has access only to its own partition of the data and does not communicate with other workers**.

If you want to use the contents of more than one RDD, you have to combine them with one of the transformations that operate on pairs of RDDs, such as join, cartesian, zip, or union.

Here you most likely (it is not clear why you pass a tuple and then use only its first element) want either a broadcast variable (note that squaresMap is the local Scala collection, not the RDD, so it can be broadcast directly):

val squaresMapBD = sc.broadcast(squaresMap)

def findSquare(n: Int): Seq[(Int, Int)] = {
  squaresMapBD.value
    .filter{case (k, v) => k == n}
    .map{case (k, v) => (n, k)}
    .take(1)
}

primes.flatMap(findSquare)
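
If the goal, as in the original process, is to print the results on the driver, one hedged sketch (safe here only because the output is tiny) is to collect first:

primes
  .flatMap(findSquare)
  .collect()
  .foreach { case (p, sq) => println("%d: %d".format(p, sq)) }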

Alternatively, you can use a cartesian product with the other RDD:

primes
  .cartesian(squaresRDD)
  .filter { case (n, (k, _)) => n == k }
  .map { case (n, (k, _)) => (n, k) }

Converting primes to dummy pairs (Int, null) and using join would be more efficient:

primes.map((_, null)).join(squaresRDD).map(...)
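
The .map(...) step is left open in the answer. A minimal sketch of one way to complete it, assuming the goal is (prime, square) pairs (primeSquares is an illustrative name, not from the original):

val primeSquares = primes
  .map((_, null))                        // dummy pairs keyed by the prime
  .join(squaresRDD)                      // RDD[(Int, (Null, Int))]
  .map { case (p, (_, sq)) => (p, sq) }  // drop the dummy, keep the square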

That said, based on your comments I assume you are interested in a scenario where there is a natural join condition.

Depending on the context, you can also consider using a database or files to store common data.

On a side note: RDDs are not iterable, so you cannot simply use a for comprehension over one. To do something like that, you have to collect first or convert with toLocalIterator. You can also use the foreach method.
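
As a hedged sketch of those three options, using the primes RDD from the question:

// Materialize the RDD on the driver, then iterate locally.
primes.collect().foreach(p => println(p))

// Or stream the elements to the driver one partition at a time.
primes.toLocalIterator.foreach(p => println(p))

// foreach on the RDD itself runs on the executors, so the println
// output ends up in the executors' stdout, not the driver's.
primes.foreach(p => println(p))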


* To be precise: you cannot access the SparkContext.

** Torrent broadcast and tree aggregation involve communication between executors, so it is technically possible.


RDDs are not serializable, so you cannot use an RDD inside an RDD transformation. Also, I have never seen anyone enumerate an RDD with a for statement; usually I use the foreach statement, which is part of the RDD API.

If you want to use the data of the smallest RDD, you can broadcast it (collect it first, then broadcast it so it is visible inside the other RDD's transformations).
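
A minimal sketch of that suggestion, assuming squaresRDD is the small side (squaresRDD, primes, and sc come from the question; squaresBD and primeSquares are illustrative names):

// Collect the small RDD on the driver, then broadcast it as a plain Map
// so it can be looked up inside transformations of the other RDD.
val squaresBD = sc.broadcast(squaresRDD.collect().toMap)

val primeSquares = primes.map(p => (p, squaresBD.value.get(p)))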


Source: https://habr.com/ru/post/1612078/

