How to limit concurrency flatMap?

I am trying to use RxJS to write a script that processes several hundred log files, each about 1 GB. The skeleton of the script looks like this:

    Rx.Observable.from(arrayOfLogFilePath)
      .flatMap(function (logFilePath) {
        return Rx.Node.fromReadStream(logFilePath)
          .filter(filterLogLine);
      })
      .groupBy(someGroupingFunc)
      .map(someFurtherProcessing)
      .subscribe(...);

The code works, but note that the filtering of all the log files starts at the same time. For file-system I/O performance, however, it would be better to process the files one after another (or at least limit the concurrency to a few files, rather than opening hundreds of them at once). How can I implement this in a "functional reactive manner"?

I was thinking about a Scheduler, but could not figure out how it would help here.

2 answers

You can use .merge(maxConcurrent) to limit concurrency. Since .merge(maxConcurrent) flattens a metastream (an observable of observables) into an observable, you need to replace .flatMap with .map so that the output is a metastream ("unflattened"), and then call .merge(maxConcurrent).

    Rx.Observable.from(arrayOfLogFilePath)
      .map(function (logFilePath) {
        return Rx.Node.fromReadStream(logFilePath)
          .filter(filterLogLine);
      })
      .merge(2) // 2 concurrent
      .groupBy(someGroupingFunc)
      .map(someFurtherProcessing)
      .subscribe(...);

This code has not been tested (since I do not have access to your development environment), but this is the way to go. RxJS does not have many operators with concurrency parameters, yet you can almost always do what you need with .merge(maxConcurrent).
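To make the scheduling rule behind .merge(maxConcurrent) concrete, here is a dependency-free sketch of the same idea in plain Node: start at most `limit` tasks, and whenever one finishes, start the next. The function name `runWithConcurrency` and its parameters are illustrative, not part of RxJS.

```javascript
// Sketch of merge(maxConcurrent) semantics over plain promises.
// `tasks` is an array of zero-argument functions, each returning a promise
// (a factory, so that work does not start until we subscribe/call it).
function runWithConcurrency(tasks, limit) {
  return new Promise((resolve, reject) => {
    const results = new Array(tasks.length);
    let next = 0;   // index of the next task to start
    let active = 0; // tasks currently running
    let done = 0;   // tasks finished

    function startNext() {
      // Fill free slots up to the concurrency limit.
      while (active < limit && next < tasks.length) {
        const i = next++;
        active++;
        tasks[i]().then(value => {
          results[i] = value;
          active--;
          done++;
          if (done === tasks.length) {
            resolve(results); // everything finished
          } else {
            startNext(); // a slot freed up: start the next task
          }
        }, reject);
      }
    }

    if (tasks.length === 0) resolve(results);
    else startNext();
  });
}
```

With `limit = 2` this behaves like the `.map(...).merge(2)` pipeline above: only two log files would be open at any given moment.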


I just solved a similar problem with RxJS 5, so I hope this solution can help others with a similar problem.

    // Simulate always processing 2 requests in parallel
    // (when one is finished, it starts processing one more),
    // retry two times, push the error onto the stream if the retry fails.
    //const Rx = require('rxjs-es6/Rx');

    // -- Global variable just to show that it works. --
    let parallelRequests = 0;
    // --------------------------------------------------

    function simulateRequest(req) {
      console.log("Request " + req);
      // --- To log retries ---
      var retry = 0;
      // ----------------------
      // Can't retry a promise; need to restart before the promise is made.
      return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => {
        var random = Math.floor(Math.random() * 2000);
        // -- To show that it works --
        if (retry) {
          console.log("Retrying request " + req + ", retry " + retry);
        } else {
          parallelRequests++;
        }
        // ---------------------------
        setTimeout(() => {
          if (random < 900) {
            retry++;
            return reject(req + " !!!FAILED!!!");
          }
          return resolve(req);
        }, random);
      })).retry(2).catch(e => Rx.Observable.of(e));
    }

    Rx.Observable.range(1, 10)
      .flatMap(e => simulateRequest(e), null, 2) // third argument limits concurrency to 2
      // -- To show that it works --
      .do(() => {
        console.log("ParallelRequests " + parallelRequests);
        parallelRequests--;
      })
      // ---------------------------
      .subscribe(
        e => console.log("Response from request " + e),
        e => console.log("Should not happen, error: " + e),
        e => console.log("Finished"));
    <script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.6/dist/global/Rx.umd.js"></script>
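The comment "Can't retry a promise, need to restart before the promise is made" is the key point of this answer: a settled promise cannot be re-run, so each retry must create a fresh promise. Here is a dependency-free sketch of that idea, where `retryPromise` and `makePromise` are hypothetical names; like RxJS's `.retry(2)`, `attempts = 2` allows up to three total attempts.

```javascript
// Retry a promise-returning *factory*, not a promise instance:
// each attempt calls makePromise() again to get fresh work.
function retryPromise(makePromise, attempts) {
  return makePromise().catch(err => {
    if (attempts <= 0) throw err;    // out of retries: propagate the error
    return retryPromise(makePromise, attempts - 1);
  });
}
```

This is the same reason the answer wraps the request in `Rx.Observable.of(req).flatMap(...)`: the observable re-invokes the factory on each retry, producing a new promise per attempt.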

Source: https://habr.com/ru/post/1203667/
