Kafka partition key does not work properly

I am trying to use the partition key mechanism correctly. My logic sets the number of partitions to 3, creates three partition keys ("0", "1" and "2"), and then uses those keys to create three KeyedMessage instances, for example:

  • KeyedMessage(topic, "0", message)
  • KeyedMessage(topic, "1", message)
  • KeyedMessage(topic, "2", message)

After that, I create a producer instance to send all the KeyedMessage instances.

I expect each KeyedMessage to go to a different partition according to its partition key, which means:

  • KeyedMessage(topic, "0", message) goes to partition 0
  • KeyedMessage(topic, "1", message) goes to partition 1
  • KeyedMessage(topic, "2", message) goes to partition 2

I use kafka-web-console to watch the status of the topic, but the result is not what I expect. The KeyedMessage instances still go to random partitions; sometimes two of them land in the same partition even though they have different partition keys.

To make my question clearer, here is some of my Scala code. I use Kafka 0.8.2-beta and Scala 2.10.4.

Here is the producer code. I have not used a custom partitioner:

    val props = new Properties()
    val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

    props.put("compression.codec", codec.toString)
    props.put("producer.type", if (synchronously) "sync" else "async")
    props.put("metadata.broker.list", brokerList)
    props.put("batch.num.messages", batchSize.toString)
    props.put("message.send.max.retries", messageSendMaxRetries.toString)
    props.put("request.required.acks", requestRequiredAcks.toString)
    props.put("client.id", clientId.toString)

    val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

    def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
      if (partition == null) {
        new KeyedMessage(topic, message)
      } else {
        new KeyedMessage(topic, partition, message)
      }
    }

    def send(message: String, partition: String = null): Unit =
      send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

    def send(message: Array[Byte], partition: Array[Byte]): Unit = {
      try {
        producer.send(kafkaMesssage(message, partition))
      } catch {
        case e: Exception =>
          e.printStackTrace
          System.exit(1)
      }
    }

And this is how I use the producer: I create an instance and use it to send three messages. Currently I create each partition key as an Integer and then convert it to a byte array:

    val testMessage = UUID.randomUUID().toString
    val testTopic = "sample1"
    val groupId_1 = "testGroup"

    print("starting sample broker testing")
    val producer = new KafkaProducer(testTopic, "localhost:9092")

    val numList = List(0, 1, 2)
    for (a <- numList) {
      // Create a partition key as a byte array
      var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
      // Here I pass an Array[Byte] key,
      // so the second "send" function of the producer is called
      producer.send(testMessage.getBytes("UTF8"), key)
    }

I am not sure whether my logic is wrong or I have misunderstood the partition key mechanism. If anyone could provide example code or an explanation, that would be great!

+5
3 answers

I had the same problem. Just switch to ByteArrayPartitioner:

 props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner") 
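Why this helps, as far as I understand it: a Java byte array's own hashCode() is identity-based, so two arrays with identical contents usually hash differently, and a partitioner that hashes the key object itself then scatters them. A partitioner that hashes the array contents gives a stable result. The sketch below only illustrates that difference; it is not Kafka's code, and the names KeyHashDemo, contentPartition and intKey are made up for the illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class KeyHashDemo {
    // Content-based partition choice: equal bytes always give the same partition.
    // This mirrors the idea of hashing the array contents; it is not Kafka's code.
    static int contentPartition(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    // Build the 4-byte key the same way the question does.
    static byte[] intKey(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        byte[] a = intKey(1);
        byte[] b = intKey(1);
        // Identity hash: typically differs between a and b although the bytes are equal.
        System.out.println("identity hashes equal: " + (a.hashCode() == b.hashCode()));
        // Content hash: always equal for equal bytes.
        System.out.println("content hashes equal:  " + (Arrays.hashCode(a) == Arrays.hashCode(b)));
        for (int k = 0; k < 3; k++) {
            System.out.println("key " + k + " -> partition " + contentPartition(intKey(k), 3));
        }
    }
}
```

Note that a content hash makes the mapping stable, but it does not make key n land on partition n: in this sketch the keys 0, 1, 2 end up on partitions 1, 2, 0.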
+1

People often think that partitioning is a way of separating business data into business categories, but that is not the right way to look at a partition.

Partitioning directly affects two things:

- performance (each partition can be consumed in parallel with the other partitions)

- message order (message order is guaranteed only within a partition)

Here is an example of how to choose partitions:

You have a topic, say MyMessagesToWorld.

You want to deliver this topic (all of MyMessagesToWorld) to some consumers.

You "weigh" the entire "mass" of MyMessagesToWorld and discover that it is 10 kg.

MyMessagesToWorld has the following "business categories":

- messages for Dad (D)

- messages for Mom (M)

- messages for Sis (S)

- messages for Grandma (G)

- messages for Teacher (T)

- messages for a Friend (F)

You think about who your consumers are, and you find that they are gnomes that can each consume 1 kg of messages per hour.

You can employ up to 2 of these gnomes.

1 gnome needs 10 hours to consume 10 kg of messages; 2 gnomes need 5 hours.

So you decide to use all the available gnomes to save time.

To create 2 "channels" for these 2 gnomes, you create 2 partitions of this topic in Kafka. If you had more gnomes, you would create more partitions.
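The arithmetic above can be written down as a tiny sketch. It assumes the messages are spread evenly over the partitions and that a partition is read by at most one gnome at a time, so gnomes beyond the partition count stay idle. The class and method names are hypothetical.

```java
class GnomeThroughput {
    // Hours needed to drain massKg of messages.
    // Assumption: a partition is read by at most one gnome at a time,
    // so at most `partitions` gnomes can work in parallel.
    static double hoursToConsume(double massKg, int gnomes, int partitions, double kgPerHour) {
        int working = Math.min(gnomes, partitions);
        return massKg / (working * kgPerHour);
    }

    public static void main(String[] args) {
        System.out.println(hoursToConsume(10, 1, 1, 1.0)); // one gnome, one partition
        System.out.println(hoursToConsume(10, 2, 2, 1.0)); // two gnomes, two partitions
        System.out.println(hoursToConsume(10, 2, 1, 1.0)); // second gnome has nothing to read
    }
}
```

With 2 gnomes but only 1 partition the result is still 10 hours, which is why the number of partitions is chosen to match the number of gnomes.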

You have 6 business categories and 2 independent, sequential consumers: the gnomes (consumer threads).

What to do?

Kafka's approach is as follows:

Suppose you have 2 Kafka instances in a cluster. (The example is the same if you have more instances in the cluster.)

You set the number of partitions to 2 with the Kafka tools (this example uses Kafka 0.8.2.1):

You define your topic in Kafka, stating that it has 2 partitions:

 kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld 

Now the MyMessagesToWorld topic has 2 partitions: P(0) and P(1).

You chose 2 partitions because you know you have only 2 gnomes.

You can add more partitions later, when more gnomes become available.

Do not confuse a Kafka consumer with such a gnome.

A Kafka consumer can employ N gnomes (N parallel threads).

Now you create KEYS for your messages.

You need KEYS to distribute your messages among the partitions.

The keys will be the letters of the "business categories" you defined earlier: D, M, S, G, T, F. You decide that such letters are good enough to serve as IDs.

In general, though, anything can be used as a key (complex objects, byte arrays, anything...).

If you create NO partitioner, the default one is used.

The default partitioner is a little dumb.

It takes the hashcode of each KEY and divides it by the number of available partitions; the "remainder" is the partition number for that key.

Example:

 KEY M, hashcode=12345, partition for M = 12345 % 2 = 1 

As you can imagine, with such a partitioner the best case is that 3 business categories land in each partition.

In the worst case, all business categories land in 1 partition.

If you had 100,000 business categories, this algorithm would distribute them evenly in a statistical sense.

But with only a few categories the distribution can be quite unfair.
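To see how uneven this can get with only a few keys, here is a small sketch of the hash-modulo rule applied to the six category letters. It uses Java's String.hashCode() and is only an illustration of the rule described above, not Kafka's own partitioner; the class name HashModDemo is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HashModDemo {
    // hash of the key modulo the number of partitions (sign bit masked off)
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Map every key to its partition, keeping the input order.
    static Map<String, Integer> assign(String[] keys, int numPartitions) {
        Map<String, Integer> result = new LinkedHashMap<String, Integer>();
        for (String key : keys) {
            result.put(key, partitionFor(key, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"D", "M", "S", "G", "T", "F"};
        System.out.println("2 partitions: " + assign(keys, 2));
        System.out.println("3 partitions: " + assign(keys, 3));
    }
}
```

With 2 partitions these six letters happen to split 3 and 3 (D, T, F on partition 0; M, S, G on partition 1). With 3 partitions, however, D, M, S and G all land on partition 2, which is the kind of unfair distribution meant here.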

So you can write your own partitioner and distribute your business categories a little more wisely.

Here is an example:

This partitioner divides the business categories evenly among the available partitions.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;

    public class CustomPartitioner {

        // key -> sequence number, assigned in order of first appearance
        private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
        private static AtomicInteger sequence = new AtomicInteger();
        private ReentrantLock lock = new ReentrantLock();

        public int partition(ProducerRecord<String, Object> record, Cluster cluster) {
            String key = record.key();
            int seq = figureSeq(key);

            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());

            if (availablePartitions.size() > 0) {
                int part = seq % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
                int numPartitions = partitions.size();
                // no partitions are available, give a non-available partition
                return seq % numPartitions;
            }
        }

        private int figureSeq(String key) {
            int sequentialNumber = 0;
            if (keyDistributionTable.containsKey(key)) {
                sequentialNumber = keyDistributionTable.get(key);
            } else {
                // synchronized region
                // used only for new keys, so a long wait for the lock is expected only at startup
                lock.lock();
                try {
                    if (keyDistributionTable.containsKey(key)) {
                        sequentialNumber = keyDistributionTable.get(key);
                    } else {
                        int seq = sequence.incrementAndGet();
                        keyDistributionTable.put(key, seq);
                        sequentialNumber = seq;
                    }
                } finally {
                    lock.unlock();
                }
            }
            return sequentialNumber;
        }
    }
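The core idea of this partitioner (give each new key the next sequence number, then take it modulo the partition count) can be tried without a Kafka cluster. The following is a simplified, dependency-free sketch of that idea, assuming Java 8 or later; it replaces the lock with ConcurrentHashMap.computeIfAbsent, and the class name SequencePartitionDemo is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class SequencePartitionDemo {
    // key -> sequence number, assigned in order of first appearance
    private final ConcurrentMap<String, Integer> table = new ConcurrentHashMap<String, Integer>();
    private final AtomicInteger sequence = new AtomicInteger();

    int partitionFor(String key, int numPartitions) {
        // computeIfAbsent assigns a number to a key only once, even under contention
        int seq = table.computeIfAbsent(key, k -> sequence.incrementAndGet());
        return seq % numPartitions;
    }

    public static void main(String[] args) {
        SequencePartitionDemo demo = new SequencePartitionDemo();
        for (String key : new String[] {"D", "M", "S", "G", "T", "F"}) {
            System.out.println(key + " -> partition " + demo.partitionFor(key, 2));
        }
    }
}
```

Because the numbers are handed out in order of first appearance, the six categories alternate between the 2 partitions and end up split 3 and 3, whatever their hashcodes are. A repeated key always gets the partition it was given the first time.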

+9

The default partitioner looks at the key (as an array of bytes) and uses (% numPartitions) to convert this value to an integer between 0 and numPartitions - 1 inclusive. It is this resulting integer that determines the partition the message is written to, not the key value itself, as you assume.
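If the goal really is "key n goes to partition n", the partition function has to decode the key instead of hashing it. Below is a minimal sketch of such a function for the 4-byte integer keys built in the question. It is a standalone illustration with a made-up class name; to use the idea with the producer it would have to be wrapped in a class that implements Kafka's partitioner interface and registered through the partitioner.class property, which is not shown here.

```java
import java.nio.ByteBuffer;

class KeyAsPartitionDemo {
    // Decode the key as a big-endian int and use it directly as the partition number.
    // Assumes the key holds at least 4 bytes, as ByteBuffer.allocate(4).putInt(n).array() does.
    static int partition(byte[] key, int numPartitions) {
        int value = ByteBuffer.wrap(key).getInt();
        // floorMod keeps the result in 0..numPartitions-1 even for negative values
        return Math.floorMod(value, numPartitions);
    }

    public static void main(String[] args) {
        for (int n = 0; n < 3; n++) {
            byte[] key = ByteBuffer.allocate(4).putInt(n).array();
            System.out.println("key " + n + " -> partition " + partition(key, 3));
        }
    }
}
```

The modulo is kept only as a safety net, so that a key larger than the partition count still maps to an existing partition.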

+2

Source: https://habr.com/ru/post/1208633/

