Spark Streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096

I have a streaming issue with Kafka DirectStream:

spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096

I tried:

1) Increase spark.streaming.kafka.consumer.poll.ms

- from 512 to 4096; failures became less frequent, but even 10 seconds still failed

2) Increase executor memory from 1G to 2G

- partially works: far fewer failures

3) https://issues.apache.org/jira/browse/SPARK-19275

- still failed when the batch interval is less than 8 seconds ("session.timeout.ms" → "30000")

4) Try Spark 2.1

- the problem is still there


Environment: Scala 2.11.8, Kafka 0.10.0.0, Spark 2.0.2

Spark configuration:

.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")
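For context, settings like "session.timeout.ms" (attempt 3 above) are not SparkSession configs but Kafka consumer parameters, passed to the direct stream as a plain map. A minimal sketch — the broker address is a hypothetical placeholder:

```scala
// Kafka consumer parameters for the direct stream (not .config(...) settings).
// "bootstrap.servers" below is a hypothetical placeholder.
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> "spark-executor-a-group",
  "session.timeout.ms" -> "30000",  // attempt 3 (SPARK-19275 workaround)
  "max.poll.records"   -> "500"     // Kafka's default, made explicit here
)
```

In the 0-10 integration this map would then be handed to KafkaUtils.createDirectStream via ConsumerStrategies.Subscribe.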

using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar

Error stack:

at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I am losing 1%+ of the data blocks from Kafka with this failure :( Please help!

1 answer

Current solution:

  • Increase num.network.threads in Kafka's config/server.properties (the default is 3)
  • Increase spark.streaming.kafka.consumer.poll.ms to a sufficiently large value. If spark.streaming.kafka.consumer.poll.ms is not set, the poll timeout falls back to spark.network.timeout, which is 120 seconds, and that causes problems.
  • Optional step: decrease "max.poll.records" (the default is 500)
  • Extra step: use Future {} to run the time-consuming work in parallel
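The last bullet — wrapping heavy per-batch work in Future {} so independent pieces run concurrently instead of eating into the batch interval — can be sketched in plain Scala. The two work functions below are hypothetical stand-ins (e.g. for the Cassandra write seen in the stack trace and a metrics update):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Run two independent pieces of per-batch work concurrently rather than
// sequentially, so neither blocks the other within the batch interval.
// Both bodies are placeholders for real work (e.g. a Cassandra write).
def processBatch(records: Seq[String]): (Int, Int) = {
  val writeF  = Future { records.count(_.nonEmpty) } // stand-in: persist records
  val metricF = Future { records.map(_.length).sum } // stand-in: update counters
  // Combine both results and bound the wait so a stuck task surfaces as a timeout.
  Await.result(writeF.zip(metricF), 10.seconds)
}
```

Note that Await.result should get a timeout shorter than the batch interval, otherwise a stuck future just moves the delay elsewhere.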

Source: https://habr.com/ru/post/1264282/

