How to allow Spark to serialize an object using Kryo?

I would like to transfer an object from the driver node to the other nodes where the RDD lives, so that each partition of the RDD can access that object, as in the following snippet.

package xt

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

object HelloSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Testing HelloSpark")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 20, 4)
    val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

    rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
      .collect()
      .foreach(println)

    sc.stop
  }
}

// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) = {
    kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
  }
}

// My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
  override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
    output.writeInt(obj.getLength)
    output.writeInt(obj.getOffset)
    output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
  }

  override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
    val length = input.readInt()
    val offset = input.readInt()
    val bytes = new Array[Byte](length)
    input.read(bytes, offset, length)
    new ImmutableBytesWritable(bytes)
  }
}

In the snippet above, I tried to have Spark serialize ImmutableBytesWritable with Kryo, so I did the following:

  • Configure the SparkConf instance passed to the Spark context, i.e. set spark.serializer to org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator to xt.HelloKryoRegistrator (see the note after this list);
  • Write a custom Kryo registrator class in which I register the class ImmutableBytesWritable;
  • Write a serializer for ImmutableBytesWritable.
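
A note on the first step: newer Spark versions (1.2 and later) also expose SparkConf.registerKryoClasses, which registers classes for Kryo without a custom registrator and is enough when Kryo's default serialization of the class is acceptable. A minimal sketch of that variant (it would not use the custom HelloSerializer above):

val conf = new SparkConf()
  .setAppName("Testing HelloSpark")
  // Registers ImmutableBytesWritable with Kryo and enables the Kryo serializer.
  .registerKryoClasses(Array(classOf[ImmutableBytesWritable]))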

However, when I submitted my Spark app in yarn-client mode, the following exception was thrown:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at xt.HelloSpark$.main(HelloSpark.scala:23)
        at xt.HelloSpark.main(HelloSpark.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 12 more

It seems that ImmutableBytesWritable cannot be serialized by Kryo. So what is the right way to let Spark serialize an object using Kryo? Can Kryo serialize any type?

1 answer

This is because you are using ImmutableBytesWritable in your map closure. Spark does not yet support serializing closures with Kryo; only the objects inside RDDs are serialized with Kryo. You can look at this question to solve your problem:

Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
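
As an aside to that explanation: in this particular snippet it is only the closure that fails Java serialization, so the simplest workaround is to capture a plain Array[Byte] (which is Java-serializable) instead of the ImmutableBytesWritable itself, and rebuild the writable inside the closure. A minimal sketch:

val rawBytes: Array[Byte] = Bytes.toBytes("This is a test")
rdd.map { x =>
  // Reconstruct the writable on the executors from the captured bytes.
  val writable = new ImmutableBytesWritable(rawBytes)
  x.toString + "-" + Bytes.toString(writable.get) + " !"
}.collect().foreach(println)

For the general case, the approach described below still applies.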

You just need to serialize the objects before passing them through the closure, and de-serialize them afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some currying. ;)

Here's an example sketch:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])(foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}

// Wrap the function-like object that carries the non-serializable state:
val mapper = genMapper(KryoSerializationWrapper(
  new MyWrappedFunction(new ImmutableBytesWritable(Bytes.toBytes("This is a test"))))) _

rdd.flatMap(mapper).collectAsMap()

// Sketch only: Foo and Bar are placeholder types, and MyWrappedFunction stands in
// for whatever class of yours does the real work on each element.
class MyWrappedFunction(bytes: ImmutableBytesWritable) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = {
    // This is the real function
  }
}
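
Note that KryoSerializationWrapper is not a Spark class; it comes from the question linked above. For completeness, here is a minimal sketch of what such a wrapper could look like. The names and details below are illustrative assumptions rather than the exact implementation from that link: the idea is simply that Java serialization only ever sees a byte array, while Kryo does the actual encoding of the wrapped value.

import java.io.{ObjectInputStream, ObjectOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Illustrative sketch of a Kryo-backed wrapper (assumed implementation).
class KryoSerializationWrapper[T](initialValue: T) extends Serializable {

  @transient private var wrapped: T = initialValue

  def value: T = wrapped

  // Build a Kryo instance; register custom serializers here if the wrapped type
  // needs them, e.g. kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer()).
  private def newKryo(): Kryo = new Kryo()

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    // Encode the wrapped value with Kryo, then hand only the raw bytes to Java serialization.
    val output = new Output(4096, -1) // growable buffer
    newKryo().writeClassAndObject(output, wrapped)
    val bytes = output.toBytes
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    // Read the raw bytes back and decode them with Kryo on the receiving side.
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    wrapped = newKryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
  }
}

object KryoSerializationWrapper {
  // Mirrors the KryoSerializationWrapper(...) call used in the sketch above.
  def apply[T](value: T): KryoSerializationWrapper[T] = new KryoSerializationWrapper(value)
}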

Source: https://habr.com/ru/post/982582/

