Kafka consumer: "Path must not end with / character"

I am using Apache Kafka 0.8.2.1 to stream web events to other data sources. The Kafka producer I wrote works fine, and I can see the data arriving on my topic when I run kafka-console-consumer.sh. However, I have had no luck getting my own Kafka consumer to receive messages. Any ideas?

The following error about an invalid path is thrown when my code calls consumer.createMessageStreams(topicCountMap):

Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
        at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
        at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
        at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:901)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:898)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:898)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)

Here is my Kafka consumer code.

  import java.util.Properties
  import java.util.concurrent.{ExecutorService, Executors}
  import kafka.consumer.ConsumerConfig
  import kafka.javaapi.consumer.ConsumerConnector

  val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())

  var executor: ExecutorService = null

  def run(a_numThreads: Integer): Unit = {
    // topic name -> number of streams (threads) to create for that topic
    val topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()
    topicCountMap.put("testEvent", a_numThreads)

    // this is the call that throws the exception above
    val consumerMap = consumer.createMessageStreams(topicCountMap)
    val streams = consumerMap.get("testEvent")

    // now launch all the threads
    executor = Executors.newFixedThreadPool(a_numThreads)

    // now create an object to consume the messages from each stream
    var threadNumber: Integer = 0
    val streamsItr = streams.iterator()
    while (streamsItr.hasNext()) {
      val stream = streamsItr.next()
      executor.submit(new EventConsumer(stream, threadNumber))
      threadNumber = threadNumber + 1
    }
  }

  def createConsumerConfig(): ConsumerConfig = {
    val props: Properties = new Properties()
    props.put("zookeeper.connect", "127.0.0.1:2181")
    props.put("group.id", "testConsumer")
    // 0.7-style key names, kept here exactly as in my original code
    props.put("zk.connect", "127.0.0.1:2181")
    props.put("groupid", "tesConsumer")
    props.put("zookeeper.session.timeout.ms", "400")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")

    new ConsumerConfig(props)
  }
1 answer

Spark CheckpointWriter , . , Checkpointing .

org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)

, , .

https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing
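
To narrow down which path is being rejected, a rough sketch in plain Scala (no Kafka or ZooKeeper dependency) may help. It assumes the consumer builds each watched path as /brokers/topics/<topic>; that is an assumption about the 0.8.x consumer, not something the stack trace itself confirms. TopicPathCheck, topicPath and endsWithSlash are hypothetical names, and the check only mimics the trailing-slash rule, not ZooKeeper's full path validation.

```scala
object TopicPathCheck {
  // Assumption: the consumer watches "/brokers/topics/<topic>" for every subscribed topic.
  val BrokerTopicsPath = "/brokers/topics"

  // Hypothetical helper: builds the path the consumer would watch for a topic.
  def topicPath(topic: String): String = BrokerTopicsPath + "/" + topic

  // Simplified stand-in for the rule behind "Path must not end with / character".
  // The root path "/" itself is allowed, so only longer paths are flagged.
  def endsWithSlash(path: String): Boolean = path.length > 1 && path.endsWith("/")

  def main(args: Array[String]): Unit = {
    // One normal topic name and one empty name, for comparison.
    val topics = Seq("testEvent", "")
    for (t <- topics) {
      val p = topicPath(t)
      println("topic='" + t + "' path='" + p + "' rejected=" + endsWithSlash(p))
    }
  }
}
```

Under that assumption, only an empty topic name would produce a path ending in /, so printing the keys of topicCountMap just before calling createMessageStreams is a cheap first check.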

Source: https://habr.com/ru/post/1584139/

