I get the error below with Kafka 0.10.1.0 and Spark 2.0.2. My configuration:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession

private val spark = SparkSession.builder()
  .master("local[*]")
  .appName(job.name)
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.streaming.receiver.maxRate", 10000)
  .config("spark.streaming.kafka.maxRatePerPartition", 10000)
  .config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
  .config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
  .getOrCreate()

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> config.getString("kafka.hosts"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> job.name,
  "auto.offset.reset" -> config.getString("kafka.offset"),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
The exception:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I have already seen a discussion of this, but it had no resolution yet: https://www.mail-archive.com/user@spark.apache.org/msg56566.html