There are only three cases that need to be handled correctly:
- A chunk containing exactly one complete line
- A chunk containing multiple lines
- A chunk containing only part of a line
Here is the algorithm that handles all three situations:
- Receive a chunk of data
- Scan the chunk for newlines
- Each time a newline is found, take everything up to and including it and emit it as one line, with whatever changes you need
- Repeat until the whole chunk is consumed (no data remains) or no more newlines are found (some data remains; save it for the next chunk)
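The scan-and-slice loop above can be sketched as a plain function on buffers, outside of any stream. `splitLines` is a hypothetical helper name (not part of the answer's code); `rest` plays the role of the saved leftover data:

```javascript
// Combine the leftover from the previous chunk with the new chunk, then
// slice off every complete line. Returns the emitted lines plus the new
// leftover to carry into the next call.
function splitLines(rest, chunk) {
  rest = rest.length ? Buffer.concat([rest, chunk]) : chunk
  const lines = []
  let index
  while ((index = rest.indexOf('\n')) !== -1) {
    lines.push(rest.slice(0, ++index)) // keep the newline in the emitted line
    rest = rest.slice(index)           // whatever follows is the new leftover
  }
  return { lines, rest }               // rest is saved for the next chunk
}

// First chunk ends mid-line: one full line comes out, 'b' is saved
const first = splitLines(Buffer.alloc(0), Buffer.from('a\nb'))
// Next chunk completes the saved partial line
const second = splitLines(first.rest, Buffer.from('c\n'))
```

After the first call `first.lines` holds only `'a\n'` and `first.rest` holds `'b'`; the second call then emits `'bc\n'`, which is exactly the partial-line case being handled.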
And here is a real implementation, with comments explaining why each step is needed. Note that for performance reasons the buffers are never converted to regular JS strings.
```javascript
const { Transform } = require('stream')

const prefix = Buffer.from('[worker]: ')

const prepender = new Transform({
  transform(chunk, encoding, done) {
    // Prepend any leftover data saved from the previous chunk
    this._rest = this._rest && this._rest.length
      ? Buffer.concat([this._rest, chunk])
      : chunk

    let index

    // As long as we keep finding newlines, keep making slices of the buffer
    // and push them to the readable side of the transform stream
    while ((index = this._rest.indexOf('\n')) !== -1) {
      // The `end` parameter is non-inclusive, so increase it to include the newline we found
      const line = this._rest.slice(0, ++index)
      // `start` is inclusive, but we are already one char ahead of the newline -> all good
      this._rest = this._rest.slice(index)
      // We have a single line here! Prepend the string we want
      this.push(Buffer.concat([prefix, line]))
    }

    return void done()
  },

  // Called before the end of the input so we can handle any remaining
  // data that we have saved
  flush(done) {
    // If we have any remaining data in the cache, send it out
    if (this._rest && this._rest.length) {
      return void done(null, Buffer.concat([prefix, this._rest]))
    }
    done()
  },
})

process.stdin.pipe(prepender).pipe(process.stdout)
```