How to produce messages to a selected partition using kafka-console-producer?

According to the Kafka documentation:

The producer is responsible for choosing which message to assign to which partition within the topic.

How can I send messages to a selected partition using kafka-console-producer.sh?

I would like to specify some kind of "partition ID" when sending a message.

+13
4 answers

Update: This answer was correct in 2014, but more recent versions of Kafka can send key/value pairs through the console producer. See the answers below.

kafka-console-producer.sh does not support producing messages to a specific partition out of the box.

However, it should be fairly simple to update the script to accept an extra parameter for the partition ID and then handle it in a custom partitioner, as described in @Chiron's answer, in a modified version of the kafka.tools.ConsoleProducer class.

Take a look at the source code at:

https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console-producer.sh
https://apache.googlesource.com/kafka/+/refs/heads/trunk/core/src/main/scala/kafka/tools/ConsoleProducer.scala

+1

ConsoleProducer now seems to support writing keyed messages to a topic. Kafka will use a hash of the key to assign messages to partitions, at least with the default behavior.
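
To make the "hash of the key" idea concrete, here is a minimal sketch in plain Python. The function name `partition_for` is made up for illustration, and `zlib.crc32` is only a stand-in hash: it is not the hash function Kafka itself uses, so the partition numbers will not match what a real broker shows. The point is only that equal keys always land on the same partition.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index by hashing it.

    zlib.crc32 is a stand-in chosen for illustration only;
    it is NOT the hash function Kafka uses internally.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # The same key always hashes to the same value, so it always
    # maps to the same partition for a fixed partition count.
    return zlib.crc32(key) % num_partitions


# Records that share a key end up on the same partition.
for key in (b"key1", b"key2", b"key1"):
    print(key.decode(), "->", partition_for(key, 2))
```

Note that the mapping depends on the partition count, so adding partitions to a topic later would change where a given key lands.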

Currently, the default key separator is \t, so typing key[\t]message will distribute messages among the partitions by key:

 key1 a-message 

You can change the separator by providing the key.separator configuration, for example:

 kafka-console-producer --broker-list localhost:9092,localhost:9093 \
   --topic mytopic --property key.separator=,

Then send messages like this:

 key2,another-message 

I tested this successfully with both the default tab and a custom separator. Messages were distributed between two separate partitions.

+16

As things currently stand (Kafka >= 0.10.0.1), the kafka-console-producer.sh script and its underlying ConsoleProducer class support sending data with a key, but this support is disabled by default and must be enabled from the CLI.

Namely, you need to set the parse.key property. Also, if you want to use a separator other than the tab character, set key.separator as shown in Cedric's answer.

In the end, the command line will be:

 kafka-console-producer.sh --broker-list kafka:9092,kafka2:9092 \
   --topic $TOPIC --property parse.key=true --property "key.separator=|"
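
For readers wondering what those two properties do to each input line, here is a rough sketch in plain Python. It is not the ConsoleProducer code; `parse_line` and its parameters are hypothetical names that mirror parse.key and key.separator. Rejecting a line that has no separator is my assumption about sensible behavior, so check your Kafka version before relying on it.

```python
from typing import Optional, Tuple


def parse_line(line: str, parse_key: bool = False,
               key_separator: str = "\t") -> Tuple[Optional[str], str]:
    """Split one console input line into (key, value)."""
    if not parse_key:
        # Key parsing disabled: the whole line is the value, no key is set.
        return None, line
    # Split on the FIRST occurrence only, so the value may itself
    # contain the separator character.
    key, sep, value = line.partition(key_separator)
    if not sep:
        # Assumption: a line without the separator is treated as an error.
        raise ValueError(f"no key found on line: {line!r}")
    return key, value


print(parse_line("key2|another-message", parse_key=True, key_separator="|"))
print(parse_line("key2|another-message"))  # key parsing left disabled
```

With key parsing left off, the second call returns the whole line as the value, which is why the keys appear to be ignored unless parse.key=true is passed.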
+11

Here is your starting point: the partitioner.class setting in your Properties instance. In Kafka, the default implementation is kafka.producer.DefaultPartitioner.

The purpose of this setting is:

The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.

This means that if you want to change the default partitioning behavior, you need to write your own implementation of the kafka.producer.Partitioner interface.

I would advise you to be very careful when writing your own strategy: test it thoroughly and monitor your topics and their partitions.
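
As an illustration of what such a strategy could look like, here is a sketch in plain Python rather than a real Kafka class. The function `explicit_partition` is hypothetical; I am assuming a partitioner boils down to "given a key and the number of partitions, return a partition index". In this strategy the key itself names the target partition, which is one way to get the "partition ID" the question asks for. A real implementation would have to be written against the Partitioner interface in a JVM language.

```python
def explicit_partition(key: str, num_partitions: int) -> int:
    """Hypothetical strategy: the key itself names the target partition."""
    try:
        partition = int(key)
    except ValueError:
        raise ValueError(f"key {key!r} is not a partition number") from None
    # Fail loudly instead of silently wrapping around, so a bad key
    # cannot send data to an unexpected partition.
    if not 0 <= partition < num_partitions:
        raise ValueError(
            f"partition {partition} out of range 0..{num_partitions - 1}"
        )
    return partition


# A record keyed "1" would go to partition 1 of a 3-partition topic.
print(explicit_partition("1", 3))
```

The range check is exactly the kind of thing worth testing carefully: a strategy like this bypasses the even spread that key hashing gives you, so a mistake can easily overload a single partition.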

+6

Source: https://habr.com/ru/post/977198/