Spark: "Task not serializable" when using an anonymous function

Background

Here is my situation: I am trying to create a class that filters an RDD based on some feature of its elements. The function that extracts this feature differs between scenarios, so I would like to pass it in as a parameter. Unfortunately, I seem to be running into problems with how Scala captures its closures: even though my function is serializable, the class is not.

From an example I found, it seems that my situation cannot be resolved, but I am convinced there is a way to achieve what I am trying to do by creating the correct (smaller) closure.

My code

class MyFilter(getFeature: Element => String, other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }
  }
}

Simplified example

class Foo(f: Int => Double, rdd: RDD[Int]) { 
  def go(data: RDD[Int]) = data.map(f) 
}

val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works

val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0)
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd)
doesntWork.go(myRdd).collect() // craps out

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
    - object not serializable (class: $iwC$$iwC$Foo, value: $iwC$$iwC$Foo@61e33118)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@47d6a31a)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)

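// Even though plain Java serialization of the function succeeds
import java.io.{FileOutputStream, ObjectOutputStream}
val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works
out.close()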

Questions

  • Why does the first simplified example not try to serialize the whole of Foo, while the second one does?
  • How can I pass my serializable function without the closure also capturing a reference to Foo?

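Answer

Found the answer.

Basically, when a closure refers to a member of an enclosing object, Scala captures that whole object (I am not sure it always does this, but that is what happens here), so the object is serialized along with the function. The solution is to hand the function to a separate, smaller closure, much like the fix for the good ol' JavaScript for-loop problem.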

def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

class Foo(f: Int => Double, somethingNonserializable: RDD[String]) {
  def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) }
}

Or, written JavaScript-style as an immediately invoked function:

def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)
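
To see what enclose changes without a Spark cluster, here is a minimal sketch that uses plain Java serialization. Holder, capturing, hygienic and isSerializable are hypothetical names introduced only for this illustration, and the behaviour described assumes Scala 2.12 or later, where function literals are serializable by default.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Same helper as above: `func` receives the value as an argument,
  // so it has no need to reach back into the caller's `this`.
  def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

  // Returns true if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Hypothetical stand-in for Foo: deliberately NOT Serializable.
  class Holder(f: Int => Double) {
    // Refers to the field `f`, so the lambda captures `this` (the Holder).
    def capturing: Int => Double = x => f(x) + 1.0

    // `g` is a plain parameter, so the returned lambda captures only the function.
    def hygienic: Int => Double = enclose(f) { g => (x: Int) => g(x) + 1.0 }
  }

  def main(args: Array[String]): Unit = {
    val holder = new Holder(_.toDouble)
    println(isSerializable(holder.capturing)) // expected: false (drags Holder along)
    println(isSerializable(holder.hygienic))  // expected: true  (only the function)
  }
}
```

Both functions compute the same result. The only difference is what each one carries with it when it is serialized, which is the part Spark cares about when it ships a task to the executors.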

Source: https://habr.com/ru/post/1609831/

