Throttle the number of promises open at a given time

The following TypeScript performs each call to doSomething(action) one at a time. (That is, the call for the second item in the list is not made until the first call has completed.)

    async function performActionsOneAtATime() {
      for (let action of listOfActions) {
        // Wait for each call to finish before starting the next one.
        const actionResult = await doSomething(action);
        console.log(`Action Done: ${actionResult}`);
      }
    }

This will send all requests to the server at once (without waiting for answers):

    async function performActionsInParallel() {
      for (let action of listOfActions) {
        // Start every call right away; log each result whenever it arrives.
        const actionResultPromise = doSomething(action);
        actionResultPromise.then((actionResult) => {
          console.log(`Action Done: ${actionResult}`);
        });
      }
    }

But what I really need is a way to throttle them: perhaps 10 or 20 calls open at a time. (One at a time is too slow, but all 600 at once will overload the server.)

But I am having a hard time figuring out how to do this.

Any suggestions on how I can limit the number of open calls to X at a time?

(TypeScript is used in this question, but I will be fine with the answer in JavaScript ES6.)

+5
5 answers

You can do this in one short function. (Update: it now returns the values in order, per naomik's suggestion.)

    /**
     * Performs a list of callable actions (promise factories) so that only a limited
     * number of promises are pending at any given time.
     *
     * @param listOfCallableActions An array of callable functions, which should
     *     return promises.
     * @param limit The maximum number of promises to have pending at once.
     * @returns A Promise that resolves to the full list of values when everything is done.
     */
    function throttleActions(listOfCallableActions, limit) {
      // We'll need to store which is the next promise in the list.
      let i = 0;
      let resultArray = new Array(listOfCallableActions.length);

      // Now define what happens when any of the actions completes. JavaScript is
      // (mostly) single-threaded, so only one completion handler will call at a
      // given time. Because we return doNextAction, the Promise chain continues as
      // long as there's an action left in the list.
      function doNextAction() {
        if (i < listOfCallableActions.length) {
          // Save the current value of i, so we can put the result in the right place
          let actionIndex = i++;
          let nextAction = listOfCallableActions[actionIndex];
          return Promise.resolve(nextAction())
            .then(result => {
              // Save results to the correct array index.
              resultArray[actionIndex] = result;
              return;
            }).then(doNextAction);
        }
      }

      // Now start up the original <limit> number of promises.
      // i advances in calls to doNextAction.
      let listOfPromises = [];
      while (i < limit && i < listOfCallableActions.length) {
        listOfPromises.push(doNextAction());
      }
      return Promise.all(listOfPromises).then(() => resultArray);
    }

    // Test harness:

    function delay(name, ms) {
      return new Promise((resolve, reject) => setTimeout(function() {
        console.log(name);
        resolve(name);
      }, ms));
    }

    var ps = [];
    for (let i = 0; i < 10; i++) {
      ps.push(() => delay("promise " + i, Math.random() * 3000));
    }

    throttleActions(ps, 3).then(result => console.log(result));
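For comparison, the same worker idea can be sketched with async/await, which the question already uses. This is only an illustration: throttleWithWorkers is a hypothetical name, and like throttleActions it assumes every list entry is a function that returns a promise.

```javascript
// Hypothetical async/await variant of the worker idea (not part of the answer).
// thunks: array of functions returning promises; limit: max pending at once.
async function throttleWithWorkers(thunks, limit) {
  const results = new Array(thunks.length);
  let next = 0; // index of the next thunk to start, shared by all workers

  // Each worker keeps pulling the next unstarted thunk until none remain.
  async function worker() {
    while (next < thunks.length) {
      const index = next++;
      // Store the value at the thunk's own index to preserve input order.
      results[index] = await thunks[index]();
    }
  }

  // Start at most <limit> workers; Promise.all rejects on the first failure.
  const workerCount = Math.min(limit, thunks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

If one action rejects, the returned promise rejects, but the other workers keep going until their current action settles; nothing in this sketch cancels them.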
+7

EDIT

Jeff Bowman has greatly improved his answer so that it resolves meaningful values. Feel free to look at the history of this answer to understand why the resolved values are so important and useful.


throttlep

This solution closely mimics the native Promise.all

How it's the same…

  • Resolves promises as quickly as possible
  • Resolves to an array of values in the same order as the inputs
  • Rejects as soon as any one of the promises rejects

How it's different…

  • A Number parameter limits the number of simultaneously running promises
  • The input array holds promise creators (thunks), not actual promises
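The reason for taking thunks is that a promise starts its work the moment it is created, so an array of promises is already running at full concurrency before any throttling can happen. A small sketch of that difference (the task helper and the started log are made up for this illustration):

```javascript
// Hypothetical helper for illustration: records when each task starts.
const started = [];
const task = (id) => new Promise((resolve) => {
  started.push(id);              // runs synchronously, at construction time
  setTimeout(resolve, 10, id);
});

// An array of promises: both tasks have already started by the next line.
const promises = [task('a'), task('b')];
const startedAfterPromises = [...started];   // ['a', 'b']

// An array of thunks: nothing starts until a thunk is actually called.
const thunks = [() => task('c'), () => task('d')];
const startedAfterThunks = [...started];     // still ['a', 'b']

thunks[0]();                                 // only now does 'c' start
const startedAfterCall = [...started];       // ['a', 'b', 'c']
```

A throttler can therefore only control timing if it is the one that calls each thunk.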

    // throttlep :: Number -> [(* -> Promise)]
    const throttlep = n=> Ps=>
      new Promise ((pass, fail)=> {
        // r is the number of promises still unresolved, xs is the final resolved value
        let r = Ps.length, xs = []
        // decrement r, save the resolved value in position i, run the next promise
        let next = i=> x=> (r--, xs[i] = x, run(Ps[n], n++))
        // if r is 0, we can resolve the final value xs, otherwise chain next;
        // P is undefined once every thunk has been started, so skip the call then
        // (calling it would throw inside the handler as an unhandled rejection)
        let run = (P,i)=> r === 0 ? pass(xs) : P && P().then(next(i), fail)
        // initialize by running the first n promises
        Ps.slice(0,n).forEach(run)
      })

    // -----------------------------------------------------
    // make sure it works

    // delay :: (String, Number) -> (* -> Promise)
    const delay = (id, ms)=>
      new Promise (pass=> {
        console.log (`running: ${id}`)
        setTimeout(pass, ms, id)
      })

    // ps :: [(* -> Promise)]
    let ps = new Array(10)
    for (let i = 0; i < 10; i++) {
      ps[i] = () => delay(i, Math.random() * 3000)
    }

    // run a limit of 3 promises in parallel
    // the first error will reject the entire pool
    throttlep (3) (ps) .then (
      xs => console.log ('result:', xs),
      err=> console.log ('error:', err.message)
    )

Console output

The inputs are run in order; the resolved results are in the same order as the inputs

    running: 0
    running: 1
    running: 2
    => Promise {}
    running: 3
    running: 4
    running: 5
    running: 6
    running: 7
    running: 8
    running: 9
    result: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Practical use

Let's look at a more practical code example. This code is tasked with fetching a set of images from a server. Here is how we might use throttlep to limit the number of simultaneous requests to 3 at a time.

    // getImage :: String -> Promise<base64>
    let getImage = url=> makeRequest(url).then(data => data.base64, reqErrorHandler)

    // actions :: [(* -> Promise<base64>)]
    let actions = [
      ()=> getImage('one.jpg'),
      ()=> getImage('two.jpg'),
      ()=> getImage('three.jpg'),
      ()=> getImage('four.jpg'),
      ()=> getImage('five.jpg')
    ]

    // throttle the actions then do something...
    throttlep (3) (actions) .then(results => {
      // results are guaranteed to be ordered the same as the input array
      console.log(results)
      // [<base64>, <base64>, <base64>, <base64>, <base64>]
    })
+3

There is nothing built-in for this, so you have to create your own. AFAIK, there is no library for this yet.

First, start with a "deferral": a promise that allows external code to resolve it:

    class Deferral<T> {
      constructor() {
        this.promise = new Promise<T>((resolve, reject) => {
          this.resolve = resolve;
          this.reject = reject;
        });
      }

      promise: Promise<T>;
      resolve: (thenableOrResult?: T | PromiseLike<T>) => void;
      reject: (error: any) => void;
    }

Then you can define a “wait queue” that represents all the blocks of code that are waiting to enter the critical section:

    class WaitQueue<T> {
      private deferrals: Deferral<T>[];

      constructor() {
        this.deferrals = [];
      }

      get isEmpty(): boolean {
        return this.deferrals.length === 0;
      }

      enqueue(): Promise<T> {
        const deferral = new Deferral<T>();
        this.deferrals.push(deferral);
        return deferral.promise;
      }

      dequeue(result?: T) {
        const deferral = this.deferrals.shift();
        deferral.resolve(result);
      }
    }

Finally, you can define an asynchronous semaphore, as such:

    export class AsyncSemaphore {
      private queue: WaitQueue<void>;
      private _count: number;

      constructor(count: number = 0) {
        this.queue = new WaitQueue<void>();
        this._count = count;
      }

      get count(): number { return this._count; }

      waitAsync(): Promise<void> {
        if (this._count !== 0) {
          --this._count;
          return Promise.resolve();
        }
        return this.queue.enqueue();
      }

      release(value: number = 1) {
        while (value !== 0 && !this.queue.isEmpty) {
          this.queue.dequeue();
          --value;
        }
        this._count += value;
      }
    }

Usage example:

    async function performActionsInParallel() {
      const semaphore = new AsyncSemaphore(10);
      const listOfActions = [...];
      const promises = listOfActions.map(async (action) => {
        await semaphore.waitAsync();
        try {
          // Return the value so that Promise.all collects the results.
          return await doSomething(action);
        }
        finally {
          semaphore.release();
        }
      });
      const results = await Promise.all(promises);
    }

This method first creates a throttler and then immediately starts all the asynchronous operations. Each asynchronous operation first (asynchronously) waits for the semaphore to be free, then performs the action, and finally releases the semaphore (allowing another one in). When all the asynchronous operations have completed, all the results are collected.
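The wait/act/release cycle described above can also be checked in plain JavaScript by counting how many actions are in flight at once. The following is a compact sketch, not the class from this answer: makeSemaphore and runAll are hypothetical names, and doSomething is passed in as a parameter so the snippet stays self-contained.

```javascript
// Minimal counting semaphore (illustrative stand-in for AsyncSemaphore).
function makeSemaphore(count) {
  const waiters = []; // resolve callbacks of callers waiting for a slot
  return {
    acquire() {
      if (count > 0) { count--; return Promise.resolve(); }
      return new Promise((resolve) => waiters.push(resolve));
    },
    release() {
      const next = waiters.shift();
      if (next) next();   // hand the slot straight to the oldest waiter
      else count++;       // or return it to the pool
    },
  };
}

// Runs doSomething(action) for every action with at most <limit> in flight,
// and reports the highest number of actions that were running together.
async function runAll(actions, limit, doSomething) {
  const semaphore = makeSemaphore(limit);
  let inFlight = 0, peak = 0;
  const results = await Promise.all(actions.map(async (action) => {
    await semaphore.acquire();
    inFlight++;
    peak = Math.max(peak, inFlight);
    try {
      return await doSomething(action);
    } finally {
      inFlight--;
      semaphore.release();
    }
  }));
  return { results, peak };
}
```

With a limit of 10, peak should never exceed 10, which is a quick sanity check for any throttler of this shape.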

Warning: this code is completely untested. I have not tried it even once.

+2

You can do this with the pub-sub pattern. I am not familiar with TypeScript either, and I don't know whether this runs in the browser or on the backend. I will just write pseudocode for this (assuming it's the backend):

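    // Assumes Node.js; listOfActions and doSomething come from the question.
    const events = require("events");

    let limit = 10;
    let emitter = new events.EventEmitter();

    function fetchNext(action) {
      // pop() returns undefined once the list is exhausted, so stop here.
      if (action === undefined) return;
      const actionResultPromise = doSomething(action);
      actionResultPromise.then((actionResult) => {
        console.log(`Action Done: ${actionResult}`);
        // A slot is free again: publish the next action to run.
        emitter.emit('grabTheNextOne', listOfActions.pop());
      });
    }

    emitter.on('grabTheNextOne', fetchNext);

    // Start the first <limit> actions; each completion pulls in the next one.
    for (let i = 0; i < limit; i++) {
      fetchNext(listOfActions.pop());
    }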

EventEmitter is part of Node.js, if you are working in Node. In the browser, you can use the normal events model. The key idea here is the publish-subscribe pattern.
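For the browser case, the same publish-subscribe idea could be sketched with the standard EventTarget API. This is an illustration under assumptions rather than a tested implementation: runWithEvents and the 'slotFree' event name are invented for this example, and the actions are assumed to be functions that return promises.

```javascript
// Hypothetical browser-friendly variant: each finished action publishes a
// 'slotFree' event, and the subscriber starts the next action.
function runWithEvents(thunks, limit) {
  return new Promise((resolve, reject) => {
    const target = new EventTarget();
    const results = new Array(thunks.length);
    let nextIndex = 0; // next thunk to start
    let finished = 0;  // number of thunks that have resolved

    function startNext() {
      if (nextIndex >= thunks.length) return; // nothing left to start
      const index = nextIndex++;
      Promise.resolve()
        .then(() => thunks[index]())
        .then((value) => {
          results[index] = value;
          finished++;
          target.dispatchEvent(new Event('slotFree'));
        }, reject); // the first failure rejects the whole run
    }

    target.addEventListener('slotFree', () => {
      if (finished === thunks.length) resolve(results);
      else startNext();
    });

    if (thunks.length === 0) return resolve(results);
    for (let i = 0; i < limit; i++) startNext();
  });
}
```

Unlike the pseudocode above, this version takes actions from the front of the list and keeps the results in input order.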

+1

You can throttle promises with a generator. In the example below, we throttle them so that

    function asyncTask(duration = 1000) {
      return new Promise(resolve => {
        setTimeout(resolve, duration, duration)
      })
    }

    async function main() {
      const items = Array(10).fill(() => asyncTask())

      {
        const generator = batchThrottle(3, ...items)
        console.log('batch', (await generator.next()).value)
        for await (let result of generator) {
          console.log('remaining batch', result)
        }
      }

      {
        const generator = streamThrottle(3, ...items)
        console.log('stream', await generator.next())
        for await (let result of generator) {
          console.log('remaining stream', result)
        }
      }
    }

    // Yields one array of results per batch of n tasks.
    async function* batchThrottle(n = 5, ...items) {
      while (items.length) {
        const tasks = items.splice(0, n).map(fn => fn())
        yield Promise.all(tasks)
      }
    }

    // Yields results one by one, still running the tasks in batches of n.
    async function* streamThrottle(n = 5, ...items) {
      while (items.length) {
        const tasks = items.splice(0, n).map(fn => fn())
        yield* await Promise.all(tasks)
      }
    }

    // Report failures instead of silently dropping them.
    main().catch(console.error)
0

Source: https://habr.com/ru/post/1272475/

