Spring Integration Kafka Consumer Listener does not receive messages

Following the documentation here, I am building a POC that receives messages in a listener, as described in that same documentation. My configuration is shown below.

    @Configuration
    public class KafkaConsumerConfig {

        public static final String TEST_TOPIC_ID = "record-stream";

        @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
        private String topic;

        @Value("${kafka.address:localhost:9092}")
        private String brokerAddress;

        /*
        @Bean
        public KafkaMessageDrivenChannelAdapter<String, String> adapter(
                KafkaMessageListenerContainer<String, String> container) {
            KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                    new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
            kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
            return kafkaMessageDrivenChannelAdapter;
        }

        @Bean
        public QueueChannel received() {
            return new QueueChannel();
        }
        */

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
                kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(30000);
            return factory;
        }

        /*
         * @Bean
         * public KafkaMessageListenerContainer<String, String> container() throws Exception {
         *     ContainerProperties properties = new ContainerProperties(this.topic);
         *     // set more properties
         *     return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
         * }
         */

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
            // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest // smallest
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    }

The listener is as follows:

    @Service
    public class Listener {

        private Logger log = Logger.getLogger(Listener.class);

        @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID,
                containerFactory = "kafkaListenerContainerFactory")
        public void process(String message /* , Acknowledgment ack */) {
            Gson gson = new Gson();
            Record record = gson.fromJson(message, Record.class);
            log.info(record.getId() + " " + record.getName());
            // ack.acknowledge();
        }
    }

Although I am producing messages to the same topic that this consumer subscribes to, the listener method is never executed.

I am running Kafka 0.10.0.1, and here is my current pom. This consumer runs as a Spring Boot web application, unlike many of the command-line samples.

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

I have spent a lot of time trying to understand why this listener is not hit when there are messages in the topic. What am I doing wrong?

I know that I can receive messages using a channel (I have commented out that part of the configuration in the code above), but concurrency handling is cleaner with this approach.

Is this kind of implementation possible with asynchronous message consumption?

1 answer

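You must add @EnableKafka alongside @Configuration. Without it, nothing scans your beans for @KafkaListener methods, so no listener container is created and the annotation is silently ignored.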

Will add some description soon.

For example:

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
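
Why the listener stays silent without that annotation can be illustrated with a small plain-Java analogy. This is only a sketch of the general mechanism, not Spring's actual implementation, and `DemoListener`, `DemoHandler` and `DemoDispatcher` are hypothetical names invented for the illustration. An annotation is just metadata: the annotated method is called only if some component looks for the annotation, which is roughly the role of the infrastructure that @EnableKafka registers.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @KafkaListener: pure metadata, no behavior.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoListener {
    String topic();
}

// Hypothetical bean with an annotated handler method.
class DemoHandler {
    final List<String> received = new ArrayList<>();

    @DemoListener(topic = "record-stream")
    public void process(String message) {
        received.add(message);
    }
}

// Hypothetical stand-in for the annotation processing that has to be
// switched on: it scans a bean for annotated methods and routes
// messages for a matching topic to them.
class DemoDispatcher {
    private final boolean scanningEnabled;

    DemoDispatcher(boolean scanningEnabled) {
        this.scanningEnabled = scanningEnabled;
    }

    // Returns how many handler methods were invoked for this message.
    int dispatch(Object bean, String topic, String message) {
        if (!scanningEnabled) {
            return 0; // nobody inspects the annotation, so nothing is called
        }
        int invoked = 0;
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoListener listener = method.getAnnotation(DemoListener.class);
            if (listener != null && listener.topic().equals(topic)) {
                try {
                    method.invoke(bean, message);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Handler invocation failed", e);
                }
                invoked++;
            }
        }
        return invoked;
    }
}
```

With `new DemoDispatcher(false)` the handler's `received` list stays empty even though the method is annotated, which mirrors the behavior in the question. With `new DemoDispatcher(true)` the same message reaches `process`.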

Source: https://habr.com/ru/post/1257817/

