Scala reflection with serialization (over Spark) - Symbols are not serializable

To start, I am using Scala 2.10.4, and the example below runs on Spark 1.6 (although I doubt that Spark has anything to do with this; it is just a serialization issue).

So here is my problem: suppose I have a trait Base that is implemented by, say, two classes B1 and B2. Now I have a generic trait that is extended by a collection of classes, one of which is parameterized over subtypes of Base, for example (here I keep Spark's notion of RDD, but it could be anything else as long as it gets serialized; Something is just a result, whatever it really is):

    trait Foo[T] { def function(rdd: RDD[T]): Something }
    class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... }
    class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... }
    ...

Now I need an object that takes an RDD[T] (assume there is no ambiguity here; this is just a simplified version) and returns the Something produced by the function that corresponds to type T. It should also work for Array[T], using a merge strategy. So far it looks like this:

    object Obj {
      def compute[T: TypeTag](rdd: RDD[T]): Something = {
        typeOf[T] match {
          case t if t <:< typeOf[A] =>
            val foo = new Foo[T]
            foo.function(rdd)
          case t if t <:< typeOf[Array[A]] =>
            val foo = new Foo[A]
            foo.function(rdd.map(x => mergeArray(x.asInstanceOf[Array[A]])))
          case t if t <:< typeOf[Base] =>
            val foo = new Foo[T]
            foo.function(rdd)
          // here it gets ugly...
          case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
            val tt = getSubInfo[T](0)
            val tpe = tt.tpe
            val foo = new Foo[tpe.type]
            foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))))
        }
      }

      // strategy to transform arrays of T into a T object when possible
      private def mergeArray[T: TypeTag](a: Array[T]): T = ...

      // extract the subtype, e.g. for Array[Int], position 0 yields a type tag for Int;
      // I can provide the code, but it is not fundamental to understanding the problem
      private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ...
    }
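For readers following along, here is a minimal sketch of what a helper in the spirit of getSubInfo might look like, and of why a pattern on Array[Base] does not catch Array[B1]. It is not the asker's code: `TypeArgs`, `typeArg` and the `Base`/`B1` definitions below are hypothetical stand-ins, and the helper returns a plain Type rather than a TypeTag. It assumes Scala 2 with scala-reflect on the classpath.

```scala
import scala.reflect.runtime.universe._

// Hypothetical stand-ins for the question's Base hierarchy
trait Base
class B1 extends Base

object TypeArgs {
  // Hypothetical helper: the i-th type argument of T, if T has one.
  // For Array[Int] and i = 0 this yields the Type for Int.
  def typeArg[T: TypeTag](i: Int): Option[Type] = typeOf[T] match {
    case TypeRef(_, _, args) => args.lift(i)
    case _                   => None
  }

  // Array is invariant in Scala, so Array[B1] is not a subtype of Array[Base]
  val arrayOfB1IsArrayOfBase: Boolean = typeOf[Array[B1]] <:< typeOf[Array[Base]]

  // The existential Array[_] is the pattern that accepts any element type
  val arrayOfB1IsSomeArray: Boolean = typeOf[Array[B1]] <:< typeOf[Array[_]]
}
```

Note that the Type values produced here are backed by Symbols, so they are only safe to use on the driver side; capturing them in a closure shipped to executors would run into the same NotSerializableException.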

Unfortunately, it works fine on a local machine, but when it is sent to Spark (serialized), I get org.apache.spark.SparkException: Task not serializable with:

    Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
    Serialization stack:
        - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
        - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)

I do have a workaround (a fairly obvious one: enumerate the possible types), but out of curiosity, is there any way to fix this? And why are Symbols not serializable, while their equivalents in Manifests were?

Thanks for the help.

1 answer

TypeTags are now generally serializable in Scala but, oddly, Types are not directly (this is odd because TypeTags contain Symbols, which are not serializable :-/).

This might do what you want:

    import scala.reflect.runtime.universe._

    // The implicit TypeTag constructor parameter is serialized.
    abstract class TypeAware[T: TypeTag] extends Serializable {
      def typ: Type = _typeCached

      // The Type itself is not serialized; it is recomputed lazily from the TypeTag
      @transient lazy val _typeCached: Type = typeOf[T]
    }

    trait Foo[T] extends Serializable {
      def function(rdd: RDD[T]): Something = {... impl here?...}
      def typ: Type
    }

    class Concrete[T: TypeTag] extends TypeAware[T] with Foo[T] with Serializable {
      override def function(rdd: RDD[T]): Something = {... impl here?...}
    }
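To check the idea without a cluster, one could round-trip such an object through plain Java serialization. The following is only a sketch under assumptions: `TypeHolder` and `RoundTrip` are hypothetical names (a cut-down version of TypeAware with no Spark dependency), and it assumes a Scala 2 version in which compiler-generated TypeTags survive Java serialization (it was written with 2.11 or later in mind, so it may not behave the same on 2.10.4).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.runtime.universe._

// Hypothetical cut-down TypeAware: the implicit TypeTag is kept as a serializable
// field, while the derived Type is transient and rebuilt on first access.
class TypeHolder[T: TypeTag] extends Serializable {
  @transient lazy val tpe: Type = typeOf[T]
}

object RoundTrip {
  // Write the object to a byte array and read it back with Java serialization
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(a) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Calling `RoundTrip.roundTrip(new TypeHolder[List[String]]).tpe` should then give back a usable Type on the deserialized copy. Removing `@transient` and forcing `tpe` before serializing is a quick way to see the original NotSerializableException again, since the cached Type would then be written out along with its Symbols.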

Source: https://habr.com/ru/post/1242764/

