As EdinM pointed out, RxJS can consume Node streams through RxJS Node (the rx-node package). This example uses csv-streamify rather than csv-parse:
test_files/test_data.csv:
thing,name,owner,loc
chair,sitty,billy,san fran
table,setty,bryan,new oak
Run:
$ npm install rx rx-node csv-streamify
index.js:
"use strict";
const Rx = require('rx');
Rx.Node = require('rx-node');
const fs = require('fs');
const csv = require('csv-streamify');
// Options passed to csv-streamify when constructing the parser stream.
const config = {
  delimiter: ',',
  // NOTE(review): the sample input appears to use CRLF line endings — with
  // '\n' here the captured output keeps a trailing '\r' on the last field of
  // each row. csv-streamify's README suggests '\r\n' for such files; confirm
  // against the real input before changing it.
  newline: '\n',
  quote: '"',
  empty: '',
};

// Parser stream that the file is piped into and that the observable wraps.
const parseCsv = csv(config);
/**
 * Observer callback for each emitted value: logs it prefixed with "Next: ".
 *
 * The value is joined by string concatenation, so a plain object is printed
 * as "[object Object]".
 *
 * @param {*} x - Value emitted by the observable.
 */
function onNext (x) {
  const message = 'Next: ' + x;
  console.log(message);
}
/**
 * Observer callback for a failed sequence: logs the error prefixed with
 * "Error: ".
 *
 * @param {*} err - Error (or other value) reported by the observable.
 */
function onError (err) {
  const message = 'Error: ' + err;
  console.log(message);
}
/**
 * Observer callback for the end of the sequence: logs "Completed".
 */
function onComplete () {
  const message = 'Completed';
  console.log(message);
}
// Open the CSV file as a readable stream.
const readStream = fs.createReadStream('test_files/test_data.csv');

// pipe() does not forward 'error' events to the destination, so a failed read
// (for example a missing file) would otherwise be an unhandled 'error' event
// and never reach onError. Re-emit it on the parser so it flows through the
// observable built from that stream.
readStream.on('error', (err) => parseCsv.emit('error', err));

// Feed the raw file contents through the CSV parser.
readStream.pipe(parseCsv);

// Wrap the parser stream in an Observable and log each row, any error, and
// completion through the three observer callbacks.
const subscription = Rx.Node.fromTransformStream(parseCsv)
  .subscribe(onNext, onError, onComplete);
Run it:
$ node index.js
Output:
Next: ["thing","name","owner","loc\r"]
Next: ["chair","sitty","billy","san fran\r"]
Next: ["table","setty","bryan","new oak"]
Completed
With objectMode and columns both set to true in the csv config, and a sideEffect function added to log each row:
/**
 * Logs a value to the console and hands the same value back unchanged, so it
 * can be dropped into an observable chain to inspect what flows through.
 *
 * @param {*} v - Value to log.
 * @returns {*} The same value that was passed in.
 */
function sideEffect (v) {
  console.log(v);
  return v;
}
// Observe every parsed row with sideEffect before it reaches the subscriber.
// tap() runs the callback for each element and passes the element through
// untouched, which states the intent (inspect, don't transform) more clearly
// than map() with a function that returns its argument.
const subscription = Rx.Node.fromTransformStream(parseCsv)
  .tap(sideEffect)
  .subscribe(onNext, onError, onComplete);
Output:
{ thing: 'chair',
name: 'sitty',
owner: 'billy',
'loc\r': 'san fran\r' }
Next: [object Object]
{ thing: 'table',
name: 'setty',
owner: 'bryan',
'loc\r': 'new oak' }
Next: [object Object]
Completed