Perhaps the main problem is how the node-kafka module I use has implemented things, but maybe not, so here goes...
Using the node-kafka library, I ran into a problem when subscribing to consumer.on('message') events. The library uses the standard events module, so I think this question may be fairly general.
My actual code is large and complex, so here is a pseudo-example of the basic layout to highlight my problem. (Note: this snippet is untested, so it may contain errors, but the syntax is not what is in question here.)
    var messageCount = 0;
    var queryCount = 0;

    // Getting messages via some event emitter
    consumer.on('message', function(message) {
        messageCount++;
        console.log('Message #' + messageCount);

        // Making a database call for each message
        mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
            queryCount++;
            console.log('Query #' + queryCount);
        });
    });
What I see is that when I start my server, there are 100,000 or so backlogged messages that kafka wants to give me, and it does so through the event emitter. So I start receiving messages. It takes about 15 seconds to receive and log all of them.
This is what I would expect to see for output, assuming the mysql query is fast enough:
    Message #1
    Message #2
    Message #3
    ...
    Message #500
    Query #1
    Message #501
    Message #502
    Query #2
    ... and so on in some intermingled fashion
I would expect this because my first mysql result should be ready very quickly, and I would expect it to take its turn in the event loop so the response gets processed. What I actually get is:
    Message #1
    Message #2
    ...
    Message #100000
    Query #1
    Query #2
    ...
    Query #100000
I get every single message before a mysql response can be processed. So my question is: why? Why can't I get a single database result until all of the message events are complete?
Another note: I set a breakpoint at .emit('message') in node-kafka and at mysql.query() in my code, and I hit them alternately, one after the other. So it appears that the 100,000 emits are not stacking up before they get passed to my event subscriber. So there went my first hypothesis about the problem.
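A minimal, self-contained sketch of the same pattern may help narrow this down. It rests on assumptions that have not been verified against node-kafka: that the consumer emits its backlog in one synchronous loop, and that the database callback only runs on a later turn of the event loop. The `fakeQuery` helper and the `log` array are hypothetical stand-ins, with `setImmediate` used in place of a real mysql.query call.

```javascript
// Minimal reproduction sketch. Assumptions (not verified against node-kafka):
// the consumer emits its backlog synchronously in one loop, and setImmediate
// stands in for the asynchronous mysql.query callback.
const EventEmitter = require('events');

const consumer = new EventEmitter(); // hypothetical stand-in for the kafka consumer
const log = [];                      // records the order in which things happen
let messageCount = 0;
let queryCount = 0;

// Hypothetical stand-in for mysql.query: the callback runs on a later event-loop turn.
function fakeQuery(sql, callback) {
  setImmediate(function () {
    callback(null, [{ testQuery: 'test' }]);
  });
}

consumer.on('message', function (message) {
  messageCount++;
  log.push('Message #' + messageCount);

  fakeQuery('SELECT "test" AS testQuery', function (err, rows) {
    queryCount++;
    log.push('Query #' + queryCount);
    // Print the recorded order once every query callback has run.
    if (queryCount === messageCount) console.log(log.join('\n'));
  });
});

// Emit a small backlog in a single synchronous loop.
// EventEmitter.emit() calls its listeners synchronously.
for (let i = 0; i < 3; i++) {
  consumer.emit('message', { value: i });
}
```

Under these assumptions, the emit call and the query call alternate just as the breakpoints do, yet all three messages are logged before any query callback runs, because the callbacks cannot be processed until the synchronous loop returns control to the event loop. Whether node-kafka actually delivers its backlog this way is not confirmed here.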
Ideas and knowledge will be greatly appreciated :)