I would like to ship an object from the driver to the nodes where the RDD's partitions live, so that each partition of the RDD can access that object, as shown in the following snippet.
    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.serializer.KryoRegistrator
    import org.apache.spark.{SparkConf, SparkContext}

    object HelloSpark {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("Testing HelloSpark")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(1 to 20, 4)
        // The driver-side object I want every partition to be able to read.
        val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

        rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
          .collect()
          .foreach(println)

        sc.stop()
      }
    }

    // My registrator
    class HelloKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) = {
        kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
      }
    }

    // My serializer
    class HelloSerializer extends Serializer[ImmutableBytesWritable] {
      override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
        output.writeInt(obj.getLength)
        output.writeInt(obj.getOffset)
        output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
      }

      override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
        val length = input.readInt()
        val offset = input.readInt()
        val bytes = new Array[Byte](length)
        // Fill the freshly allocated array from index 0; exactly `length` bytes were written.
        input.read(bytes, 0, length)
        new ImmutableBytesWritable(bytes)
      }
    }
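(For comparison only: the standard Spark mechanism I'm aware of for shipping a read-only object from the driver to every executor is a broadcast variable. The sketch below is illustrative, not my actual code — the object name BroadcastSketch is made up and it broadcasts a plain String, so nothing HBase-specific is involved. It is not what I'm asking about; I want to understand why plain closure capture with Kryo fails above.)

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("BroadcastSketch"))
        // Ship the value to the executors once via a broadcast variable,
        // instead of capturing it in each task's closure.
        val greeting = sc.broadcast("This is a test")
        sc.parallelize(1 to 20, 4)
          .map(x => x.toString + "-" + greeting.value + " !")
          .collect()
          .foreach(println)
        sc.stop()
      }
    }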
In my HelloSpark snippet above, I tried to have Spark serialize ImmutableBytesWritable with Kryo, so I did the following:
- configured the SparkConf instance passed to the SparkContext, i.e., set spark.serializer to org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator to xt.HelloKryoRegistrator;
- wrote a custom Kryo registrator class in which I register the class ImmutableBytesWritable;
- wrote a custom serializer for ImmutableBytesWritable (a shorter registration variant is sketched right after this list).
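For reference, this is the shorter registration path described in the Spark tuning docs — a sketch only, assuming Spark 1.2+ where SparkConf.registerKryoClasses is available; it registers the class with Kryo's default serializer rather than my custom HelloSerializer:

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.spark.SparkConf

    // Sketch: register the class directly on the SparkConf (Spark 1.2+),
    // letting Kryo fall back to its default serializer for ImmutableBytesWritable.
    val conf = new SparkConf()
      .setAppName("Testing HelloSpark")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable]))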
However, when I submitted my Spark app in yarn-client mode, the following exception was thrown:
    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at xt.HelloSpark$.main(HelloSpark.scala:23)
        at xt.HelloSpark.main(HelloSpark.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 12 more
It seems that ImmutableBytesWritable cannot be serialized by Kryo. So what is the right way to let Spark serialize an object using Kryo? Can Kryo serialize any type?