I have followed this tutorial and other similar tutorials on task serialization, but my code still fails with a Task not serializable error, and I do not understand why. I set the topicOutputMessages variable outside of foreachRDD and read it inside foreachPartition, and I create the KafkaProducer INSIDE foreachPartition. So what is the problem?
val topicsSet = topicInputMessages.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)

messages.foreachRDD(rdd => {
  rdd.foreachPartition { iter =>
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)
    val producer = UtilsDM.createProducer
    iter.foreach { msg =>
      val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
      producer.send(record)
    }
    producer.close()
  }
})
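From what I have read about closure cleaning, I suspect the fix is to copy the fields of my enclosing class into local vals before the closure, so that the lambda captures only those values and not this. A rough sketch of what I mean (the local val names brokers and outTopic are mine, not from my real class), though I am not sure this is the right explanation:

val brokers = metadataBrokerList      // local copy of the field on my class
val outTopic = topicOutputMessages    // local copy of the field on my class

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    UtilsDM.setMetadataBrokerList(brokers)
    UtilsDM.setOutputTopic(outTopic)
    val producer = UtilsDM.createProducer
    iter.foreach { msg =>
      producer.send(new ProducerRecord[String, String](outTopic, msg))
    }
    producer.close()
  }
}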
EDIT:
object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var producer: KafkaProducer[String, String] = null

  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

  def createProducer: KafkaProducer[String, String] = {
    val kafkaProps = new Properties()
    kafkaProps.put("bootstrap.servers", metadataBrokerList)
    // This is mandatory, even though we don't send keys
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")
    // How many times to retry when a produce request fails
    kafkaProps.put("retries", "3")
    // Upper limit on how many messages the producer will attempt to batch before sending (bytes)
    kafkaProps.put("batch.size", "5")
    // How long the producer waits before sending, to allow more messages to accumulate in the same batch
    kafkaProps.put("linger.ms", "5")
    new KafkaProducer[String, String](kafkaProps)
  }
}
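In case it matters, I have also seen the pattern where the producer is kept as a lazily initialised singleton inside an object, so each executor JVM creates it once and reuses it across batches instead of opening a new producer per partition. A rough sketch of that variant (the object name LazyProducerHolder and the get method are my own, not from my code):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

object LazyProducerHolder {
  private var instance: KafkaProducer[String, String] = null

  // Create the producer once per executor JVM and reuse it afterwards.
  def get(brokers: String): KafkaProducer[String, String] = synchronized {
    if (instance == null) {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      instance = new KafkaProducer[String, String](props)
    }
    instance
  }
}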
Full stack trace:
16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
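If I read the serialization stack correctly, the closure passed to foreachPartition keeps a $outer reference to my KafkaDecisionsConsumer instance, so Spark ends up trying to serialize the whole class. A toy example of how I understand such a capture happens (not my real code, just my understanding):

class Outer {                                  // deliberately not Serializable
  val topic = "out"

  def run(rdd: org.apache.spark.rdd.RDD[String]): Unit = {
    rdd.foreachPartition { iter =>
      // `topic` compiles to `this.topic`, so the anonymous function stores
      // `this` (the Outer instance) in its $outer field; Spark then has to
      // serialize Outer and fails with "Task not serializable".
      iter.foreach(msg => println(s"$topic: $msg"))
    }
  }
}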