As Giovanni noted, the problem is that the class loaders are different (you can check this by calling .getClass.getClassLoader on any object). Then, when the workers try to deserialize the reflected function, all hell breaks loose.
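To make the class-loader check concrete, here is a minimal sketch in plain Scala (no Spark involved). LoaderCheck, Local and loaderOf are hypothetical names used only for illustration. The same call on a toolbox-compiled function would show the loader it was compiled under.

```scala
object LoaderCheck {
  // Hypothetical class, defined only so that something is loaded by the application.
  class Local

  // Returns the class loader that loaded the runtime class of x.
  def loaderOf(x: AnyRef): ClassLoader = x.getClass.getClassLoader

  def main(args: Array[String]): Unit = {
    println(loaderOf("abc"))     // JDK core class: bootstrap loader, reported as null
    println(loaderOf(new Local)) // the application (or REPL) class loader
  }
}
```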
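One way around this is to move the reflection step onto the worker nodes, so that each node builds the function itself instead of deserializing one that was compiled elsewhere: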
import org.apache.spark.sql.functions.{col, udf}

val f = udf {
  new Function1[String, Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    // lazy: built on the first call, not when the UDF is defined
    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = {
      println("reflected function") // shows how often the reflection step runs
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}
Then call it as usual:

sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show
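The println is only there to show how many times the reflection step runs, so feel free to remove it. Compare the output under spark-shell --master 'local' with the output under spark-shell --master 'local[2]'.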
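The UDF object itself is created immediately, but because toolbox and func are lazy vals, they are not evaluated until the function is first called, which happens on the workers. And since they are vals, each deserialized copy of the function evaluates them only once.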
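The lazy-val behaviour can be checked without Spark. The sketch below is a stand-in, not the UDF itself: LazyFn, its evaluations counter and roundTrip are hypothetical names, and plain Java serialization is assumed as an approximation of shipping the function to a worker. It shows that nothing is evaluated before the object is serialized, and that a deserialized copy evaluates the lazy val once on first use.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the UDF body: the expensive step is a lazy val.
class LazyFn extends (String => Int) with Serializable {
  lazy val func: String => Int = {
    LazyFn.evaluations += 1 // count how often the "expensive" step runs
    (s: String) => 5
  }
  def apply(s: String): Int = func(s)
}

object LazyFn {
  var evaluations = 0
}

object LazyDemo {
  // Serialize and deserialize with plain Java serialization.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new LazyFn) // func has not been touched yet
    println(LazyFn.evaluations)      // 0
    copy("1"); copy("5")             // first call evaluates func, second reuses it
    println(LazyFn.evaluations)      // 1
  }
}
```

Each further copy produced by roundTrip would evaluate func once more on its first call, which is the per-copy cost the println in the UDF makes visible.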