People often think that partitioning is a way of splitting business data into business categories, but that is not the right way to look at a partition.
Partitioning directly affects:
- performance (each partition can be consumed in parallel with the other partitions)
- message order (ordering is guaranteed only at the partition level)
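To make the ordering point concrete, here is a minimal pure-Java sketch (no broker involved; all names and messages are illustrative) that simulates hash-based keyed assignment into 2 partitions. All messages for one key land in the same partition in send order, but consumers read partitions independently, so there is no order guarantee across partitions:

```java
import java.util.ArrayList;
import java.util.List;

public class OrderDemo {
    // Simulate default-style assignment: hash(key) % numPartitions.
    static List<List<String>> assign(String[][] keyedMessages, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
        for (String[] msg : keyedMessages) {
            int p = Math.abs(msg[0].hashCode()) % numPartitions;
            partitions.get(p).add(msg[1]); // append-only, like a partition log
        }
        return partitions;
    }

    public static void main(String[] args) {
        // {key, payload} pairs, in global send order.
        String[][] messages = { {"M", "hi mom 1"}, {"D", "hi dad 1"}, {"M", "hi mom 2"} };
        List<List<String>> parts = assign(messages, 2);
        // Both "M" messages end up in one partition, still in send order;
        // their order relative to the "D" message is not guaranteed to a reader.
        System.out.println("P(0): " + parts.get(0));
        System.out.println("P(1): " + parts.get(1));
    }
}
```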
Here is an example of how to choose the number of partitions:
You have a topic, say MyMessagesToWorld.
You want to deliver this topic (all of MyMessagesToWorld) to some consumers.
You "weigh" the whole "mass" of MyMessagesToWorld and find that it is 10 kg.
MyMessagesToWorld contains the following "business categories":
- messages for Dad (D)
- messages for Mom (M)
- messages for Sister (S)
- messages for Granny (G)
- messages for Teacher (T)
- messages for Friend (F)
You think about who your consumers are, and you find that your consumers are gnomes, each able to consume 1 kg of messages per hour.
You can employ up to 2 such gnomes.
1 gnome needs 10 hours to consume 10 kg of messages; 2 gnomes need 5 hours.
So you decide to employ all the available gnomes to save time.
To create 2 "channels" for these 2 gnomes, you create 2 partitions of this topic in Kafka. If you plan for more gnomes, create more partitions.
You have 6 business categories inside and 2 independent consumers, the gnomes (consumer threads).
What to do?
Kafka's approach is as follows:
Suppose you have 2 Kafka instances in a cluster. (The same example applies if there are more instances in the cluster.)
You set the number of partitions to 2 using the Kafka tooling (the example uses Kafka 0.8.2.1):
You define your topic in Kafka, declaring that this topic has 2 partitions:
kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld
Now the MyMessagesToWorld topic has 2 partitions: P(0) and P(1).
You chose the number 2 (partitions) because you know you have (only) 2 gnomes.
You can add more partitions later, when more consuming gnomes arrive.
Do not confuse a Kafka consumer with such a gnome.
A Kafka consumer can use N gnomes. (N parallel threads)
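One common way to implement such "gnomes" is a single consumer thread handing records to N worker threads (a pure-Java sketch with a faked record list, not Kafka consumer API code):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GnomeWorkers {
    // One "consumer" dispatching fetched records to N gnome threads.
    static int consumeAll(List<String> records, int gnomes) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(gnomes);
        AtomicInteger consumed = new AtomicInteger();
        for (String record : records) {
            pool.submit(() -> {
                // ... real message processing would happen here ...
                consumed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int done = consumeAll(List.of("m1", "m2", "m3", "m4"), 2);
        System.out.println("consumed " + done + " messages with 2 gnomes");
    }
}
```

Note that handing records to a free-for-all thread pool gives up per-partition ordering; to keep it, pin each partition's records to one dedicated worker.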
Now you create KEYs for your messages.
You need KEYs to distribute your messages among the partitions.
The keys will be the letters of the "business categories" you defined earlier: D, M, S, G, T, F; you decide these letters will do as IDs.
But in general, anything can serve as a key (complex objects, byte arrays, anything...).
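One caveat if you go beyond simple strings (an illustrative pure-Java sketch of the hashcode-based scheme described below, not Kafka API code): the mapping is only deterministic if the key type has a content-based hashCode. Java arrays use identity hash codes, so two byte arrays with identical content can land in different partitions unless you hash their contents explicitly:

```java
import java.util.Arrays;

public class KeyHashDemo {
    // Hashcode-based partition choice, as the default partitioner does.
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.abs(keyHash) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] k1 = {10, 20};
        byte[] k2 = {10, 20};
        // Identity-based hash: almost certainly differs between the two arrays.
        System.out.println(k1.hashCode() == k2.hashCode()); // usually false
        // Content-based hash: identical content always maps to the same partition.
        System.out.println(partitionFor(Arrays.hashCode(k1), 2)
                == partitionFor(Arrays.hashCode(k2), 2)); // true
    }
}
```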
If you create NO partitioner, the default one is used.
The default partitioner is a bit dumb.
It takes the hashcode of each KEY and divides it by the number of available partitions; the remainder determines the partition number for that key.
Example:
KEY M, hashcode=12345, partition for M = 12345 % 2 = 1
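You can check the whole mapping for the six single-letter keys yourself (the 12345 above is just an illustrative number; a single-character String's hashCode in Java is its character code, so the real values below are stable):

```java
public class DefaultPartitionDemo {
    // Default-partitioner style: hashcode modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"D", "M", "S", "G", "T", "F"}) {
            System.out.println(key + " (hashcode " + key.hashCode()
                    + ") -> P(" + partitionFor(key, 2) + ")");
        }
        // D(68)->P(0), M(77)->P(1), S(83)->P(1), G(71)->P(1), T(84)->P(0), F(70)->P(0)
        // These particular letters happen to split 3/3, but nothing guarantees that.
    }
}
```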
As you can imagine, with such a partitioner, in the best case you have 3 business categories landing in each partition.
In the worst case, all business categories land in 1 partition.
If you had 100,000 business categories, this algorithm would distribute them statistically evenly.
But with only a few categories, the distribution may be quite unfair.
So you can rewrite the partitioner and distribute your business categories a bit more wisely.
Here is an example:
This partitioner distributes the business categories equally among the available partitions.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CustomPartitioner {

    private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
    private static AtomicInteger sequence = new AtomicInteger();
    private ReentrantLock lock = new ReentrantLock();

    public int partition(ProducerRecord<String, Object> record, Cluster cluster) {
        String key = record.key();
        int seq = figureSeq(key);

        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
        if (availablePartitions.size() > 0) {
            int part = seq % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
            int numPartitions = partitions.size();
            // no partitions are available, give a non-available partition
            return seq % numPartitions;
        }
    }

    private int figureSeq(String key) {
        int sequentialNumber = 0;
        if (keyDistributionTable.containsKey(key)) {
            sequentialNumber = keyDistributionTable.get(key);
        } else { // synchronized region
            // used only for new keys, so long waits on the lock are expected only at startup
            lock.lock();
            try {
                if (keyDistributionTable.containsKey(key)) {
                    sequentialNumber = keyDistributionTable.get(key);
                } else {
                    int seq = sequence.incrementAndGet();
                    keyDistributionTable.put(key, seq);
                    sequentialNumber = seq;
                }
            } finally {
                lock.unlock();
            }
        }
        return sequentialNumber;
    }
}
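To make the producer use a class like this, you point it at the partitioner via configuration. A sketch of the producer properties follows; `partitioner.class` is the producer config key (present in the old producer config and, in later client versions, in the new Java producer as well), while the broker address and the `com.example.CustomPartitioner` class name are placeholders:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Swap in the custom partitioner instead of the default one.
        props.put("partitioner.class", "com.example.CustomPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // A real producer would be created as: new KafkaProducer<String, String>(producerProps())
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```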