Best way to call an API inside a loop using Promises

I have 500 million objects, each of which has n contacts, as shown below:

    var groupsArray = [
        {'G1': ['C1','C2','C3', ...]},
        {'G2': ['D1','D2','D3', ...]},
        ...
        {'G2000': ['D2001','D2002','D2003', ...]},
        ...
    ]

I have two implementations in Node.js: one based on plain promises, the other on Bluebird, as shown below.

Plain promises

    ...

    var groupsArray = [
        {'G1': ['C1','C2','C3']},
        {'G2': ['D1','D2','D3']}
    ]

    function ajax(url) {
        return new Promise(function(resolve, reject) {
            request.get(url, {json: true}, function(error, data) {
                if (error) {
                    reject(error);
                } else {
                    resolve(data);
                }
            });
        });
    }

    _.each(groupsArray, function(groupData) {
        _.each(groupData, function(contactlists, groupIndex) {
            // console.log(groupIndex)
            _.each(contactlists, function(contactData) {
                ajax('http://localhost:3001/api/getcontactdata/' + groupIndex + '/' + contactData).then(function(result) {
                    console.log(result.body);
                    // Code depending on result
                }).catch(function() {
                    // An error occurred
                });
            })
        })
    })

    ...

In the Bluebird version, I used the concurrency option to see how the promise queue can be managed:

    ...

    _.each(groupsArray, function(groupData) {
        _.each(groupData, function(contactlists, groupIndex) {
            var contacts = [];
            // console.log(groupIndex)
            _.each(contactlists, function(contactData) {
                contacts.push({ contact_name: 'Contact ' + contactData });
            })
            groups.push({
                task_name: 'Group ' + groupIndex,
                contacts: contacts
            });
        })
    })

    Promise.each(groups, group =>
        Promise.map(group.contacts, contact =>
            new Promise((resolve, reject) => {
                /* setTimeout(() => resolve(group.task_name + ' ' + contact.contact_name), 1000); */
                request.get('http://localhost:3001/api/getcontactdata/' + group.task_name + '/' + contact.contact_name, {json: true}, function(error, data) {
                    if (error) {
                        reject(error);
                    } else {
                        resolve(data);
                    }
                });
            }).then(log => console.log(log.body)),
            { concurrency: 50 }
        ).then(() => console.log())
    ).then(() => {
        console.log('All Done!!');
    });

    ...

When you are working with something like 100 million internal API calls using promises, what is the best way to call the API asynchronously and process the responses later?

2 answers

My answer uses plain Node.js promises (it should be easy to adapt this to Bluebird or another library).

You can fire off all the promises at once using Promise.all:

    var groupsArray = [
        {'G1': ['C1','C2','C3']},
        {'G2': ['D1','D2','D3']}
    ];

    function ajax(url) {
        return new Promise(function(resolve, reject) {
            request.get(url, {json: true}, function(error, data) {
                if (error) {
                    reject(error);
                } else {
                    resolve(data);
                }
            });
        });
    }

    Promise.all(groupsArray.map(group => ajax("your-url-here")))
        .then(results => {
            // Code that depends on all results.
        })
        .catch(err => {
            // Handle the error.
        });

Promise.all attempts to run all your requests in parallel. That probably won't work when you have 500 million requests, because it tries to start every single one at the same time!

A more workable way to do this is to use the JavaScript reduce function to sequence your requests one after another:

    // ... Setup as before ...

    const results = [];

    groupsArray.reduce((prevPromise, group) => {
            return prevPromise.then(() => {
                return ajax("your-url-here")
                    .then(result => {
                        // Process a single result if necessary.
                        results.push(result); // Collect your results.
                    });
            });
        },
        Promise.resolve() // Seed promise.
    )
    .then(() => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });

This example chains the promises so that the next request only starts after the previous one has completed.

Unfortunately, this sequential approach will be very slow, because it has to wait for each request to complete before starting the next. While each request is in flight (API requests take time), your processor sits idle when it could already be working on another request!

A more effective but more complex approach is a combination of the two: batch your requests so that the requests within each batch (say, 10 of them) run in parallel, and the batches themselves run sequentially, one after another.

Implementing this yourself with a combination of Promise.all and reduce is fiddly - although it makes a great learning exercise - so I would suggest using the async-await-parallel library. There are a bunch of libraries like this, but I use this one: it works well and does the job easily.
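For illustration only, here is a rough, untested sketch of what that hand-rolled batching could look like, reusing the ajax() helper and groupsArray from above (the batch size is arbitrary):

    const batchSize = 10;

    // Split the work into batches of `batchSize` groups.
    const batches = [];
    for (let i = 0; i < groupsArray.length; i += batchSize) {
        batches.push(groupsArray.slice(i, i + batchSize));
    }

    // Run the batches one after another; within each batch, requests run in parallel.
    batches.reduce((prevPromise, batch) => {
        return prevPromise.then(() =>
            Promise.all(batch.map(group => ajax("your-url-here")))
        );
    }, Promise.resolve())
    .then(() => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });

In practice, though, the library takes care of this bookkeeping for you.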

You can install the library as follows:

 npm install --save async-await-parallel 

Here's how you use it:

    const parallel = require("async-await-parallel");

    // ... Setup as before ...

    const batchSize = 10;

    parallel(groupsArray.map(group => {
        return () => {
            // We need to return a 'thunk' function, so that the jobs can be
            // started when they are needed, rather than all at once.
            return ajax("your-url-here");
        };
    }), batchSize)
    .then(() => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });

That's better, but it's still a clunky way to make such a huge number of requests! You may need to step things up a notch and consider investing in proper asynchronous job management.

I have recently been using Kue to manage a cluster of worker processes. Using Kue with the Node.js cluster library lets you get proper parallelism happening on a multi-core PC, and you can then fairly easily extend it across multiple cloud-based virtual machines if you need even more grunt.
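As a very rough illustration only - the job type, payload shape, retry count, and the assumption of a locally running Redis instance are mine, not part of the original setup - a Kue producer and worker for this kind of job might be wired up like this, reusing the ajax() helper from above:

    const kue = require("kue");
    const queue = kue.createQueue(); // assumes a Redis instance is available locally

    // Producer: enqueue one job per contact (the payload shape is illustrative).
    queue.create("fetch-contact", { group: "G1", contact: "C1" })
        .attempts(3) // retry failed API calls a few times
        .save();

    // Worker: process up to 50 jobs concurrently.
    queue.process("fetch-contact", 50, (job, done) => {
        ajax("http://localhost:3001/api/getcontactdata/" + job.data.group + "/" + job.data.contact)
            .then(result => {
                console.log(result.body);
                done();
            })
            .catch(done);
    });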

See my answer here for Kue example code.


In my opinion, you actually have two problems rolled into one question - and I would tackle them separately.

# 1: Loading a large dataset

Working with such a large dataset (500M records) is likely to hit memory limits sooner or later - Node.js runs in a single thread and its heap is limited by default to roughly 1.5 GB - beyond that your process will crash.

To avoid that, I would read the data as a stream from CSV. I'll use scramjet, since it will also help us with the second problem, but JSONStream or papaparse would do the job quite well too:

 $ npm install --save scramjet 

Then read the data - I would suggest from CSV:

    const {StringStream} = require("scramjet");

    const stream = require("fs")
        .createReadStream(pathToFile)
        .pipe(new StringStream('utf-8'))
        .csvParse()

Now we have a stream of objects that will return the data row by row, but only as fast as we consume it. Problem #1 is solved; now to "augment" the stream:

# 2: Augmenting the stream asynchronously

Don't worry - that is exactly what you are doing - for every row of data you want to fetch some additional information (in other words, augment it) from some API, which by definition is asynchronous.

This is where scramjet comes in - it takes just a couple of extra lines:

    stream
        .flatMap(groupData => Object.entries(groupData))
        .flatMap(([groupIndex, contactList]) =>
            contactList.map(contactData => ([contactData, groupIndex]))
        )
        // now you have a simple stream of entries for your call
        .map(([contactData, groupIndex]) =>
            ajax('http://localhost:3001/api/getcontactdata/' + groupIndex + '/' + contactData)
        )
        // and here you can print or do anything you like with your data stream
        .each(console.log)

After that you will need to accumulate the data or output it to a stream - there are a number of options - for example: .toJSONArray().pipe(fileStream) .

Using scramjet you can separate the whole process into a few lines without much impact on performance. With setOptions({maxParallel: 32}) you can control concurrency and, most importantly, all of this will run with a minimal memory footprint - much, much faster than if you loaded everything into memory.
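Putting those pieces together, the tail end of the pipeline could look something like this sketch - the maxParallel value and the output file name are just examples, not part of the original setup:

    const fs = require("fs");

    stream
        .setOptions({ maxParallel: 32 })   // cap the number of concurrent API calls
        // ... the flatMap/map augmentation steps shown above ...
        .toJSONArray()                     // serialize the augmented rows as a JSON array
        .pipe(fs.createWriteStream("contacts-augmented.json"));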

Let me know how this works out - your question is quite complex, so if you run into any problems, I will be happy to help. :)



