Kafka Consumer Performance Tuning / Rebalancing Issues

I am trying to use Kafka as an alternative to AWS SQS. The main motivation is to improve performance, since Kafka removes the SQS restriction of pulling at most 10 messages at a time with a 256 KB cap. Here is a high-level scenario of my use case. I have a bunch of crawlers that send documents for indexing. The payload size is about 1 MB on average. The crawlers call a SOAP endpoint, which in turn runs producer code to send the messages to a Kafka topic. A consumer application picks up the messages and processes them. For my test, I configured the topic with 30 partitions and a replication factor of 2. Two Kafka brokers run alongside one ZooKeeper instance. The Kafka version is 0.10.0.

For my testing, I published 7 million messages to the topic. I created a consumer group with 30 consumer threads, one per partition. Initially, I was under the impression that this would substantially increase throughput compared to what I was getting through SQS. Unfortunately, it did not. In my case, processing a message is complex and takes 1-2 minutes on average to complete. This leads to a storm of partition rebalances, because the threads cannot call poll() in time. I could see a bunch of messages in the log saying:

Auto offset commit failed for group full_group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

As a result, the same message is processed several times. I tried playing with the session timeout, max.poll.records, and the poll timeout to avoid this, but that slowed down overall processing considerably. Here are some of my configuration values:


metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest
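For reference, the settings above map onto a consumer `Properties` object roughly like this (a minimal sketch; the broker hosts, group id, and the Kryo deserializer class are taken verbatim from the config dump above, and the commented-out constructor call is where the real `KafkaConsumer` would be built):

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds the Properties corresponding to the key settings from the
    // question's configuration dump. Adjust hosts/classes for your setup.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
        props.put("group.id", "full_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "300000");
        props.put("request.timeout.ms", "310000");
        props.put("max.poll.records", "10000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer");
        return props;
        // The consumer would then be created as: new KafkaConsumer<>(props)
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.records"));
    }
}
```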
I reduced the consumer poll timeout to 100 ms. This reduced the rebalancing problems and eliminated duplicate processing, but significantly slowed down the overall run. It ended up taking 35 hours to process all 6 million messages, compared to 25 hours with the SQS-based solution. Each consumer thread retrieved 50-60 messages per poll on average, although some polls returned 0 records even when there were a huge number of messages in the partition; the same thread would then receive messages on a subsequent iteration. Could this be caused by rebalancing?

Here is my consumer code:


while (true) {
    try {
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
            if (record.value() != null) {
                TextAnalysisRequest textAnalysisObj = record.value();
                // Process record
                PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
            }
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
I understand that the record-processing step is one of the bottlenecks in my case. But I am sure several people out there have a similar use case with long processing times. I thought about doing asynchronous processing by spinning each record off onto a dedicated thread, or using a thread pool with large capacity, but I am not sure whether that would put too much load on the system. At the same time, I have seen a few examples where people use the pause and resume API to keep the consumer alive while processing, in order to avoid the rebalancing problem.
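As a rough pure-JDK illustration of the thread-pool idea (no Kafka API involved; the real record processing is replaced by a hypothetical counter increment), a bounded queue with `CallerRunsPolicy` keeps the hand-off from buffering an unbounded backlog:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {
    static int runDemo(int nRecords) throws InterruptedException {
        // 4 workers, at most 100 queued tasks. When the queue is full,
        // CallerRunsPolicy makes the submitting (polling) thread run the
        // task itself, giving natural backpressure instead of letting
        // the backlog grow without bound.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < nRecords; i++) {           // stand-in for polled records
            pool.execute(processed::incrementAndGet);  // stand-in for real processing
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo(1000));
    }
}
```

Note that with enable.auto.commit=true this sketch does not solve correctness: offsets can be auto-committed before the pooled tasks finish, so a crash could lose in-flight records. Manual commits after processing would be needed for at-least-once guarantees.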

Please suggest the right approach for this problem, whether that is a thread pool, pause/resume, or some other pattern, so that Kafka can be used effectively with such long per-message processing times.


In cases like this, where per-message processing is slow relative to the poll loop, I would decouple consumption from processing, so that the Kafka consumer thread does nothing but fetch messages and hand them off. Something like this:

    private final BlockingQueue<TextAnalysisRequest> requests =
            new LinkedBlockingQueue<>();

The consumer loop then only enqueues the records:

while (true) {
    try {
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
            if (record.value() != null) {
                // Hand the record off instead of processing it inline
                requests.offer(record.value());
            }
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}

And a separate thread (or several) does the actual processing:

while (!Thread.currentThread().isInterrupted()) {
    try {
        TextAnalysisRequest textAnalysisObj = requests.take();
        PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
    } catch (InterruptedException e) {
        LOGGER.info("Process thread interrupted", e);
        Thread.currentThread().interrupt();
    } catch (Throwable t) {
        LOGGER.warn("Unexpected throwable while processing.", t);
    }
}
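To scale this pattern, several identical processing threads can drain the same queue. A minimal pure-JDK sketch (no Kafka involved; the item type is simplified to `String` and the processing call is replaced by a counter) showing the start/interrupt lifecycle:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueWorkers {
    static int drain(int nWorkers, int nItems) throws InterruptedException {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        AtomicInteger done = new AtomicInteger();

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < nWorkers; i++) {
            Thread t = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        requests.take();        // blocks until an item arrives
                        done.incrementAndGet(); // stand-in for submitPostProcessRequest
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // exit the loop
                    }
                }
            });
            t.start();
            workers.add(t);
        }

        for (int i = 0; i < nItems; i++) requests.offer("msg-" + i);
        while (done.get() < nItems) Thread.sleep(10); // wait for all items
        for (Thread t : workers) t.interrupt();       // shut the workers down
        for (Thread t : workers) t.join();
        return done.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drain(4, 500));
    }
}
```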

For more on sizing and tuning, see this practical FAQ on deploying Kafka: http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

Also note that with max.poll.records set to 10K and 1-2 minutes of processing per record, a single full batch could take days to work through, while session.timeout.ms is only 5 minutes; lowering max.poll.records (or decoupling processing as above) is what keeps the poll loop responsive.


Source: https://habr.com/ru/post/1656892/

