I use the code below to read posts from a topic. I ran into two problems. Whenever I start a consumer, does it read all the messages in the queue? How to read only unread messages?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
consumer.commit()
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
source
share