Understanding Consumer Group Identifier

I did a fresh install of Apache Kafka 0.10.1.0.

I was able to send / receive messages on the command line.

Using the Java Producer / Consumer example, I cannot understand the group.id parameter in the consumer example.

Let me know how to fix this problem.

The following is an example of a consumer that I used:

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-topic");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(100);
            System.err.println("records size=>" + records.count());
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }

After running the console consumer command below, I can see the messages sent by the producer on the console, but I cannot see them from the Java program.

    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning

+13
6 answers

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load-balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

The group.id is a string that uniquely identifies the group of consumer processes to which this consumer belongs.

(From the Kafka introduction)
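To make the two cases concrete, here is a toy model in plain Java. It does not use the Kafka client at all: `GroupDeliveryDemo`, `deliver` and the consumer names `c1` / `c2` are made up for illustration, and simple round-robin stands in for Kafka's partition-based assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of consumer-group delivery; this is NOT the Kafka client API.
class GroupDeliveryDemo {

    // consumerToGroup maps a (hypothetical) consumer name to its group.id.
    // Returns, for each consumer, the records it would receive.
    static Map<String, List<String>> deliver(Map<String, String> consumerToGroup,
                                             List<String> records) {
        // Collect the members of every group, preserving insertion order.
        Map<String, List<String>> members = new LinkedHashMap<>();
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : consumerToGroup.entrySet()) {
            members.computeIfAbsent(e.getValue(), g -> new ArrayList<>()).add(e.getKey());
            received.put(e.getKey(), new ArrayList<>());
        }
        // Each record goes to exactly one member of every group.
        // Round-robin is an assumption standing in for partition assignment.
        for (int i = 0; i < records.size(); i++) {
            for (List<String> group : members.values()) {
                String consumer = group.get(i % group.size());
                received.get(consumer).add(records.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r0", "r1", "r2", "r3");

        Map<String, String> sameGroup = new LinkedHashMap<>();
        sameGroup.put("c1", "g1");
        sameGroup.put("c2", "g1");
        System.out.println("same group:       " + deliver(sameGroup, records));

        Map<String, String> differentGroups = new LinkedHashMap<>();
        differentGroups.put("c1", "g1");
        differentGroups.put("c2", "g2");
        System.out.println("different groups: " + deliver(differentGroups, records));
    }
}
```

With the same group the four records are split between c1 and c2; with different groups each consumer gets all four.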

+16

Assign any arbitrary value to the group id. It does not matter which.

 props.put("group.id", "Any Random Value"); 
+2

In the code you provided, you wait for data only once, for 100 ms. You should receive data in a loop, or wait for a longer period of time (in which case you will still receive only one batch of data). As for "group.id": when you start the consumer from the console, it gets a random "group.id".
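A rough sketch of the difference, runnable without a broker. Everything here is an assumption for illustration: `PollLoopDemo`, `drain` and `fakeConsumer` are invented names, and the `Supplier` stands in for something like `consumer.poll(100)`. The fake returns empty batches at first, which is one plausible reason a single short poll shows nothing (for example, the consumer may still be joining its group).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class PollLoopDemo {

    // Calls pollOnce repeatedly and collects everything it returns.
    // Stops after maxEmptyPolls consecutive empty results.
    static List<String> drain(Supplier<List<String>> pollOnce, int maxEmptyPolls) {
        List<String> all = new ArrayList<>();
        int emptyInARow = 0;
        while (emptyInARow < maxEmptyPolls) {
            List<String> batch = pollOnce.get();
            if (batch.isEmpty()) {
                emptyInARow++;
            } else {
                emptyInARow = 0;
                all.addAll(batch);
            }
        }
        return all;
    }

    // Hypothetical stand-in for a consumer: the first two polls come back
    // empty, then two batches arrive, then nothing more.
    static Supplier<List<String>> fakeConsumer() {
        final Iterator<List<String>> script = Arrays.asList(
                Collections.<String>emptyList(),
                Collections.<String>emptyList(),
                Arrays.asList("m1", "m2"),
                Arrays.asList("m3")).iterator();
        return () -> script.hasNext() ? script.next() : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        // A single poll, as in the question, sees nothing.
        System.out.println("single poll: " + fakeConsumer().get());
        // Polling in a loop eventually sees every message.
        System.out.println("poll loop:   " + drain(fakeConsumer(), 3));
    }
}
```

A long-running consumer would normally loop until it is shut down rather than stop after a few empty polls; the cut-off here only keeps the example finite.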

0

Since no offset was provided, the Java client will wait for new messages but will not show existing messages; this is expected. If you want to read all the messages already in the topic, you can use this piece of code:

    if (READ_FROM_BEGINNING) {
        // Consume all the messages from the topic from the beginning.
        // This doesn't work reliably if consumer.poll(..) is not called first,
        // probably because of lazy-loading issues.
        consumer.poll(10);
        // Use this if intending to read from the beginning,
        // or call the line below to read from a predefined offset.
        consumer.seekToBeginning(consumer.assignment());
        //consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET);
    }
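Another option that may be worth trying is the `auto.offset.reset` consumer property, which as far as I know only takes effect when the group has no committed offset yet. The sketch below just builds the properties; the class name `FromBeginningConfig` and the "from-beginning-" prefix for a fresh group.id are made up for this example.

```java
import java.util.Properties;
import java.util.UUID;

class FromBeginningConfig {

    // Builds consumer properties for a group with no committed offsets yet.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Hypothetical naming scheme: a fresh group.id has no committed offsets.
        props.put("group.id", "from-beginning-" + UUID.randomUUID());
        // Applies only when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration (order is not guaranteed).
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` would be passed to `new KafkaConsumer<>(props)` as in the question. A group that has already committed offsets resumes from them regardless of this setting.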
0

Below are some test results for partitions and the consumer property group.id.

    Properties props = new Properties();
    // set all other properties as required
    props.put("group.id", "ConsumerGroup1");
    props.put("max.poll.records", "1");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

consumer group.id: load-balances the consumed data (if group.id is different for each consumer, each consumer will receive a copy of the data)

if partitions = 1 and total number of consumers = 2, only one of the two active consumers will receive data

if partitions = 2 and total number of consumers = 2, each of the two active consumers receives data evenly

if partitions = 3 and total number of consumers = 2, each of the two active consumers will receive data: one consumer receives data from 2 partitions, and the other receives data from 1 partition

if partitions = 3 and total number of consumers = 3, each of the three active consumers receives data evenly
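The counts above can be reproduced with a small model in plain Java. This is only a sketch: `PartitionSpreadDemo` and `assign` are invented names, and the modulo rule is a simplification rather than Kafka's real assignor. It relies on one assumption, that each partition is owned by exactly one consumer within a group.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading partitions over the consumers of ONE group.
// Not Kafka's real assignor; it only reproduces the counts listed above.
class PartitionSpreadDemo {

    // Returns, for each consumer index, the list of partition numbers it owns.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        // A partition is owned by exactly one consumer in the group.
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int[][] cases = { {1, 2}, {2, 2}, {3, 2}, {3, 3} };
        for (int[] c : cases) {
            System.out.println("partitions=" + c[0] + ", consumers=" + c[1]
                    + " -> " + assign(c[0], c[1]));
        }
    }
}
```

With 1 partition and 2 consumers the second consumer owns nothing, which matches the first observation: a consumer beyond the partition count sits idle.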

0

The group.id is the consumer group id, which must be defined in the Kafka consumer.properties file.

Add a consumer group for my-topic to consumer.properties as shown here, and it should work:

    # consumer group id
    group.id=my-topic-consumer-group
-1

Source: https://habr.com/ru/post/1262019/

