Messages lost when restarting Kafka hosts

I am running a 3-node Kafka cluster on AWS.

Kafka Version: 0.10.2.1
Zookeeper Version: 3.4

While running several stability tests, I noticed that messages get lost when I take the leader node down.

These are the steps to reproduce the problem:

Create a topic with a replication factor of 3, which should make the data available on all three nodes:

~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --create --topic stackoverflow --replication-factor 3 --partitions 20
Created topic "stackoverflow".
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --describe --topic stackoverflow
Topic:stackoverflow    PartitionCount:20    ReplicationFactor:3    Configs:
    Topic: stackoverflow    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 4    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 5    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 6    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 7    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 8    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 9    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 10    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 11    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 12    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 13    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 14    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 15    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 16    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 17    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 18    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 19    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

Start producing to this topic with the following code:

import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'])

try:
    count = 0
    while True:
        # send() is asynchronous; the future it returns is not checked here
        producer.send('stackoverflow', b'message')
        # flush() blocks until the buffered message has been sent or has failed
        producer.flush()
        count += 1
        time.sleep(1)
except KeyboardInterrupt:
    print("Sent %s messages." % count)

At this point, I kill one of the machines and wait until it rejoins the cluster.

Once it is back, I stop the producer and consume all messages from this topic.

from kafka import KafkaConsumer

consumer = KafkaConsumer('stackoverflow',
                         bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'],
                         auto_offset_reset='earliest',  # start from the oldest available message
                         enable_auto_commit=False)      # do not commit consumed offsets
try:
    count = 0
    for message in consumer:
        count += 1
        print(message)
except KeyboardInterrupt:
    print("Received %s messages." % count)

Two of the sent messages are missing. The producer did not report any errors.

kafka $ python producer.py
Sent 762 messages.

kafka $ python consumer.py
Received 760 messages.
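
Since every payload in the scripts above is the same string, only the totals can be compared. The following is a minimal sketch, not part of the original test, that assumes each message carries its sequence number as the payload instead; the helper name missing_sequence_numbers is hypothetical.

```python
def missing_sequence_numbers(sent_count, received_payloads):
    """Return the sorted sequence numbers in range(sent_count) that never arrived.

    Assumes (hypothetically) that the producer sent the payloads b"0", b"1", ...
    """
    received = set()
    for payload in received_payloads:
        # Payloads arrive as bytes; accept plain strings as well.
        text = payload.decode("ascii") if isinstance(payload, bytes) else payload
        received.add(int(text))
    return sorted(set(range(sent_count)) - received)
```

To use it, the producer would send str(count).encode('ascii') instead of b'message', and the consumer would collect message.value for every record before calling the helper.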



The producer needs the "retries" option set.

In Python:

import sys
producer = KafkaProducer(bootstrap_servers=[...], retries=sys.maxint)  # Python 2; use sys.maxsize on Python 3
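
A bounded variant of the "retries" setting could look like the sketch below. It assumes kafka-python's KafkaProducer keyword arguments retries and retry_backoff_ms; the helper names producer_settings and make_producer are hypothetical, and retry_backoff_ms is an addition not mentioned in the answer.

```python
def producer_settings(bootstrap_servers, retries=5, retry_backoff_ms=500):
    """Build keyword arguments for kafka-python's KafkaProducer.

    retries: how many times a failed send is retried before giving up.
    retry_backoff_ms: pause between retries (assumption, not from the answer).
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    return {
        "bootstrap_servers": list(bootstrap_servers),
        "retries": retries,
        "retry_backoff_ms": retry_backoff_ms,
    }


def make_producer(bootstrap_servers, retries=5):
    # Imported here so producer_settings() works without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(**producer_settings(bootstrap_servers, retries))
```

A finite retry count keeps a permanently failing send from blocking forever, at the cost of giving up after the last attempt, so the send result still has to be checked.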


Things to check:

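  • unclean.leader.election.enable (broker setting; the default is true for Kafka < 0.11 and false for Kafka >= 0.11, so set it to false on 0.10.2)
  • wait (synchronously) for the send result: producer.send(...).get()
  • pass retries=5 to the KafkaProducer init (for example).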
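
The synchronous wait from the list above can be sketched as follows. This is a minimal illustration, assuming kafka-python, where send() returns a future whose get() raises a KafkaError subclass on failure or timeout; the helper name send_and_confirm and the 10-second timeout are hypothetical.

```python
try:
    from kafka.errors import KafkaError
except ImportError:
    # kafka-python is not installed; define a stand-in so the sketch still loads.
    class KafkaError(Exception):
        pass


def send_and_confirm(producer, topic, value, timeout=10):
    """Send one message and block until the broker acknowledges it.

    Returns True if the send was acknowledged, False if it failed or timed out.
    """
    future = producer.send(topic, value)
    try:
        future.get(timeout=timeout)
    except KafkaError:
        return False
    return True
```

In the producer loop from the question, count would then be incremented only when send_and_confirm(producer, 'stackoverflow', b'message') returns True, so the reported total reflects acknowledged messages rather than attempted ones.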


Source: https://habr.com/ru/post/1685471/

