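You can subscribe to several topics with a single call: consumer.subscribe(Arrays.asList(topic1, topic2), listener), where listener is a ConsumerRebalanceListener.
Each batch returned by poll() knows which topic partitions its records came from, so you can commit per partition with consumer.commitSync() or consumer.commitAsync() by building an OffsetAndMetadata object, as follows. The committed offset is the last processed offset plus one, because it marks the next record to read.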
// poll(Duration) replaces poll(long), deprecated since Kafka 2.0; the 100 ms timeout is arbitrary
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
    // Records of this batch that belong to one topic partition
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    // Commit the position of the next record to read: last processed offset + 1
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
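If the "+ 1" is unclear, here is a minimal sketch of the same bookkeeping in plain Java, with no Kafka dependency so it runs on its own. The class OffsetBookkeeping, the Rec type and the offsetsToCommit helper are hypothetical stand-ins made up for illustration, not part of the Kafka client; the "topic-partition" string key only imitates what a TopicPartition identifies.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetBookkeeping {
    // Hypothetical stand-in for a consumed record (not Kafka's ConsumerRecord).
    public static final class Rec {
        public final String topic;
        public final int partition;
        public final long offset;

        public Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each topic partition in the batch, compute the offset to commit:
    // the highest processed offset + 1, i.e. the next record to read.
    public static Map<String, Long> offsetsToCommit(List<Rec> batch) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (Rec r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Math::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec("topic1", 0, 41),
                new Rec("topic1", 0, 42),
                new Rec("topic2", 3, 7));
        // prints {topic1-0=43, topic2-3=8}
        System.out.println(offsetsToCommit(batch));
    }
}
```

With the real client you would put each of these values into an OffsetAndMetadata and pass the map to commitSync, as in the snippet above.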