Kafka consumer "Failed to add leader for partitions" error on Mesos

I am running a Kafka cluster of 6 brokers using the mesos/kafka library. I can add and start brokers across 6 different machines and post messages to the cluster using the Python SimpleProducer and the kafka-console-producer.sh script.

However, I cannot get consumers to work properly. I run the following consumer command:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.199:2181 --topic test --from-beginning --consumer.config config/consumer.properties --delete-consumer-offsets

In the consumer.properties file, I set group.id to my.group and set zookeeper.connect to a number of nodes in the Zookeeper ensemble. I get the following warnings when starting this consumer:

    [2015-09-24 16:01:06,609] WARN [my.group_my_host-1443106865779-b5a3a1e1-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
            at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
            at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
            at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    {'some':2}
    [2015-09-24 16:20:02,362] WARN [my.group_my_host-1443108001180-fa0c93e4-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
            at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
            at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
            at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    ...
    // Lots more of this
    ...
    Consumed 1 messages

I suspect the problem is related to Zookeeper. The {'some':2} line above is a message from the topic, so at least one message is consumed.

In the broker's server.log on Mesos, I see the following:

[2015-09-24 17:09:41,926] ERROR Closing socket for /192.168.1.199 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
            at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
            at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
            at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
            at sun.nio.ch.IOUtil.write(IOUtil.java:65)
            at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
            at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
            at kafka.network.MultiSend.writeTo(Transmission.scala:101)
            at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
            at kafka.network.Processor.write(SocketServer.scala:472)
            at kafka.network.Processor.run(SocketServer.scala:342)
            at java.lang.Thread.run(Thread.java:745)

What could be causing this?

The partition state in Zookeeper:

[zk: localhost:2181(CONNECTED) 166] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
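The state payload above can also be checked programmatically. The following is a minimal sketch, assuming the JSON has been copied out of the zkCli output as a string; the helper name `check_partition_state` is hypothetical and not part of any Kafka tooling.

```python
import json


def check_partition_state(state_json):
    """Return a list of problems found in a partition state znode payload."""
    state = json.loads(state_json)
    problems = []
    leader = state.get("leader", -1)
    isr = state.get("isr", [])
    # Kafka stores -1 as the leader when no broker currently leads the partition.
    if leader == -1:
        problems.append("partition has no leader")
    elif leader not in isr:
        problems.append("leader %d is not in the ISR %s" % (leader, isr))
    if not isr:
        problems.append("ISR is empty")
    return problems


# Payload copied from the zkCli output above.
state = '{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}'
print(check_partition_state(state))
```

For the payload shown in the question this prints an empty list: Zookeeper does record a leader (broker 0) that is in the ISR, which suggests the consumer's failure lies in reaching that broker rather than in a missing leader.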

OS: Ubuntu 14.04, Mesos: 0.23, Kafka: 2.10-0.8.2.1

Edit: With kafka-console-consumer.sh, messages still reach stdout despite the warnings. The Python KafkaConsumer fails with FailedPayloadsError.

+4
3

Check the "advertised.host.name" setting in the broker configuration. It must be the IP address of the BROKER.
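One way to see what a broker advertises is to read its registration znode (`/brokers/ids/<id>`) in Zookeeper and try to open a TCP connection to that host and port from the machine running the consumer. The following is a minimal sketch, assuming the registration payload carries `host` and `port` fields as in 0.8-era brokers; the sample payload, the hostname `broker-0.example.com`, and both helper names are hypothetical.

```python
import json
import socket


def advertised_endpoint(registration_json):
    """Extract the (host, port) pair a broker registered in Zookeeper."""
    info = json.loads(registration_json)
    return info["host"], int(info["port"])


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Hypothetical payload, shaped like the output of `get /brokers/ids/0` in zkCli.
registration = '{"jmx_port":-1,"host":"broker-0.example.com","version":1,"port":9092}'
host, port = advertised_endpoint(registration)
print("broker advertises %s:%d" % (host, port))
```

Calling `can_connect(host, port)` on the consumer machine and getting `False` would suggest the advertised host is not resolvable or reachable from there, which is consistent with a ClosedChannelException on the consumer side.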

+4

Run the following to describe the topic:

bin/kafka-topics.sh --zookeeper your.zookeeper:2181 --describe --topic your_topic

This shows the leader, replicas, and ISR for each partition (see: http://kafka.apache.org/documentation.html#quickstart_multibroker)
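If the describe output is long, the leader column can be scanned with a short script. This is a sketch only: it assumes the per-partition lines contain `Partition: <n>` followed by `Leader: <value>`, and that a missing leader is printed as `none` or `-1`. The sample output is hypothetical and not taken from the cluster in the question.

```python
import re

# Matches the per-partition lines printed by `kafka-topics.sh --describe`.
PARTITION_LINE = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\S+)")


def partitions_without_leader(describe_output):
    """Return the partition ids whose Leader column is not a broker id."""
    missing = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if match is None:
            continue  # summary line or unrelated output
        partition, leader = int(match.group(1)), match.group(2)
        # Anything that is not a non-negative integer ("none", "-1") counts
        # as "no leader" here; these spellings are assumptions.
        if not leader.isdigit():
            missing.append(partition)
    return missing


# Hypothetical sample output for a two-partition topic.
sample = (
    "Topic:your_topic\tPartitionCount:2\tReplicationFactor:1\tConfigs:\n"
    "\tTopic: your_topic\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n"
    "\tTopic: your_topic\tPartition: 1\tLeader: -1\tReplicas: 1\tIsr: \n"
)
print(partitions_without_leader(sample))
```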

, , . , - .

:

describe () , .

, . ( , ). , .

, .

+1

My problem:

  • Started Zookeeper
  • Created the topic
  • Started Kafka

Then I get "No Leader Exception".

But when I created the topic while both Zookeeper and Kafka were running, it worked fine.

0

Source: https://habr.com/ru/post/1608851/
