I'm using the kafka-node module: https://github.com/SOHU-Co/kafka-node
Every time I restart the consumer, it receives all the old messages again. I'm using a round-robin system (load balancing).
Do you have an idea how I can tell the server that I have already consumed a message, so that it doesn't send it to me again when I restart the consumer?
Is there an error in my code or in the server configuration?
any idea?
Producer code:
// Producer: connects to Kafka through ZooKeeper and publishes a timestamped
// JSON "hello" message to the topic once per second.
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;

// Unique client id per process so several producers can run side by side.
var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid);

var argv = require('optimist').argv;
var topic = argv.topic || 'test_12345';
var producer = new HighLevelProducer(client);

// Zero-pad a date component to two digits (e.g. 7 -> "07").
function pad2(n) {
  return n < 10 ? '0' + n : n;
}

// Format a Date as "YYYY-MM-DD HH:MM:SS".
function formatDate(date) {
  return date.getFullYear() + '-' +
    pad2(date.getMonth() + 1) + '-' +
    pad2(date.getDate()) + ' ' +
    pad2(date.getHours()) + ':' +
    pad2(date.getMinutes()) + ':' +
    pad2(date.getSeconds());
}

// Wait for the broker metadata before sending, then publish every second.
producer.on('ready', function () {
  setInterval(function () {
    var message = JSON.stringify({'message' : 'hello - ' + formatDate(new Date())});
    console.log(message);
    send(message);
  }, 1000);
});

// Publish one message to the configured topic.
// Logs the error if the send failed, otherwise logs the broker's ack payload
// (the original logged `data` unconditionally, even when `err` was set).
function send(message) {
  producer.send([ {topic: topic, messages: [message] } ], function (err, data) {
    if (err) {
      console.log(err);
    } else {
      console.log(data);
    }
  });
}
Consumer code (currently working):
// Consumer: joins the consumer group 'kafka-node-group' through ZooKeeper,
// logs every message it receives, and — when the stored offset falls outside
// the broker's retained range — rewinds to the smallest available offset.
var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = require('optimist').argv;

var topic = argv.topic || 'test_12345';

// Unique client id per process; five-node ZooKeeper ensemble.
var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid);

var fetchRequests = [ { topic: topic }];

var consumerOptions = {
  groupId: 'kafka-node-group',
  // Auto commit config
  autoCommit: true,
  autoCommitMsgCount: 100,
  autoCommitIntervalMs: 5000,
  // Fetch message config
  fetchMaxWaitMs: 100,
  fetchMinBytes: 1,
  fetchMaxBytes: 1024 * 10,
  fromOffset: false,
  fromBeginning: false
};

var consumer = new HighLevelConsumer(client, fetchRequests, consumerOptions);
var offsetClient = new Offset(client);

// Dump each incoming message together with this consumer's id.
consumer.on('message', function (message) {
  console.log(this.id, message);
});

consumer.on('error', function (err) {
  console.log('error', err);
});

// The committed offset no longer exists on the broker (log retention expired
// it); look up the earliest offset still available and restart from there.
consumer.on('offsetOutOfRange', function (errTopic) {
  console.log("------------- offsetOutOfRange ------------");
  errTopic.maxNum = 2;
  offsetClient.fetch([errTopic], function (err, offsets) {
    var min = Math.min.apply(null, offsets[errTopic.topic][errTopic.partition]);
    consumer.setOffset(errTopic.topic, errTopic.partition, min);
  });
});
zookeeper zoo.cfg (5 servers)
# The number of milliseconds of each tick
tickTime=2000
kafka server.properties (5 servers)
Regards