I am trying to read old messages from Kafka with a Spark stream. However, I only receive messages as they are sent in real time (that is, if I produce new messages while my Spark program is running, I receive those messages).
I am changing my group ID and consumer ID to make sure that Zookeeper is not simply withholding messages it knows my program has already seen.
Assuming Spark sees the offset in Zookeeper as -1, shouldn't it read all the old messages in the queue? Do I just misunderstand how a Kafka queue is consumed? I am very new to Spark and Kafka, so I cannot rule out that I am simply misunderstanding something.
package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats

  def parser(json: String): String = json

  def main(args: Array[String]) {
    val zkQuorum = "test-spark02:9092"
    val group = "myGroup99"
    val topic = Map("testtopic" -> 1)

    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
    val ssc = new StreamingContext(sparkContext, Seconds(3))
    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)

    val gp = json_stream.map(_._2).map(parser)
    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")

    ssc.start()
    ssc.awaitTermination()
  }
}
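For what it's worth, I have read that the overload of `KafkaUtils.createStream` that accepts a `kafkaParams` map (instead of a bare `zkQuorum` string) lets me set the consumer's `auto.offset.reset` property, and that `"smallest"` should make a consumer with no committed offset start from the beginning of each partition. A sketch of what I believe those params would look like (the `2181` Zookeeper port and host are assumptions about my setup, not tested):

```scala
// Consumer properties for the KafkaUtils.createStream overload that takes
// a kafkaParams map. "smallest" tells a consumer with no committed offset
// (initOffset -1) to start from the earliest available message.
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "test-spark02:2181", // Zookeeper port, not the broker's 9092
  "group.id"          -> "myGroup99",
  "auto.offset.reset" -> "smallest"
)

// Usage (requires spark-streaming-kafka and kafka.serializer.StringDecoder
// on the classpath; not runnable standalone):
// val json_stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
//   ssc, kafkaParams, Map("testtopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
```

I also wonder whether my `zkQuorum` above should point at Zookeeper's port rather than the broker's 9092, but I have not confirmed that.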
When I run this, the following message appears. So I am fairly sure it is not simply skipping messages because of a previously committed offset — the initOffset is -1.
14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092])
Then, if I produce 1000 new messages, I can see those 1000 messages stored in my temporary directory. But I do not know how to read the existing messages, which should (at the moment) number in the tens of thousands.
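To double-check that the old messages really are still in the topic (and not already expired by retention), I can read them outside Spark with the console consumer that ships with Kafka. A sketch, assuming a 0.8-era install and my hosts (the Zookeeper port is a guess at my setup):

```shell
# Read testtopic from the earliest offset with Kafka's bundled console consumer.
# --from-beginning makes a fresh consumer group start at offset 0 on each
# partition, which is the behaviour I want from the Spark receiver.
bin/kafka-console-consumer.sh \
  --zookeeper test-spark02:2181 \
  --topic testtopic \
  --from-beginning
```

If this prints the tens of thousands of old messages, the data is there and the problem is on the Spark consumer side.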