Using a serializable object: caused by: java.io.NotSerializableException

I followed this tutorial and other similar tutorials on task serialization, but my code still fails with a task serialization error, and I do not understand why. I set the topicOutputMessages variable outside of foreachRDD and then read it in foreachPartition. I also create the KafkaProducer INSIDE foreachPartition. So what is the problem?

    val topicsSet = topicInputMessages.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)

    messages.foreachRDD(rdd => {
      rdd.foreachPartition{iter =>
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
          val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
          producer.send(record)
        }
        producer.close()
      }
    })

EDIT:

    object UtilsDM extends Serializable {

      var topicOutputMessages: String = ""
      var metadataBrokerList: String = ""
      var producer: KafkaProducer[String, String] = null

      def setOutputTopic(t: String): Unit = {
        topicOutputMessages = t
      }

      def setMetadataBrokerList(m: String): Unit = {
        metadataBrokerList = m
      }

      def getOutputTopic(): String = topicOutputMessages

      def createProducer: KafkaProducer[String, String] = {
        val kafkaProps = new Properties()
        kafkaProps.put("bootstrap.servers", metadataBrokerList)
        // This is mandatory, even though we don't send keys
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProps.put("acks", "1")
        // How many times to retry when a produce request fails
        kafkaProps.put("retries", "3")
        // Upper limit on how many messages the Kafka producer will attempt to batch before sending (bytes)
        kafkaProps.put("batch.size", "5")
        // How long the producer will wait before sending, to allow more messages to accumulate in the same batch
        kafkaProps.put("linger.ms", "5")
        new KafkaProducer[String, String](kafkaProps)
      }
    }

Full stack:

    16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
        at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
    Serialization stack:
        - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
        - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
        - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
        - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
        - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 30 more
    16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
        at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
    Serialization stack:
        - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
        - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
        - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
        - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
        - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 30 more
+6
3 answers

The serialization problem stems from how Spark handles closure serialization (which you can read about in more detail in this answer: How Spark handles objects).

In the failing code, referencing metadataBrokerList and topicOutputMessages here:

    rdd.foreachPartition{iter =>
      UtilsDM.setMetadataBrokerList(metadataBrokerList)
      UtilsDM.setOutputTopic(topicOutputMessages)

creates a reference to the outer object where these variables are defined, and causes Spark's closure cleaner to include that outer object in the "cleaned" closure. That outer object pulls sparkContext and streamingContext into the closure, neither of which is serializable, hence the serialization exception.
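To make the mechanism concrete, here is a minimal, hypothetical sketch (the class and field names are illustrative, not the actual KafkaDecisionsConsumer): a field access inside the closure is really an access through this, so the whole enclosing instance, including its non-serializable contexts, has to be shipped with the task.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext

    // Illustrative stand-in for the consumer class in the question; it is NOT Serializable.
    class Consumer(sc: SparkContext, ssc: StreamingContext,
                   metadataBrokerList: String, topicOutputMessages: String) {

      def run(rdd: RDD[String]): Unit = {
        rdd.foreachPartition { iter =>
          // `metadataBrokerList` compiles to `this.metadataBrokerList`, so the closure
          // keeps a `$outer` reference to the Consumer instance -- and with it sc and ssc,
          // which are not serializable. Hence "Task not serializable".
          UtilsDM.setMetadataBrokerList(metadataBrokerList)
          UtilsDM.setOutputTopic(topicOutputMessages)
        }
      }
    }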

In the second attempt (the workaround posted as an answer below), those references are broken because the variables are now contained in the helper object, so the closure can be "cleaned" of the outer context.

I think adding @transient to these variables in the UtilsDM object is not required, given that the values are serializable. Keep in mind, though, that singleton objects are re-created in each executor JVM. Therefore the value of mutable variables changed on the driver will not be available on the executors, which often leads to a NullPointerException if not handled properly.
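As a hypothetical illustration of that last point (names are made up, not from the question): a var set on the driver through a singleton is not propagated to the fresh copy of that singleton on each executor.

    import org.apache.spark.rdd.RDD

    object BrokerConfig {                        // illustrative singleton, like UtilsDM
      var brokerList: String = null              // only ever set on the driver
    }

    object SingletonPitfall {
      def demo(rdd: RDD[String]): Unit = {
        BrokerConfig.brokerList = "broker-host:9092"   // runs in the driver JVM

        rdd.foreachPartition { _ =>
          // Runs in an executor JVM, where BrokerConfig is a fresh instance:
          // brokerList is still null here, so using it (e.g. to build a KafkaProducer)
          // would typically end in a NullPointerException.
          println(BrokerConfig.brokerList)       // prints "null" on the executors
        }
      }
    }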

There is a serialization trick that would have helped in the original scenario:

Copy the referenced variables into local values within the closure, e.g.:

    rdd.foreachPartition{iter =>
      val innerMDBL = metadataBrokerList
      val innerTOM = topicOutputMessages
      UtilsDM.setMetadataBrokerList(innerMDBL)
      UtilsDM.setOutputTopic(innerTOM)

This way, the values are copied into the closure, and there is no longer a link to the outer object.
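For context, a fuller sketch of how this trick could be combined with the original loop (my own sketch, assuming the same UtilsDM helper from the question). Here the local copies are taken in the foreachRDD body, before the foreachPartition closure is created, so that the partition closure captures only plain, serializable String values rather than the enclosing class:

    messages.foreachRDD { rdd =>
      // Taken on the driver, outside the partition closure: only these local vals
      // are captured below, not `this`.
      val localBrokerList = metadataBrokerList
      val localOutputTopic = topicOutputMessages
      rdd.foreachPartition { iter =>
        UtilsDM.setMetadataBrokerList(localBrokerList)
        UtilsDM.setOutputTopic(localOutputTopic)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
          producer.send(new ProducerRecord[String, String](localOutputTopic, msg))
        }
        producer.close()
      }
    }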

To work with executor-bound objects (for example, non-serializable connections or even local caches), I prefer to use an instance factory approach, as explained for example in this answer: Redis on Spark: Task not serializable
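For reference, a rough sketch of what such a factory can look like (entirely illustrative names, not the code from the linked answer): the producer is created lazily, once per executor JVM, and reused across batches instead of being rebuilt and closed for every partition.

    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer

    // Hypothetical executor-side factory: one KafkaProducer per JVM, created on first use.
    object ProducerFactory {
      private var producer: KafkaProducer[String, String] = null

      def getOrCreate(brokerList: String): KafkaProducer[String, String] = synchronized {
        if (producer == null) {
          val props = new Properties()
          props.put("bootstrap.servers", brokerList)
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          producer = new KafkaProducer[String, String](props)
          sys.addShutdownHook { producer.close() }   // close when the executor JVM exits
        }
        producer
      }
    }

Inside foreachPartition one would then call ProducerFactory.getOrCreate(localBrokerList) and skip the per-partition producer.close().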

+2

I think the problem lies in your UtilsDM class. It is captured by the closure, and Spark tries to serialize the code to send to its executors.

Try serializing UtilsDM or creating it inside the foreachRDD function.

+1

This is not an answer to my question, but it is an option that works. Maybe someone can develop it into a final answer? The drawback of this solution is that metadataBrokerList and topicOutputMessages are hard-coded in the UtilsDM code via @transient lazy val topicOutputMessages and @transient lazy val metadataBrokerList, whereas ideally I would like to pass these parameters as input arguments:

    object TestRunner {

      var zkQuorum: String = ""
      var metadataBrokerList: String = ""
      var group: String = ""
      var topicInputMessages: String = ""

      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: TestRunner <zkQuorum> <metadataBrokerList> " +
            "<group> <topicInputMessages>")
          System.exit(1)
        }
        val Array(zkQuorum, metadataBrokerList, group, topicInputMessages) = args
        setParameters(zkQuorum, metadataBrokerList, group, topicInputMessages)
        run(kafka_num_threads.toInt)
      }

      def setParameters(mi: String, mo: String, g: String, t: String) {
        zkQuorum = mi
        metadataBrokerList = mo
        group = g
        topicInputMessages = t
      }

      def run(kafkaNumThreads: Int) = {
        val conf = new SparkConf()
          .setAppName("TEST")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("~/checkpointDir")
        val ssc = new StreamingContext(sc, Seconds(5))

        val topicMessagesMap = topicInputMessages.split(",").map((_, 1)).toMap
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)

        messages.foreachRDD(rdd => {
          rdd.foreachPartition{iter =>
            val producer = UtilsDM.createProducer
            iter.foreach { msg =>
              val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
              producer.send(record)
            }
            producer.close()
          }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

    object UtilsDM extends Serializable {

      @transient lazy val topicOutputMessages: String = "myTestTopic"
      @transient lazy val metadataBrokerList: String = "172.12.34.233:9092"
      var producer: KafkaProducer[String, String] = null

      def getOutputTopic(): String = topicOutputMessages

      def createProducer: KafkaProducer[String, String] = {
        val kafkaProps = new Properties()
        kafkaProps.put("bootstrap.servers", metadataBrokerList)
        // This is mandatory, even though we don't send keys
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProps.put("acks", "1")
        // How many times to retry when a produce request fails
        kafkaProps.put("retries", "3")
        // Upper limit on how many messages the Kafka producer will attempt to batch before sending (bytes)
        kafkaProps.put("batch.size", "5")
        // How long the producer will wait before sending, to allow more messages to accumulate in the same batch
        kafkaProps.put("linger.ms", "5")
        new KafkaProducer[String, String](kafkaProps)
      }
    }
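If someone wants to build on this: one possible way to avoid the hard-coded values (a sketch under my own assumptions, not tested) is to drop the mutable state from UtilsDM, pass the broker list to createProducer as an argument, and copy the driver-side parameters into local vals before the closure so that only Strings are captured:

    object UtilsDM extends Serializable {
      def createProducer(brokerList: String): KafkaProducer[String, String] = {
        val kafkaProps = new Properties()
        kafkaProps.put("bootstrap.servers", brokerList)
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](kafkaProps)
      }
    }

    // Inside run(), where the parameters have already been set on the driver:
    val localBrokerList = metadataBrokerList
    val localOutputTopic = topicOutputMessages   // hypothetical: would also come from args
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        val producer = UtilsDM.createProducer(localBrokerList)
        iter.foreach(msg => producer.send(new ProducerRecord[String, String](localOutputTopic, msg)))
        producer.close()
      }
    }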
0

Source: https://habr.com/ru/post/1012528/

