The problem you are having has nothing to do with closures or RDDs, which, contrary to popular belief, are serializable.
It simply violates the fundamental Spark rule, which states that you cannot trigger an action or a transformation from another action or transformation*, and different versions of this question have been asked on SO several times.
To understand why this is the case, you have to think about the architecture:
- the SparkContext is managed on the driver;
- everything that happens inside a transformation is executed on the workers, and each worker has access only to its own part of the data and does not communicate with other workers**.

If you want to use data from more than one RDD, there is no way to reference one RDD from inside a transformation of another; you have to combine them with a binary RDD operation such as join, cartesian, zip or union.
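To make the rule concrete, here is a minimal sketch (the RDD contents are made up purely for illustration) contrasting the forbidden nested form with the allowed binary form:

import org.apache.spark.rdd.RDD

// Hypothetical stand-ins for the question's data.
val primes: RDD[Int] = sc.parallelize(Seq(2, 3, 5, 7))
val squaresRDD: RDD[(Int, Int)] = sc.parallelize(Seq((2, 4), (3, 9), (5, 25)))

// Forbidden: the closure below is shipped to the workers, where the nested
// RDD reference is unusable (it typically surfaces as a NullPointerException).
// primes.map(n => squaresRDD.filter { case (k, _) => k == n }.count())

// Allowed: combine the two RDDs on the driver with a binary operation.
primes.cartesian(squaresRDD)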
If, as in this case, one of the datasets is small enough to fit in memory, the simplest option is to broadcast it and read the broadcast value from inside the transformation:
val squaresMapBD = sc.broadcast(squaresMap)

def findSquare(n: Int): Seq[(Int, Int)] = {
  squaresMapBD.value                     // read the broadcast Map locally on the worker
    .filter { case (k, v) => k == n }    // keep the entries whose key matches n
    .map { case (k, v) => (n, k) }
    .take(1)
    .toSeq                               // a Map is not a Seq, so convert before returning
}

primes.flatMap(findSquare)
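For completeness, a minimal sketch of the driver-side setup these snippets assume (the real squaresMap comes from the original question, so the values here are made up):

// Hypothetical stand-in for the question's lookup table.
val squaresMap: Map[Int, Int] = Map(2 -> 4, 3 -> 9, 5 -> 25)

// After sc.broadcast, every executor receives one read-only copy of the Map,
// so findSquare reads it locally and never touches another RDD.
val pairs = primes.flatMap(findSquare)

The advantage of a broadcast variable over capturing the Map directly in the closure is that it is shipped to each executor once, rather than serialized with every task.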
Alternatively, you can use cartesian:
primes
  .cartesian(squaresRDD)
  .filter { case (n, (k, _)) => n == k }
  .map { case (n, (k, _)) => (n, k) }
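Keep in mind that cartesian materializes every (prime, square) combination before the filter runs, so its cost grows with the product of the two datasets' sizes; it is only a reasonable fallback when both RDDs are small.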
or convert primes to dummy (Int, null) pairs and use join:
primes.map((_, null)).join(squaresRDD).map(...)
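The final map is left elided above; one hedged way to complete the pipeline, assuming you want (prime, value) pairs back (that projection is my assumption, not something stated in the question), would be:

// Sketch only: the projection in the last map is an assumption.
primes
  .map(n => (n, null))                 // key each prime by itself with a dummy value
  .join(squaresRDD)                    // inner join on the shared Int key
  .map { case (k, (_, v)) => (k, v) }  // drop the dummy value, keep (key, value)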
but a join only pays off when there is a natural join condition; otherwise the dummy keys and the extra shuffle make it an expensive choice.
On a side note, RDDs are not iterable, so you cannot simply loop over one with a for expression. To do something like that, you have to call collect or toLocalIterator first. You can also use the foreach method.
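To make the distinction concrete, here is a small sketch (pairs is assumed to be one of the pair RDDs built above):

// collect() pulls the whole RDD to the driver; fine only for small results.
for ((n, k) <- pairs.collect()) println(s"$n -> $k")

// toLocalIterator streams one partition at a time, bounding driver memory.
pairs.toLocalIterator.foreach(println)

// foreach runs on the workers, so the output lands in executor logs,
// not on the driver's console.
pairs.foreach(println)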
* To be precise, you cannot access the SparkContext from code running on a worker.
** Not counting side effects such as communication through accumulators.