Following the documentation linked here, I am building a POC that receives messages in a listener, as described in that same documentation. This is how I wrote the configuration:
@Configuration
public class KafkaConsumerConfig {

    public static final String TEST_TOPIC_ID = "record-stream";

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
    private String topic;

    @Value("${kafka.address:localhost:9092}")
    private String brokerAddress;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
The listener is as follows:
@Service
public class Listener {

    private Logger log = Logger.getLogger(Listener.class);

    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory")
    public void process(String message) {
        Gson gson = new Gson();
        Record record = gson.fromJson(message, Record.class);
        log.info(record.getId() + " " + record.getName());
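For reference, my understanding is that `topicPattern` is treated as a regular expression rather than as a literal topic name. Below is a minimal sketch, using only `java.util.regex`, of why I expect the literal `record-stream` to still match its own topic. It assumes the pattern has to match the whole topic name; the class name, the helper method, and the extra topic names are hypothetical and only there for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {

    // Same literal as KafkaConsumerConfig.TEST_TOPIC_ID in the question.
    static final String TEST_TOPIC_ID = "record-stream";

    // Hypothetical helper: returns the topic names that the regular
    // expression matches in full (assumption: whole-name matching).
    static List<String> matchingTopics(String topicPattern, List<String> topics) {
        Pattern pattern = Pattern.compile(topicPattern);
        return topics.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = Arrays.asList("record-stream", "record-stream-2", "other");
        System.out.println(matchingTopics(TEST_TOPIC_ID, topics));
    }
}
```

If that assumption holds, the pattern itself should not be what keeps the listener from firing, although the `topics` attribute of `@KafkaListener` would state the intent more directly for a single fixed topic.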
Although I am producing messages to the same topic that this consumer subscribes to, the listener is never invoked.
I am running Kafka 0.10.0.1, and my current pom is below. This consumer runs as a Spring Boot web application, unlike many of the command-line samples.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.1.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
I have spent a lot of time trying to understand why this listener is not hit when there are messages in the topic. What am I doing wrong?
I know that I can receive messages using a channel (I have commented out that part of the configuration in this code), but here the concurrency handling is cleaner.
Is this kind of implementation possible with asynchronous message consumption?