The log of a compacted topic has two parts:
1) Cleaned portion: the portion of the Kafka log that has been cleaned by the Kafka cleaner at least once.
2) Dirty portion: the portion of the Kafka log that has not yet been cleaned by the Kafka cleaner. Kafka maintains a dirty offset, and all messages with offset >= dirty offset belong to the dirty portion.
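To make the split concrete, here is a minimal Scala sketch. The `Rec` type, the keys and the dirty offset of 2 are hypothetical. A real Kafka log stores record batches in segment files, not in a `Seq`.

```scala
// Hypothetical record model (not Kafka's real type): value = None stands for a tombstone.
final case class Rec(offset: Long, key: String, value: Option[String])

// Records below the dirty offset form the cleaned portion;
// records at or above it form the dirty portion.
def splitAtDirtyOffset(log: Seq[Rec], dirtyOffset: Long): (Seq[Rec], Seq[Rec]) =
  log.partition(_.offset < dirtyOffset)

val log = Seq(
  Rec(0, "1111", Some("a")),
  Rec(1, "1111", Some("b")),
  Rec(2, "2222", Some("c")),
  Rec(3, "1111", None) // tombstone
)
val (cleaned, dirty) = splitAtDirtyOffset(log, dirtyOffset = 2)
println(s"cleaned=${cleaned.map(_.offset)} dirty=${dirty.map(_.offset)}")
```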
Note: the Kafka cleaner cleans all segments (regardless of whether a segment is in the cleaned or the dirty portion) and recopies them every time the dirty ratio reaches min.cleanable.dirty.ratio.
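The trigger in the note can be sketched as a ratio check. This assumes the ratio is dirty bytes divided by total (clean + dirty) bytes and that it is compared against `min.cleanable.dirty.ratio` (0.5 by default); the strict `>` at the boundary is an assumption of this sketch, and the byte counts are made up.

```scala
// Assumed definition: dirty ratio = dirty bytes / (clean bytes + dirty bytes).
def dirtyRatio(cleanBytes: Long, dirtyBytes: Long): Double = {
  val total = cleanBytes + dirtyBytes
  if (total == 0) 0.0 else dirtyBytes.toDouble / total
}

// Assumed trigger: clean once the ratio exceeds min.cleanable.dirty.ratio.
def shouldClean(cleanBytes: Long, dirtyBytes: Long, minCleanableDirtyRatio: Double = 0.5): Boolean =
  dirtyRatio(cleanBytes, dirtyBytes) > minCleanableDirtyRatio

println(shouldClean(cleanBytes = 600L, dirtyBytes = 400L)) // ratio 0.4, below the 0.5 default
println(shouldClean(cleanBytes = 300L, dirtyBytes = 700L)) // ratio 0.7, above it
```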
Tombstones are removed segment by segment. The tombstones in a segment are removed only if the segment meets both of the following conditions:
1) The segment is in the cleaned portion of the log.
2) The last-modified time of the segment is <= (the last-modified time of the segment containing the message with offset = (dirty offset - 1)) - delete.retention.ms.
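Both conditions can be written as a small predicate. The `Segment` case class, the helper names and the timestamps are hypothetical; the horizon mirrors condition 2, and the final comparison is the negation of `retainDeletes = old.lastModified > deleteHorizonMs` in LogCleaner.scala.

```scala
// Hypothetical segment model: base offset, last-modified time in ms,
// and whether the segment lies in the cleaned portion of the log.
final case class Segment(baseOffset: Long, lastModifiedMs: Long, inCleanedPortion: Boolean)

// Condition 2: horizon = lastModified of the segment holding (dirty offset - 1) - delete.retention.ms.
def deleteHorizonMs(segmentWithLastCleanOffset: Segment, deleteRetentionMs: Long): Long =
  segmentWithLastCleanOffset.lastModifiedMs - deleteRetentionMs

// Tombstones in `seg` may be dropped only if both conditions hold.
def canDropTombstones(seg: Segment, horizonMs: Long): Boolean =
  seg.inCleanedPortion && seg.lastModifiedMs <= horizonMs

val oneDayMs = 24L * 60 * 60 * 1000 // 86400000 ms, the delete.retention.ms default
val oldSeg = Segment(baseOffset = 0L, lastModifiedMs = 1000000000L, inCleanedPortion = true)
val lastCleanSeg = Segment(baseOffset = 5000L, lastModifiedMs = 1000000000L + 2 * oneDayMs, inCleanedPortion = true)
val horizon = deleteHorizonMs(lastCleanSeg, oneDayMs)
println(canDropTombstones(oldSeg, horizon))       // modified before the horizon
println(canDropTombstones(lastCleanSeg, horizon)) // modified after the horizon
```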
The second condition is hard to explain in detail, but in simple terms it implies that the segment size should reach log.segment.bytes (broker level) / segment.bytes (topic level), which is 1 GB by default. For a segment in the cleaned portion to reach 1 GB, you would need to produce a large number of messages with distinct keys. But you produced only 4 messages, 3 of which have the same key. That is why the tombstones are not deleted from the segment containing the 1111:null message: that segment does not satisfy the second condition above.
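A toy model of size-based rolling illustrates why 4 small messages stay in a single segment at the default size. Assumptions: a new segment starts when the next record would push the current one past the segment size, and every record is 40 bytes (a made-up figure; real batches carry extra overhead). `segmentCount` is a hypothetical helper.

```scala
// Counts how many segments a sequence of records would occupy,
// rolling to a new segment when the next record would not fit.
def segmentCount(recordSizes: Seq[Int], segmentBytes: Long): Int = {
  val (segments, _) = recordSizes.foldLeft((0, 0L)) { case ((count, used), size) =>
    if (count == 0 || used + size > segmentBytes) (count + 1, size.toLong) // roll
    else (count, used + size)                                             // append
  }
  segments
}

val recordSizes = Seq(40, 40, 40, 40) // 4 hypothetical messages
println(segmentCount(recordSizes, segmentBytes = 1073741824L)) // 1 GiB default
println(segmentCount(recordSizes, segmentBytes = 50L))
```

With the default size everything lands in one segment, so there is never a separate, later segment for condition 2 to compare against.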
You have two options for removing the tombstones with only 4 messages:
- set delete.retention.ms=0, or
- set log.segment.bytes / segment.bytes to 50.
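When the tombstone sits in the same segment as offset (dirty offset - 1), condition 2 compares that segment's timestamp with itself, so it reduces to delete.retention.ms <= 0. The sketch below evaluates that comparison; the timestamp is hypothetical, 86400000 ms (1 day) is the default delete.retention.ms, and `tombstoneDroppable` is an illustrative name only.

```scala
// Condition 2 written out: tombstone segment time <= (time of segment with dirty offset - 1) - retention.
def tombstoneDroppable(tombstoneSegLastModifiedMs: Long,
                       lastCleanSegLastModifiedMs: Long,
                       deleteRetentionMs: Long): Boolean =
  tombstoneSegLastModifiedMs <= lastCleanSegLastModifiedMs - deleteRetentionMs

val t = 1000000000L // hypothetical lastModified of the single segment
println(tombstoneDroppable(t, t, deleteRetentionMs = 86400000L)) // default retention
println(tombstoneDroppable(t, t, deleteRetentionMs = 0L))        // option 1
```

With a tiny segment size (option 2), the tombstone and offset (dirty offset - 1) can end up in different segments, so the check compares two different timestamps instead of one timestamp with itself.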
Source code (additional reading): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala
    try {
      // clean segments into the new destination segment
      for (old <- segments) {
        // deletes (tombstones) are retained only if this segment was modified after the delete horizon
        val retainDeletes = old.lastModified > deleteHorizonMs
        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
          .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset,
            if (retainDeletes) "retaining" else "discarding"))
        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
      }
      // ... (rest of the try block omitted in this excerpt)
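The effect of that loop can be mimicked with a simplified, hypothetical model. `Record`, `Seg` and `cleanSegments` are illustrative names, and the "latest offset per key" map is only a stand-in for the offset map Kafka builds from the dirty portion (real Kafka also keeps records whose keys are absent from that map). A record is kept only if it is the latest for its key and, if it is a tombstone, only while `retainDeletes` is true for its segment.

```scala
// Hypothetical models, not Kafka's types. value = None marks a tombstone.
final case class Record(offset: Long, key: String, value: Option[String])
final case class Seg(lastModifiedMs: Long, records: Seq[Record])

def cleanSegments(segments: Seq[Seg], deleteHorizonMs: Long): Seq[Record] = {
  // Stand-in for the offset map: the latest offset seen for each key.
  val latestOffset: Map[String, Long] =
    segments.flatMap(_.records).groupBy(_.key).map { case (k, rs) => k -> rs.map(_.offset).max }
  segments.flatMap { old =>
    val retainDeletes = old.lastModifiedMs > deleteHorizonMs
    old.records.filter { r =>
      val isLatest = latestOffset(r.key) == r.offset
      isLatest && (r.value.isDefined || retainDeletes)
    }
  }
}

val segs = Seq(
  Seg(lastModifiedMs = 1000L, records = Seq(Record(0, "1111", Some("a")), Record(1, "1111", Some("b")))),
  Seg(lastModifiedMs = 2000L, records = Seq(Record(2, "2222", Some("c")), Record(3, "1111", None)))
)
println(cleanSegments(segs, deleteHorizonMs = 1500L).map(_.offset)) // tombstone's segment is newer than the horizon
println(cleanSegments(segs, deleteHorizonMs = 2500L).map(_.offset)) // tombstone's segment is past the horizon
```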