How to use node stream conversion in rxjs?

I have been playing with rxjsfor some time, and I like how I can use it for logic, and not for imperative programming. However, I also like the node stream, which is also very complex, so my obvious reaction was to use both of them, but I didn’t see it mentioned much (in fact, I wasn’t at all) except for the binding to him in the rxjs book .

So my question is, how can I use all the conversion threads that are in npm on RxJS? Or is it even possible?
Example: -

var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var src = fs.createReadStream('./myFile.csv');
src.pipe(csv).pipe(process.stdout);

Essentially, I would like to do this: -

var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var rx= require('rx-node');
var src = fs.createReadStream('./myFile.csv');

var obj = rx.fromReadableStream(src);
obj.pipe(csb).map(x=>console.log(x));

I was told to use highlandin the past, but I am strictly looking for a solution rxjshere.

+4
2

rx-node, ! : All streams are event emitters!.

: input.txt

Hello World!
Hello World!
Hello World!
Hello World!
Hello World!

Run:

npm install through2 split2 rx rx-node

index.js:

var Rx = require('rx');
Rx.Node = require('rx-node');
var fs = require('fs');
var th2 = require('through2');
var split2 = require('split2');

var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err'));

var transform = th2(function(ch, en, cb) {
  cb(null, ch.toString());
}).on('error', function(err) {
  console.log(err, err.toString());
});

// All streams are event emitters ! (one way without using rx-node)
// var subs = Rx.Observable.fromEvent(transform, 'data').share();
// subs
//   .map(value => 'Begin line: ' + value)
//   .subscribe(value => console.log(value));

// rx-node has convenience functions (another way)
var subs = Rx.Node.fromTransformStream(transform).share()
  .map(value => 'Begin line: ' + value)
  .subscribe(value => console.log(value));

file.pipe(split2()).pipe(transform);

:

Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
+4

EdinM RxJS node, . , , , RxJS Node. csv-parse csv-streamify. ​​ :

test_data.csv:

thing,name,owner,loc
chair,sitty,billy,san fran
table,setty,bryan,new oak

Run:

$ npm install rx rx-node csv-streamify

index.js:

"use strict";

const Rx = require('rx');
Rx.Node = require('rx-node');
const fs = require('fs');
const csv = require('csv-streamify');


//Setting up the transform-stream CSV parser

let config = {
  delimiter: ',',       // comma, semicolon, whatever
  newline: '\n',        // newline character (use \r\n for CRLF files)
  quote: '"',           // what considered a quote
  empty: '',            // empty fields are replaced by this
  //objectMode: true,   //parses csv table into an array of objects
  //columns: true       //uses column headers for the object fields
};

let parseCsv = csv(config);

//Setting up the RxJS Observer

function onNext (x) {
    //do your side-effects here, after the data has 
    //gone through the observables operator chain
    console.log('Next: ' + x); 
};
function onError (err) {
    console.log('Error: ' + err);
};
function onComplete () {
    console.log('Completed');
};

let readStream = fs.createReadStream('test_files/test_data.csv');

readStream.pipe(parseCsv);

let subscription = Rx.Node.fromTransformStream(parseCsv)
    //do something with the data with an operator such as:
    //.map()
    .subscribe(onNext, onError, onComplete);


:

$ node index.js

:

Next: ["thing","name","owner","loc\r"]

Next: ["chair","sitty","billy","san fran\r"]

Next: ["table","setty","bryan","new oak"]

Completed


objectMode columns true csv, sideEffect :

function sideEffect (v){ 
  console.log(v) 
  return v;
};

let subscription = Rx.Node.fromTransformStream(parseCsv)
  .map(sideEffect)
  .subscribe(onNext, onError, onComplete);

:

{ thing: 'chair',
  name: 'sitty',
  owner: 'billy',
  'loc\r': 'san fran\r' }
Next: [object Object]
{ thing: 'table',
  name: 'setty',
  owner: 'bryan',
  'loc\r': 'new oak' }
Next: [object Object]
Completed
+1

Source: https://habr.com/ru/post/1619557/


All Articles