I have a Scala Spark Streaming application that receives data from the same topic from 3 different Kafka producers.
The Spark Streaming application runs on the host 0.0.0.179, the Kafka server runs on the host 0.0.0.178, and the Kafka producers run on the hosts 0.0.0.180, 0.0.0.181 and 0.0.0.182.
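For context, each producer host sends JSON strings to the same topic on the broker at 0.0.0.178. Simplified, they all do roughly the following (a sketch only; the real producer code is more involved and the payload shown here is illustrative):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "0.0.0.178:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// sensorJson is a placeholder for a JSON document matching the case classes shown further down
val sensorJson = """{"sensor_name": "example", "...": "..."}"""
producer.send(new ProducerRecord[String, String]("topics1", sensorJson))
producer.close()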
When I try to run the Spark Streaming application, I get the error below:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Is the problem due to my Kafka configuration (num.partition set to 1)?
Here is my code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
// json4s for parsing the JSON payloads (jackson backend assumed)
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Create the streaming context with a 3 second batch interval
val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").set("spark.streaming.concurrentJobs", "3").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(3))
case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)
case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)
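// For clarity, the JSON messages on the topic are expected to have roughly this
// shape (hypothetical sample, values made up; only the structure matters):
// {
//   "sensor_name": "...", "start_date": "...", "end_date": "...",
//   "data1": {
//     "location": {"latitude": 0.0, "longitude": 0.0, "name": "..."},
//     "timestamp": "...", "windspeed": 0.0, "direction": "...", "strenght": "..."
//   },
//   "stt": {
//     "spatial": {"unit": "...", "metadata": "..."},
//     "temporal": {"unit": "...", "count": 0, "metadata": "..."},
//     "thematic": {"name": "...", "metadata": "..."}
//   }
// }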
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "0.0.0.178:9092",
"key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
"value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
"group.id" -> "test_luca",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics1 = Array("topics1")
val s1 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(record => {
implicit val formats = DefaultFormats
parse(record.value).extract[Sensors1]
}
)
s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()