I am using Apache Kafka 0.8.2.1 to stream web events to other data sources. The Kafka producer that I wrote works fine, and I can see the data arriving on my topic when I run kafka-console-consumer.sh. However, I have had no luck getting my own Kafka consumer to receive messages. Any ideas?
The following error about an invalid path is thrown when my code calls consumer.createMessageStreams(topicCountMap):
Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:901)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:898)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:898)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
Here is my Kafka consumer code.
// Consumer connector, built once from the properties returned by createConsumerConfig().
val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())
// Thread pool that runs the stream consumers; stays null until run() assigns it.
var executor: ExecutorService = null
/**
 * Requests `a_numThreads` message streams for the "testEvent" topic from `consumer`
 * and submits one `EventConsumer` per stream to a fixed-size thread pool.
 *
 * The pool is stored in `executor`, replacing whatever was there before.
 *
 * @param a_numThreads number of streams to request for the topic; also the pool size
 * @throws IllegalStateException if the connector returns no stream list for the topic
 */
def run(a_numThreads: Integer): Unit = {
  val topic = "testEvent"

  val topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()
  topicCountMap.put(topic, a_numThreads)

  val consumerMap = consumer.createMessageStreams(topicCountMap)

  // java.util.Map#get returns null for an absent key; fail with a clear message
  // instead of a NullPointerException further down.
  val streams = Option(consumerMap.get(topic)).getOrElse(
    throw new IllegalStateException("No message streams were created for topic '" + topic + "'"))

  executor = Executors.newFixedThreadPool(a_numThreads)

  // Each stream gets its own worker, numbered from 0 in iteration order.
  val streamsItr = streams.iterator()
  var threadNumber: Integer = 0
  while (streamsItr.hasNext()) {
    executor.submit(new EventConsumer(streamsItr.next(), threadNumber))
    threadNumber = threadNumber + 1
  }
}
/**
 * Builds the consumer configuration: ZooKeeper at 127.0.0.1:2181 and consumer
 * group "testConsumer".
 *
 * Only the Kafka 0.8 property names are set. The pre-0.8 aliases `zk.connect`
 * and `groupid` were dropped: the 0.8 `ConsumerConfig` does not read them, and
 * the old `groupid` value ("tesConsumer") disagreed with `group.id`.
 *
 * @return a `ConsumerConfig` wrapping the properties set below
 */
def createConsumerConfig(): ConsumerConfig = {
  val props = new Properties()
  props.put("zookeeper.connect", "127.0.0.1:2181")
  props.put("group.id", "testConsumer")
  // NOTE(review): 400 ms is a very short ZooKeeper session timeout and may cause
  // spurious session expirations; confirm it is intended.
  props.put("zookeeper.session.timeout.ms", "400")
  props.put("zookeeper.sync.time.ms", "200")
  props.put("auto.commit.interval.ms", "1000")
  new ConsumerConfig(props)
}
source
share