The problem you are having has nothing to do with closures or RDDs, which, contrary to popular belief, are serializable.
It simply violates the fundamental Spark rule which states that you cannot trigger an action or transformation from another action or transformation*, and different variants of this question have been asked on SO multiple times.
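For example, the following minimal sketch (rdd1 and rdd2 are hypothetical RDDs) fails at runtime, because rdd2 is captured by the closure and used inside a transformation that runs on a worker:

rdd1.map(x => rdd2.filter(_ == x).count()) // nested RDD access - fails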
To understand why this is the case, you have to think about the architecture:
- SparkContext is managed on the driver.
- Everything that happens inside transformations is executed on the workers. Each worker has access only to its own part of the data and does not communicate with other workers**.
If you want to use the content of multiple RDDs, you have to use one of the transformations which combine RDDs, such as join, cartesian, zip or union.
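For completeness, since only cartesian and join are shown below, here is a minimal sketch of the other two combinators (a and b are hypothetical RDDs; zip additionally requires the same number of partitions and of elements per partition):

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(4, 5, 6))
a.union(b) // one RDD containing the elements of both
a.zip(b)   // RDD of (Int, Int) pairs matched by position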
Here you most likely want to either use a broadcast variable:
val squaresMapBD = sc.broadcast(squaresMap)

def findSquare(n: Int): Seq[(Int, Int)] = {
  squaresMapBD.value // local copy of squaresMap on each executor
    .filter { case (k, v) => k == n }
    .map { case (k, v) => (n, k) }
    .take(1)
    .toSeq // so the result conforms to Seq[(Int, Int)] regardless of squaresMap's collection type
}
primes.flatMap(findSquare)
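With sc.broadcast the map is shipped to every executor once, and findSquare reads the local copy through squaresMapBD.value, so nothing inside the flatMap ever touches the driver or another RDD.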
or use cartesian with the RDD:
primes
  .cartesian(squaresRDD)
  .filter { case (n, (k, _)) => n == k }
  .map { case (n, (k, _)) => (n, k) }
Converting primes to dummy pairs (Int, null) and using join would be more efficient:
primes.map((_, null)).join(squaresRDD).map(...)
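The final map is left open above; one plausible projection, assuming squaresRDD is a pair RDD of (Int, Int) as in the cartesian variant, would be:

primes
  .map((_, null))   // key each prime by itself with a dummy value
  .join(squaresRDD) // yields (k, (null, v)) for every key present in both
  .map { case (k, (_, v)) => (k, v) }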
But judging from your comments, I assume you are interested in a scenario where there is a natural join condition. Depending on the context, you can also consider using a database or files to store the common data.
On a side note, RDDs are not iterable, so you cannot simply loop over their elements with a for comprehension. To do something like that, you have to collect first or convert with toLocalIterator. You can also use the foreach method.
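As an illustration of those options (primes is the RDD used above):

primes.collect().foreach(println)       // brings all elements to the driver; fine for small data
primes.toLocalIterator.foreach(println) // streams one partition at a time to the driver
primes.foreach(println)                 // runs on the executors; output goes to the executor logs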
* To be precise: you cannot access the SparkContext.
** Torrent broadcast and tree aggregates involve communication between the executors, so it is technically possible.