KafkaConsumer 0.10 Java API Error Message: No Current Destination for Section

I am using KafkaConsumer 0.10 Java api. I want to consume from a specific section and a specific offset. I looked up and found that there was a search method, but it throws an exception. Has anyone had a similar use case or solution?

code:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); consumer.seek(new TopicPartition("mytopic", 1), 4); 

An exception

 java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) at xx.xxx.xxx.Test.main(Test.java:182) 
+5
source share
2 answers

Before you can seek() , you first need to subscribe() to go to the section or assign() of the topic section for the consumer. Also keep in mind that subscribe() and assign() lazy - so you also need to make a β€œdummy call” before poll() before you can use seek() .

If you use subscribe() , you use group control: in this way, you can start several users using the same group.id , and all sections of the section will be distributed evenly among all users in the group automatically (each section will get assigned to one to the consumer in the group).

If you want to read specific sections, you need to use manual assignment via assign() . This allows you to complete any task you want.

Btw: KafkaConsumer has a very long detailed JavaDoc class, including examples. Worth reading it.

+18
source

If you do not want to use poll () and retrieve map entries, as well as change the offset itself. Kafka Version 0.11 Try the following:

 ... props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2")); List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList()); Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); coordinatorField.setAccessible(true); ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer); coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings consumer.seekToBeginning(partitions); //or other seek 

Poll for coordinator events. This ensures that the coordinator is known and that the customer has joined the group (if he uses group management). It also handles periodic offsets, if included.

+1
source

Source: https://habr.com/ru/post/1260911/


All Articles