json stream
@Brad, here is a solution based on stream.Transform that splits the input on a delimiter and emits the sections as JSON:
const { Transform } = require('stream');
/**
 * Transform stream that splits incoming text on a delimiter and emits the
 * sections as the text of one JSON array of strings.
 *
 * Output is produced incrementally: the first completed section is emitted as
 * `["..."`, every later one as `,"..."`, and the remaining tail is emitted
 * together with the closing `]` when the stream is flushed. Concatenating all
 * emitted chunks therefore yields a string that `JSON.parse` accepts.
 *
 * NOTE: each Buffer chunk is decoded on its own with `chunk.toString()`, so a
 * multi-byte character that straddles two Buffer chunks is not reassembled.
 */
class Delimited extends Transform {
  /**
   * @param {object} [options]
   * @param {RegExp|string} [options.delimiter] - Section separator; defaults
   *   to a line break (`\r?\n`). A string is compiled with
   *   `new RegExp(delimiter, 'g')`, i.e. it is treated as a pattern, not as a
   *   literal.
   * @param {string} [options.encoding='utf8'] - Encoding used to turn
   *   incoming chunks into text.
   */
  constructor({ delimiter = /\r?\n/g, encoding = 'utf8' } = {}) {
    super();
    this._delimiter = delimiter instanceof RegExp ? delimiter : new RegExp(delimiter, 'g');
    this._encoding = encoding;
    // Text received so far that is not yet terminated by a delimiter.
    this._buffer = '';
    // True until the first section has been written (decides '[' vs ',').
    this._first = true;
  }

  /**
   * Appends the chunk to the pending text and emits every section that is now
   * terminated by a delimiter.
   *
   * @param {Buffer|string} chunk - Incoming data.
   * @param {string} encoding - 'buffer' for Buffer chunks, otherwise the
   *   encoding of the string chunk.
   * @param {Function} callback - Called with `(null, partialJSON)`.
   */
  _transform(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      this._buffer += chunk.toString(this._encoding);
    } else if (encoding === this._encoding) {
      this._buffer += chunk;
    } else {
      this._buffer += Buffer.from(chunk, encoding).toString(this._encoding);
    }

    // Split unconditionally rather than guarding with `delimiter.test()`:
    // `test()` on a global regex resumes from the `lastIndex` left by the
    // previous call, so a delimiter near the start of a new buffer could be
    // missed. `split()` does not depend on `lastIndex`, and when no delimiter
    // is present it returns the whole buffer as the single (pending) element.
    const sections = this._buffer.split(this._delimiter);
    // The last element is not terminated yet; keep it for the next chunk.
    this._buffer = sections.pop();

    let partialJSON = '';
    for (const section of sections) {
      partialJSON += `${this._first ? '[' : ','}${JSON.stringify(section)}`;
      this._first = false;
    }
    callback(null, partialJSON);
  }

  /**
   * Emits the remaining text as the final array element and closes the array.
   * If nothing was emitted before, this produces a one-element array.
   *
   * @param {Function} callback - Called with `(null, finalJSON)`.
   */
  _flush(callback) {
    callback(null, `${this._first ? '[' : ','}${JSON.stringify(this._buffer)}]`);
  }
}
Usage:
const fs = require('fs');
// Read the file as UTF-8 text and cut it wherever a blank line is followed by "Part #<digit>".
const stream = fs.createReadStream('file.txt', 'utf8');
const transform = new Delimited({ delimiter: /\n\n(?=Part #\d)/g });

// The transform emits fragments of one JSON array; accumulate them as text.
let json = '';

function collect(chunk) {
  json += chunk;
}

// Once the stream has ended the accumulated text is complete JSON.
function report() {
  console.log(JSON.parse(json));
}

transform.on('data', collect);
transform.on('end', report);
stream.pipe(transform);
!
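Alternatively, if you do not need JSON output and would rather receive each section as a separate chunk, you can use objectMode: true: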
const { Transform } = require('stream');
/**
 * Object-mode Transform stream that splits incoming text on a delimiter and
 * pushes each section downstream as its own string chunk.
 *
 * NOTE: each Buffer chunk is decoded on its own with `chunk.toString()`, so a
 * multi-byte character that straddles two Buffer chunks is not reassembled.
 */
class Delimited extends Transform {
  /**
   * @param {RegExp|string} [delimiter] - Section separator; defaults to a
   *   line break (`\r?\n`). A string is compiled with
   *   `new RegExp(delimiter, 'g')`, i.e. it is treated as a pattern, not as a
   *   literal.
   */
  constructor(delimiter = /\r?\n/g) {
    super({ objectMode: true });
    this._delimiter = delimiter instanceof RegExp ? delimiter : new RegExp(delimiter, 'g');
    this._encoding = 'utf8';
    // Text received so far that is not yet terminated by a delimiter.
    this._buffer = '';
  }

  /**
   * Appends the chunk to the pending text and pushes every section that is
   * now terminated by a delimiter.
   *
   * @param {Buffer|string} chunk - Incoming data.
   * @param {string} encoding - 'buffer' for Buffer chunks, otherwise the
   *   encoding of the string chunk.
   * @param {Function} callback - Called without arguments once the chunk has
   *   been processed.
   */
  _transform(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      this._buffer += chunk.toString(this._encoding);
    } else if (encoding === this._encoding) {
      this._buffer += chunk;
    } else {
      this._buffer += Buffer.from(chunk, encoding).toString(this._encoding);
    }

    // Split unconditionally rather than guarding with `delimiter.test()`:
    // `test()` on a global regex resumes from the `lastIndex` left by the
    // previous call, so a delimiter near the start of a new buffer could be
    // missed. `split()` does not depend on `lastIndex`, and when no delimiter
    // is present it returns the whole buffer as the single (pending) element.
    const sections = this._buffer.split(this._delimiter);
    // The last element is not terminated yet; keep it for the next chunk.
    this._buffer = sections.pop();

    // Push explicitly so only the section is passed (forEach would also hand
    // the index and the array to push()).
    for (const section of sections) {
      this.push(section);
    }
    callback();
  }

  /**
   * Emits whatever text remains after the last delimiter as the final chunk.
   *
   * @param {Function} callback - Called with `(null, remainingText)`.
   */
  _flush(callback) {
    callback(null, this._buffer);
  }
}
Usage:
const fs = require('fs');
// Read the file as UTF-8 text and cut it wherever a blank line is followed by "Part #<digit>".
const stream = fs.createReadStream('file.txt', 'utf8');
const transform = new Delimited(/\n\n(?=Part #\d)/g);

// Every 'data' event carries one section; gather them in arrival order.
const array = [];

function collect(chunk) {
  array.push(chunk);
}

// Print the collected sections once the stream has ended.
function report() {
  console.log(array);
}

transform.on('data', collect);
transform.on('end', report);
stream.pipe(transform);
!