Apache Spark Streaming - Kafka - read old messages

I am trying to read old messages from Kafka with Spark Streaming. However, I can only receive messages as they arrive in real time (that is, if I publish new messages while my Spark program is running, I receive those messages).

I am changing my group ID and consumer ID to make sure ZooKeeper isn't simply withholding messages that it knows my program has already seen.

Given that Spark sees the offset in ZooKeeper as -1, shouldn't it read all the old messages in the queue? Am I just misunderstanding how a Kafka queue can be used? I'm very new to Spark and Kafka, so I can't rule out that I've simply misunderstood something.

 package com.kibblesandbits

 import org.apache.spark.SparkContext
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka.KafkaUtils
 import net.liftweb.json._

 object KafkaStreamingTest {

   val cfg = new ConfigLoader().load
   val zookeeperHost = cfg.zookeeper.host
   val zookeeperPort = cfg.zookeeper.port
   val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

   implicit val formats = DefaultFormats

   // Placeholder parser: passes the raw JSON string through unchanged.
   def parser(json: String): String = json

   def main(args: Array[String]) {
     val zkQuorum = "test-spark02:9092"
     val group = "myGroup99"
     val topic = Map("testtopic" -> 1)

     val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
     val ssc = new StreamingContext(sparkContext, Seconds(3))

     // Receiver-based Kafka stream of (key, value) pairs.
     val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)

     val gp = json_stream.map(_._2).map(parser)
     gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")

     ssc.start()
     ssc.awaitTermination() // keep the streaming context running
   }
 }

When I run this, the following message appears, so I am sure it isn't simply skipping the messages because an offset has already been committed.

 14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092])

Then, if I publish 1000 new messages, I can see those 1000 messages saved in my staging directory. But I don't know how to read the existing messages, which should (at this point) number in the tens of thousands.

1 answer

Use the alternative factory method on KafkaUtils, which lets you supply configuration to the Kafka consumer:

 def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
     ssc: StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
 ): ReceiverInputDStream[(K, V)]

Then create a map with your Kafka configuration and add the parameter "auto.offset.reset" set to "smallest":

 val kafkaParams = Map[String, String](
   "zookeeper.connect" -> zkQuorum,
   "group.id" -> groupId,
   "zookeeper.connection.timeout.ms" -> "10000",
   "auto.offset.reset" -> "smallest"
 )

Provide this configuration to the factory method above. Setting "auto.offset.reset" to "smallest" tells the consumer to start from the smallest offset in your topic.
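
With that kafkaParams map in scope, the stream creation in the question could then look like this (a sketch; StringDecoder for keys and values and the storage level are assumptions, since neither appears in the original snippet):

 import kafka.serializer.StringDecoder
 import org.apache.spark.storage.StorageLevel

 // Explicit type parameters choose the key/value types and their decoders;
 // the decoders here are an assumption for plain string messages.
 val json_stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Map("testtopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

Note that "auto.offset.reset" only takes effect when the consumer group has no offset committed in ZooKeeper yet, so starting with a fresh group.id (as you already do while testing) is what lets this setting kick in.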


Source: https://habr.com/ru/post/1208460/

