Kafka tombstone does not delete the key

I am creating a Kafka topic with the following properties:

min.cleanable.dirty.ratio = 0.01, delete.retention.ms = 100, segment.ms = 100, cleanup.policy = compact

Let's say I insert key/value pairs in the order 1111:1, 1111:2, 1111:null, 2222:1. What happens now is that, except for the last message, log compaction runs on the rest of the messages and removes the first two, but retains 1111:null.
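(For reproduction, here is roughly how the topic and the four records could be created with the Scala client. This sketch is not part of the original question; the topic name test-topic and the broker address localhost:9092 are assumptions.)

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import scala.jdk.CollectionConverters._

    object TombstoneRepro extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")

      // Create the compacted topic with the configs from the question.
      val admin = AdminClient.create(props)
      val topic = new NewTopic("test-topic", 1, 1.toShort).configs(Map(
        "cleanup.policy"            -> "compact",
        "min.cleanable.dirty.ratio" -> "0.01",
        "delete.retention.ms"       -> "100",
        "segment.ms"                -> "100"
      ).asJava)
      admin.createTopics(Collections.singleton(topic)).all().get()
      admin.close()

      // Produce the four records; the null value for key 1111 is the tombstone.
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      producer.send(new ProducerRecord[String, String]("test-topic", "1111", "1"))
      producer.send(new ProducerRecord[String, String]("test-topic", "1111", "2"))
      producer.send(new ProducerRecord[String, String]("test-topic", "1111", null)) // tombstone
      producer.send(new ProducerRecord[String, String]("test-topic", "2222", "1"))
      producer.flush()
      producer.close()
    }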

According to the documentation:

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period. 

So I expected that once delete.retention.ms has elapsed, the null marker should delete the message with key 1111.

I have two questions: Why is the tombstone marker not working? Why is the last message ignored by the compaction?

Here is my server.properties file:

    log.retention.ms=100
    log.retention.bytes=1073741824
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=100
    log.cleaner.delete.retention.ms=100
    log.cleaner.enable=true
    log.cleaner.min.cleanable.ratio=0.01
+5
3 answers

Tombstone records are preserved longer by design. The reason is that brokers do not track consumers. Assume a consumer goes offline for some time after reading the first record. While the consumer is down, log compaction kicks in. If compaction deleted the tombstone record, the consumer would never learn that the record had been deleted. If the consumer implements a cache, the record might then never be deleted there. Thus, tombstones are preserved longer, to allow offline consumers to receive all tombstones for local cleanup.

The tombstone will only be deleted after delete.retention.ms (the default value is 1 day). Note: this is a topic-level configuration; the broker-wide default is log.cleaner.delete.retention.ms (visible in your server.properties). So if you want to change it for a particular topic, you need to set it on that topic.
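For example, the topic-level override could be set with the admin client as sketched below (the topic name test-topic is an assumption; something like kafka-configs.sh --alter --entity-type topics --entity-name test-topic --add-config delete.retention.ms=100 achieves the same from the command line):

    import java.util.{Arrays, Properties}
    import java.util.{Collection => JCollection, HashMap => JHashMap}
    import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object SetDeleteRetention extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      val admin = AdminClient.create(props)

      // delete.retention.ms has to be overridden per topic, as described above.
      val topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic")
      val op = new AlterConfigOp(new ConfigEntry("delete.retention.ms", "100"),
                                 AlterConfigOp.OpType.SET)
      val configs = new JHashMap[ConfigResource, JCollection[AlterConfigOp]]()
      configs.put(topic, Arrays.asList(op))
      admin.incrementalAlterConfigs(configs).all().get()
      admin.close()
    }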

+3

The algorithm for removing tombstones from a compacted topic is supposed to be the following:

  • A tombstone is never removed while it is still in the dirty portion of the log.
  • Once the tombstone is in the cleaned portion of the log, its removal is further delayed by delete.retention.ms from the time it entered the cleaned portion.

It is possible that your tombstone is still in the dirty portion of the log and therefore has not been cleaned. Producing a few more messages with different keys should push the tombstone into the cleaned portion of the log and get it removed; a sketch of that follows.
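A minimal sketch of that workaround, assuming the topic from the question is named test-topic and a broker runs on localhost:9092:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object PushTombstoneIntoCleanedPortion extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      // Records with fresh, distinct keys advance the log so that the segment
      // holding the tombstone can roll and enter the cleaned portion.
      for (i <- 1 to 1000)
        producer.send(new ProducerRecord[String, String]("test-topic", s"filler-$i", i.toString))
      producer.flush()
      producer.close()
    }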

+1

A compacted topic has two portions:

1) The cleaned portion: the part of the Kafka log that has been cleaned by the Kafka cleaner at least once.

2) The dirty portion: the part of the Kafka log that has not yet been cleaned by the Kafka cleaner even once. Kafka maintains a dirty offset; all messages with offset >= dirty offset belong to the dirty portion.

Note: the Kafka cleaner cleans all segments (regardless of whether a segment is in the cleaned or the dirty portion) and re-copies them every time the dirty ratio reaches min.cleanable.dirty.ratio.
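As a rough illustration of the trigger in that note (a simplification; the broker derives these byte counts from its segment metadata internally, this is not an API you call):

    // Hypothetical sketch: cleaning becomes eligible once this ratio reaches
    // min.cleanable.dirty.ratio (0.01 in the question's topic config).
    def dirtyRatio(cleanBytes: Long, dirtyBytes: Long): Double =
      dirtyBytes.toDouble / (cleanBytes + dirtyBytes)

    // With a 0.01 threshold, a log that is just 1% dirty is already cleanable.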

Tombstones are deleted segment-wise. The tombstones in a segment are removed only if the segment meets both conditions below:

  • The segment must be in the cleaned portion of the log.

  • The segment's last modified time must be <= (the last modified time of the segment containing the message with offset = (dirty offset - 1)) - delete.retention.ms.

The second point is hard to elaborate, but in simple terms it implies that the segment's size should reach log.segment.bytes / segment.bytes (1 GB by default). For a segment in the cleaned portion to reach 1 GB, you would need to produce a large number of messages with distinct keys. But you produced only 4 messages, 3 of which share the same key. That is why the tombstone is not deleted from the segment containing the 1111:null message: the segment does not satisfy the second condition above. A sketch of that condition in code follows.
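To make the second condition concrete (not from the original answer; lastCleanableSegmentLastModified and tombstonesRemovable are illustrative names standing in for values the broker computes internally):

    val deleteRetentionMs = 100L // from the question's topic configuration

    // lastCleanableSegmentLastModified: last-modified time of the segment that
    // contains the message at offset (dirty offset - 1).
    def tombstonesRemovable(segmentLastModified: Long,
                            lastCleanableSegmentLastModified: Long): Boolean = {
      val deleteHorizonMs = lastCleanableSegmentLastModified - deleteRetentionMs
      segmentLastModified <= deleteHorizonMs
    }

This is the inverse of the retainDeletes flag in the LogCleaner excerpt quoted at the end of this answer.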

You have two options to get the tombstone deleted with only 4 messages:

  • set delete.retention.ms = 0, or
  • set log.segment.bytes / segment.bytes = 50.

Source code (additional reading): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala

    try {
      // clean segments into the new destination segment
      for (old <- segments) {
        // Tombstones are retained while the segment's last-modified time is
        // newer than the delete horizon (last-modified time of the last
        // cleanable segment minus delete.retention.ms).
        val retainDeletes = old.lastModified > deleteHorizonMs
        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
          .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset,
            if (retainDeletes) "retaining" else "discarding"))
        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
      }
      // ... (excerpt truncated)
0

Source: https://habr.com/ru/post/1272439/

