Spring-integration-kafka: configure a consumer to receive messages from a specific partition

I started using spring-integration-kafka in my project, and I can produce and consume messages from Kafka. But now I want to produce a message to a specific partition and also consume messages from a specific partition.

Example: I want to produce a message to partition 3, and the consumer should only receive messages from partition 3.

So far, my topic has 8 partitions, and I can produce a message to a specific partition, but I have not yet found a way to configure the consumer to receive messages only from a specific partition.

So, any suggestion on how I can configure a consumer using spring-integration-kafka, or whatever else needs to be done in the KafkaConsumer.java class, so that it only gets messages from a specific partition?

Thanks.

Here is my code:

kafka-producer-context.xml

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    order="1"
                                    channel="inputToKafka" />

<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProps">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration
                broker-list="127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
                async="true"
                topic="testTopic"
                key-class-type="java.lang.String"
                key-encoder="encoder"
                value-class-type="java.lang.String"
                value-encoder="encoder"
                partitioner="partitioner"
                compression-codec="default" />
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

<util:properties id="producerProps">
    <prop key="queue.buffering.max.ms">500</prop>
    <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
    <prop key="queue.buffering.max.messages">10000</prop>
    <prop key="retry.backoff.ms">100</prop>
    <prop key="message.send.max.retries">2</prop>
    <prop key="send.buffer.bytes">5242880</prop>
    <prop key="socket.request.max.bytes">104857600</prop>
    <prop key="socket.receive.buffer.bytes">1048576</prop>
    <prop key="socket.send.buffer.bytes">1048576</prop>
    <prop key="request.required.acks">1</prop>
</util:properties>

<bean id="encoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder" />

<bean id="partitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner" />

<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500" />

KafkaProducer.java

public class KafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private MessageChannel inputToKafka;

    public void sendMessage(String message) {
        try {
            inputToKafka.send(MessageBuilder.withPayload(message)
                    .setHeader(KafkaHeaders.TOPIC, "testTopic")
                    .setHeader(KafkaHeaders.PARTITION_ID, 3)
                    .build());
        } catch (Exception e) {
            logger.error(String.format("Failed to send [ %s ] to topic %s", message, "testTopic"), e);
        }
    }
}

kafka-consumer-context.xml

<int:channel id="inputFromKafka">
    <int:dispatcher task-executor="kafkaMessageExecutor" />
</int:channel>

<int-kafka:zookeeper-connect id="zookeeperConnect"
                             zk-connect="127.0.0.1:2181"
                             zk-connection-timeout="6000"
                             zk-session-timeout="6000"
                             zk-sync-time="2000" />

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                   kafka-consumer-context-ref="consumerContext"
                                   auto-startup="true"
                                   channel="inputFromKafka">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5" />
</int-kafka:inbound-channel-adapter>

<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="auto.offset.reset">smallest</prop>
            <prop key="socket.receive.buffer.bytes">1048576</prop>
            <prop key="fetch.message.max.bytes">5242880</prop>
            <prop key="auto.commit.interval.ms">1000</prop>
        </props>
    </property>
</bean>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="1000"
                            zookeeper-connect="zookeeperConnect"
                            consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="defaultGrp" max-messages="20000">
            <int-kafka:topic id="testTopic" streams="3" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<task:executor id="kafkaMessageExecutor" pool-size="0-10" keep-alive="120" queue-capacity="500" />

<int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage" />

KafkaConsumer.java

public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @Autowired
    KafkaService kafkaService;

    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : msgs.entrySet()) {
            log.debug("Topic:" + entry.getKey());
            ConcurrentHashMap<Integer, List<byte[]>> messages =
                    (ConcurrentHashMap<Integer, List<byte[]>>) entry.getValue();
            log.debug("\n**** Partition: \n");
            Set<Integer> keys = messages.keySet();
            for (Integer i : keys) {
                log.debug("p:" + i);
            }
            log.debug("\n**************\n");
            Collection<List<byte[]>> values = messages.values();
            for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) {
                List<byte[]> list = iterator.next();
                for (byte[] object : list) {
                    String message = new String(object);
                    log.debug("Message: " + message);
                    try {
                        kafkaService.receiveMessage(message);
                    } catch (Exception e) {
                        log.error(String.format("Failed to process message %s", message));
                    }
                }
            }
        }
    }
}

So my problem is here: when I produce a message to partition 3 (or any other partition), KafkaConsumer always receives it. What I want is for KafkaConsumer to receive messages only from partition 3, and not from any other partition.

Thanks again.

1 answer

You need to use the message-driven-channel-adapter instead.

The KafkaMessageListenerContainer can take an array of org.springframework.integration.kafka.core.Partition arguments in its constructor to specify the topic/partition pairs it should listen to.
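A bean definition along these lines should work (a minimal sketch against the spring-integration-kafka 1.x API; the connectionFactory bean name and the Partition constructor arguments shown here are assumptions to verify against your version):

    <!-- Sketch only: bean names are placeholders; reuses the zookeeperConnect bean
         from the consumer context above. -->
    <bean id="connectionFactory"
          class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
        <constructor-arg>
            <bean class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
                <constructor-arg ref="zookeeperConnect" />
            </bean>
        </constructor-arg>
    </bean>

    <bean id="messageListenerContainer"
          class="org.springframework.integration.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="connectionFactory" />
        <constructor-arg>
            <!-- Listen to partition 3 of testTopic only -->
            <array>
                <bean class="org.springframework.integration.kafka.core.Partition">
                    <constructor-arg value="testTopic" />
                    <constructor-arg value="3" />
                </bean>
            </array>
        </constructor-arg>
    </bean>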

You need to wire up the listener container with that constructor and provide it to the adapter via the listener-container attribute.
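Wired to the adapter, that would look roughly like this (again a sketch: it replaces the polling inbound-channel-adapter and consumer-context from the question and reuses the existing inputFromKafka channel):

    <!-- Sketch only: with the default decoders the payload should arrive as a raw byte[],
         which matches the existing processMessage handling. -->
    <int-kafka:message-driven-channel-adapter id="kafkaMessageDrivenAdapter"
            channel="inputFromKafka"
            listener-container="messageListenerContainer"
            auto-startup="true" />

Because the container is told explicitly which partitions to read, only messages from partition 3 of testTopic should reach the inputFromKafka channel. Note that the message-driven adapter emits one message per record, so processMessage would receive individual payloads rather than the batched map shown in the question.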

We will update the README with an example.

