Control javascript asynchronous stream speed (in loop)

Suppose you want to start a (random) process for each folder in a list in short code:

var exec = require('child_process').exec; var folders = [...]; // a list from somewhere _.each(folders, function(folder) { exec("tar cvf " + folder + ".tgz " + folder); }); 

If the list is long, I can start a large number of processes simultaneously, which should be avoided. What is a pretty simple way to start execution at a controlled speed (maximum 5 concurrent processes here)?

edit : this question is for every kind of asynchronous stream (in which you want to control the speed), and not just for the problem with exec-over-folders.

+5
source share
3 answers

Use the async package and its function eachLimit :

it does the same as lodash 's each , but with asynchronous flow control and a limit on how many iterations may run at the same time:

 var async = require('async'); var exec = require('child_process').exec; var folders = [...]; // a list from somewhere var maxProcesses = 5; // 5 items at a time async.eachLimit( folders, // collection maxProcesses, // limit function(folder, next) { // iterator function. args: item, callback var cmd = "tar -cf " + folder + ".tgz " + folder; console.log('calling:', cmd); exec(cmd, function(err, stdOut, stdErr) { // executing cmd if(err) console.error(err); // if error putting to console next(); // passing the async flow to handle the next iteration }); }, function() { // after all iterations finished console.log('finished processing commands'); }); 

or parallelLimit :

 var async = require('async'); var _ = require('lodash'); var exec = require('child_process').exec; var folders = [...]; // a list from somewhere var callStack = []; _.each(folders, function(folder) { // generating our stack of commands callStack.push(function(done) { var cmd = "tar -cf " + folder + ".tgz " + folder; exec(cmd, function(err, stdOut, stdErr) { if(err) console.error(err); done(null, folder); }); }); }); var maxProcesses = 5; // 5 items at a time async.parallelLimit(callStack, maxProcesses, function() {console.log('finished');}); 

"making it look shorter" :)

 const async = require('async'), exec = require('child_process').exec; let folders = [...]; async.eachLimit(folders, 5, (folder, next) => exec("tar -cf " + folder + ".tgz " + folder, () => next()), () => console.log('finished')); 

and

 const async = require('async'), exec = require('child_process').exec; let folders = [...]; let commands = folders.map(folder => done => exec("tar -cf " + folder + ".tgz " + folder, () => done()); async.parallelLimit(commands, 5, () => console.log('finished')); 



If none of these examples suits you, or your system is very large, try using a message queuing system such as rsmq instead.

+6
source

Promises

I just love promises and like to stick to them wherever possible.

Here is a solution that I believe will work for your case.

 var exec = require('child_process').exec; var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]; var maxConcurrentProcessCount = 5; var promiseArr = []; folders.forEach(function (folder) { var pr = { start: function () { if (pr.promise) return pr.promise; return pr.promise = new Promise(function (resolve) { exec("tar cvf " + folder + ".tgz " + folder, undefined, (err, stdout, stderr) => { // This is your logic, you can reject depending on err var ind = promiseArr.indexOf(pr); if (ind >= 0) promiseArr.splice(ind, 1); resolve(stdout); }); }); } }; promiseArr.push(pr); }); var racePromises = function () { if (!promiseArr.length) return; Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises); console.log("Current running process count: " + promiseArr.filter(x => x.promise).length); } racePromises(); 

Short description

Create an array in which each element represents a task. First select 5 of them and run them. Whenever one of them completes, remove it from the array and run 5 tasks from the array again.

Run example

Test example

Recreating eachLimit with promises just for fun

 var myEachLimit = function (collection, maxConcurrentCalls, callback) { return new Promise(function (resolve, reject) { var promiseArr = []; collection.forEach(function (item) { var pr = { start: function () { if (pr.promise) return pr.promise; return pr.promise = new Promise(function (resolve) { callback.call(item, item, function () { var ind = promiseArr.indexOf(pr); if (ind >= 0) promiseArr.splice(ind, 1); resolve(); }); }); } }; promiseArr.push(pr); }); var racePromises = function () { if (!promiseArr.length) { resolve(); return; } Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises); console.log("Current running process count: " + promiseArr.filter(x => x.promise).length); } racePromises(); }); } // Demo var exec = require('child_process').exec; var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]; var maxConcurrentProcessCount = 5; myEachLimit(folders, maxConcurrentProcessCount, function (folder, next) { exec("tar cvf " + folder + ".tgz " + folder, (err, stdout, stderr) => { next(); }); }).then(function () { console.log("Finished all processes"); }); 
+2
source

Native Javascript

All you need is some kind of load balancer. Put your loop in a separate function:

  /** * Loops through your Folderarray and begins at the given index. * @param {[type]} lastIndex [last index which your loop stopped.] * @param {[type]} maxProcesses [maximum of processes you want to have.] * @param {[type]} folderArray [folder array.] */ function loopFolders(maxProcesses, lastIndex, folderArray){ // counter for our actual processes. let processes = 0; // This is to stop the loop, since JavaScript has no built-in break for loops. let maximumReached = false; // loop through array. folderArray.forEach(function(element, index, array){ // Do stuff once you hit the last point. if(index > lastIndex && !maximumReached){ // Check how many processes are running. if(processes <= maxProcesses){ // create your child process. let exec = require('child_process').exec; // Do stuff with Folderelement from Folderarray. exec("tar cvf " + element + ".tgz " + element); // Increment processes. processes++; }else{ /** * Magic begins here: * If max processes was reached, retry after a while. */ // We are done for this loop. maximumReached = true; // Set timeout for 10 seconds and then retry. setTimeout(function(){ // Calll function again. loopFolders(maxProcesses, index, array); }, 10000); } } }); } 

To call this loop from the start, you just go like this:

 // your Array of folders from somewhere. let folders = [...]; // Call loopFolders with maximum of 5 and the beginning index of 0. loopFolders(5, 0, folders); 

This code is a very simple example of a load balancer. Keep in mind that my example never knows whether any of the launched processes are still running; you would need some kind of callback to be sure of that. But it should do the trick for you, at least as something to start with.

To use NodeJS Childprocess events, view https://nodejs.org/api/child_process.html

You can call the loop function again from the child process's exit event to make sure your child processes do not get out of control.

Hope this helps.

Regards, Megajin

+1
source

Source: https://habr.com/ru/post/1259293/


All Articles