How does Spark process an object

To test the serialization exception in Spark, I wrote the task in two ways.
The first way:

    package examples

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object dd {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf
        val sc = new SparkContext(sparkConf)

        val data = List(1, 2, 3, 4, 5)
        val rdd = sc.makeRDD(data)

        val result = rdd.map(elem => {
          funcs.func_1(elem)
        })
        println(result.count())
      }
    }

    object funcs {
      def func_1(i: Int): Int = {
        i + 1
      }
    }

Written this way, the Spark job works fine.
However, when I change it to the following form, it does not work and throws a NotSerializableException.
The second way:

    package examples

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object dd {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf
        val sc = new SparkContext(sparkConf)

        val data = List(1, 2, 3, 4, 5)
        val rdd = sc.makeRDD(data)

        val handler = funcs
        val result = rdd.map(elem => {
          handler.func_1(elem)
        })
        println(result.count())
      }
    }

    object funcs {
      def func_1(i: Int): Int = {
        i + 1
      }
    }

I know why I get the "task is not serializable" error: in the second example I am trying to send a non-serializable object, funcs, from the driver node to a worker node. In the second example, if I make object funcs extend Serializable, the error disappears.
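
For illustration, a minimal sketch of that change, using the same identifiers as in my examples above:

    // Sketch only: marking the singleton Serializable lets the captured
    // `handler` reference be shipped to the workers without the exception.
    object funcs extends Serializable {
      def func_1(i: Int): Int = {
        i + 1
      }
    }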

But in my view, since funcs is an object rather than a class, it is a singleton and should be serialized and shipped from the driver to the workers instead of being instantiated on the worker node itself. In that case, although the way the object funcs is used differs, I would expect the non-serializable object funcs to be shipped from the driver node to the worker node in both of these examples.

My question is: why does the first example run successfully, while the second one fails with the "task not serializable" error?

2 answers

When you run code in an RDD closure (map, filter, etc.), everything needed to execute that code is packaged up, serialized, and sent to the executors that will run it. Any objects that are referenced (or whose fields are referenced) will be serialized as part of the task, and this is where you will sometimes get a NotSerializableException.
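
As an illustrative sketch (the class and names here are made up, not from the question), referencing a field of a non-serializable object inside a closure drags the whole object into the task:

    import org.apache.spark.SparkContext

    // Illustrative only: Holder is not Serializable, so referencing holder.offset
    // inside the map closure forces Spark to serialize the whole holder instance,
    // and the job fails with "Task not serializable".
    class Holder { val offset = 10 }

    def run(sc: SparkContext): Unit = {
      val holder = new Holder
      val rdd = sc.makeRDD(List(1, 2, 3))
      println(rdd.map(elem => elem + holder.offset).count()) // throws
    }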

Your use case is a little more complicated and involves the Scala compiler. Usually, calling a function on a Scala object is the equivalent of calling a static Java method: the object never really exists, so it is basically like writing the code inline. However, if you assign the object to a variable, you are creating an actual reference to that object in memory, and the object behaves more like a class and can have serialization problems.

    scala> object A { def foo() { println("bar baz") } }
    defined module A

    scala> A.foo() // static method
    bar baz

    scala> val a = A // now we're actually assigning a memory location
    a: A.type = A$@7e0babb1

    scala> a.foo() // dereferences a before calling foo
    bar baz

In order for Spark to distribute a given operation, the function used in the operation needs to be serialized. Before serialization, these functions go through a fairly complex process in a class called ClosureCleaner.

The intention is to "disconnect" closures from their context, in order to reduce the size of the object graph that needs to be serialized and to reduce the risk of serialization problems in the process. In other words, it ensures that only the code needed to execute the function is serialized and sent to be deserialized and executed "on the other side".

During this process, the closure is also proactively evaluated for serializability, in order to detect serialization issues eagerly at runtime (SparkContext#clean).
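
As a rough illustration of what such a check amounts to (a simplified sketch, not Spark's actual ClosureCleaner code), one can try to serialize the closure with plain Java serialization and surface the failure up front:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Simplified sketch: attempt to serialize the closure eagerly so that a
    // NotSerializableException shows up before any task is shipped.
    def ensureSerializable(closure: AnyRef): Unit = {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(closure)
      finally out.close()
    }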

This code is dense and complex, so it's hard to find the right code path leading to this case.

Intuitively, what happens is this: when the ClosureCleaner finds:

    val result = rdd.map { elem => funcs.func_1(elem) }

It evaluates the inner members of the closure to be objects that can be recreated, with no further references, so the cleaned closure contains only {elem => funcs.func_1(elem)}, which can be serialized by the JavaSerializer.

Instead, when the closure cleaner evaluates:

    val handler = funcs
    val result = rdd.map(elem => {
      handler.func_1(elem)
    })

It finds that the closure has a reference to $outer (handler), so it inspects the outer scope and adds the variable instance to the cleaned closure. We can imagine that the resulting cleaned closure has something like this form (this is for illustrative purposes only):

    { elem =>
      val handler = funcs
      handler.func_1(elem)
    }

When this closure is checked for serializability, it fails to serialize. Under the JVM serialization rules, an object is serializable only if all of its members are recursively serializable. In this case, handler refers to a non-serializable object and the check fails.
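
Concretely, two ways to avoid the failing check in the second example (a sketch; the first option is exactly the question's working example, and the second captures only a function value, which as far as I understand is serializable on its own):

    // Option 1: reference the singleton directly inside the closure, so the
    // cleaned closure has no outer `handler` reference to carry along.
    val result1 = rdd.map(elem => funcs.func_1(elem))

    // Option 2: capture a function value instead of the object; the lambda
    // calls funcs.func_1 statically and should be serializable by itself.
    val handlerFn: Int => Int = funcs.func_1 _
    val result2 = rdd.map(elem => handlerFn(elem))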


Source: https://habr.com/ru/post/1012531/

