Kafka-python - How do I commit a partition?

Using kafka-python-1.0.2.

If I have a topic with 10 sections, how can I commit a specific section by going through the various sections and messages. I just can't find an example of this anywhere, in docs or otherwise

In the docs I want to use:

consumer.commit(offset=offsets)

In particular, how do I create the OffsetAndMetadata dictionary and dictionary needed for the offset (dict, optional) - {TopicPartition: OffsetAndMetadata}.

I was hoping the function call would look something like this:

consumer.commit(partition, offset)

but this does not seem to be the case.

Thanks in advance.

+4
source share
4 answers

, , , , , , , . , :

meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)

, , - .

+5

. :

from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
    tp: OffsetAndMetadata(offset, None)
})

, .

+3
from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "test_topic"
PARTITION = 0

consumer = KafkaConsumer(
    group_id=TOPIC,
    auto_offset_reset="earliest",
    bootstrap_servers="localhost:9092",
    request_timeout_ms=100000,
    session_timeout_ms=99000,
    max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
    # do something
  • , , , .
  • aalmeida88 , , aalmeida88 , , , .
  • , , , , , , , kafka , , , , kafka zookeeper, kafka . , !

--- -----

.

topic_partition = TopicPartition(TOPIC,
                                 message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()

, kafka, , , , , ( ) .

ps: , , .

+3
source

Just need to call consumer.commit()

from kafka import KafkaConsumer

KAFKA_TOPIC_NAME='KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP='KAFKA_CONSUMER_GROUP'
consumer = KafkaConsumer(
    KAFKA_TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id=KAFKA_CONSUMER_GROUP
)
for message in consumer:
    print(message.value)
    consumer.commit()    # <--- This is what we need
    # Optionally, To check if everything went good
    from kafka import TopicPartition
    print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
0
source

Source: https://habr.com/ru/post/1623520/


All Articles