Spark Streaming with Kafka DirectStream keeps failing:
The job dies with an assertion failure from the cached Kafka consumer:
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096
I tried:
1) Increased spark.streaming.kafka.consumer.poll.ms
- raised it from 512 to 4096; failures became less frequent, but even 10 seconds was not enough
2) Increased executor memory from 1G to 2G
- partially effective: failures became much less frequent
3) https://issues.apache.org/jira/browse/SPARK-19275
- applied the suggested workaround ("session.timeout.ms" → "30000"); it still failed whenever the streaming batch interval is less than 8 seconds
4) Upgraded to Spark 2.1
- the problem is still there
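For context, the stream is created with the standard Kafka 0-10 direct API, roughly like this (a sketch, not my exact job: the broker address, batch interval, and partition names are placeholders, and the Cassandra write happens downstream of this). Note that "session.timeout.ms" and "request.timeout.ms" follow the SPARK-19275 workaround, and that executors prefix the group id, so "a-group" shows up as "spark-executor-a-group" in the error:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf().setAppName("kafka-direct-stream")
val ssc = new StreamingContext(conf, Seconds(4))    // hypothetical 4-second batches

// Kafka consumer parameters; per the SPARK-19275 workaround,
// request.timeout.ms must stay larger than session.timeout.ms.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",             // placeholder address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "a-group",                          // reported as spark-executor-a-group on executors
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "session.timeout.ms" -> "30000",
  "request.timeout.ms" -> "40000"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("a-topic"), kafkaParams)
)
```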
Environment: Scala 2.11.8, Kafka 0.10.0.0, Spark 2.0.2
Spark configuration:
.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")
using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar
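To put the rate settings in perspective: with backpressure enabled, spark.streaming.kafka.maxRatePerPartition caps the records fetched per second per partition, so the per-batch ceiling scales with partition count and batch interval (the partition count and interval below are hypothetical, just to show the arithmetic):

```scala
// Back-of-envelope per-batch record ceiling implied by the rate limit above.
val maxRatePerPartition = 256   // records/second/partition, from the config
val numPartitions = 8           // hypothetical partition count for the topic
val batchIntervalSec = 4        // hypothetical batch interval in seconds

val maxRecordsPerBatch = maxRatePerPartition * numPartitions * batchIntervalSec
println(maxRecordsPerBatch)     // 256 * 8 * 4 = 8192
```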
Error stack trace:
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
More than 1% of the data blocks read from Kafka are lost to this failure :( Please help!