How to run an asynchronous function for each line of a very large (> 1 GB) file in Node.js

Let's say you have a huge (> 1 GB) CSV of record ids:

    655453
    4930285
    493029
    4930301
    493031
    ...

And for each id you want to make a REST API call to fetch the record data, transform it locally, and insert it into a local database.
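
For context, a minimal sketch of that per-id work might look like the following. The endpoint URL, the transformation, and saveToDatabase are hypothetical placeholders, not part of the original question:

    var https = require('https');

    // Sketch of the per-id work: fetch the record, transform it, save it.
    function makeAPICall(id, done) {
      // Hypothetical endpoint; replace with the real record URL.
      https.get('https://api.example.com/records/' + id, function (res) {
        var body = '';
        res.on('data', function (chunk) { body += chunk; });
        res.on('end', function () {
          var record = JSON.parse(body);   // convert the raw record locally
          record.importedAt = Date.now();  // placeholder transformation
          saveToDatabase(record, done);    // hypothetical database insert
        });
      }).on('error', done);
    }

    // Placeholder standing in for the real database insert.
    function saveToDatabase(record, callback) {
      console.log('saved record', record);
      callback(null);
    }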

How do you do this with Node.js' Readable Stream?

My question is basically: how do you read a very large file line by line, run an async function for each line, and (optionally) start reading the file from a specific line?

From the following Quora question, I'm starting to learn how to use fs.createReadStream:

http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js

    var fs = require('fs');
    var lazy = require('lazy');

    var stream = fs.createReadStream(path, {
      flags: 'r',
      encoding: 'utf-8'
    });

    new lazy(stream).lines.forEach(function (line) {
      var id = line.toString();
      // pause stream
      stream.pause();
      // make async API call...
      makeAPICall(id, function () {
        // then resume to process next id
        stream.resume();
      });
    });

But this pseudocode doesn't work: the lazy module forces you to read the entire file (as a stream, but with no pausing), so that approach doesn't seem like it will work.

Another thing: I would like to be able to start processing this file from a specific line. The reason is that processing each id (making the API call, cleaning the data, etc.) can take up to half a second, and I don't want to have to start from the beginning of the file every time. The naive approach I'm about to use is to capture and save the line number of the last id processed, and then, when parsing the file again, stream through all the ids until I reach the line number I stopped at, and only then resume the makeAPICall business. Another naive approach is to write small files (of, say, 100 ids each) and process each file one at a time (a small enough data set to do everything in memory with no I/O stream). Is there a better way to do this?
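
One minimal way to implement the "save the last line number" idea is a small checkpoint file that records how many lines have been processed, so a restart can skip ahead. This is only a sketch under that assumption; the checkpoint path and the surrounding per-line loop are hypothetical:

    var fs = require('fs');

    var CHECKPOINT_FILE = 'checkpoint.txt'; // hypothetical path

    // Read the last processed line number, defaulting to 0 on the first run.
    function loadCheckpoint() {
      try {
        return parseInt(fs.readFileSync(CHECKPOINT_FILE, 'utf8'), 10) || 0;
      } catch (e) {
        return 0;
      }
    }

    // Persist the line number after each id so a restart can skip ahead.
    function saveCheckpoint(lineNumber) {
      fs.writeFileSync(CHECKPOINT_FILE, String(lineNumber));
    }

    // Inside whatever per-line loop ends up being used:
    //   if (currentLine <= lastDone) return;   // already processed, skip it
    //   makeAPICall(id, function () { saveCheckpoint(currentLine); ... });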

I see how this gets complicated (and where node-lazy comes in): the chunk in stream.on('data', function(chunk) {}); may contain only part of a line (if the buffer size is small, each chunk might hold 10 lines, but since the ids are of variable length, it might be only 9.5 lines or so). That is why I'm wondering what the best approach to the question above is.
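
To make that chunk-boundary problem concrete, the usual workaround (and roughly what line-splitting modules do internally) is to keep the trailing partial line in a buffer and prepend it to the next chunk. A rough sketch, with a placeholder file path:

    var fs = require('fs');

    var stream = fs.createReadStream('ids.txt', { encoding: 'utf-8' }); // placeholder path
    var remainder = '';

    stream.on('data', function (chunk) {
      // Prepend whatever was left over from the previous chunk.
      var pieces = (remainder + chunk).split('\n');
      // The last piece may be an incomplete line; keep it for next time.
      remainder = pieces.pop();
      pieces.forEach(function (line) {
        if (line.length) console.log('complete line:', line);
      });
    });

    stream.on('end', function () {
      // Whatever is left when the stream ends is the final line.
      if (remainder.length) console.log('complete line:', remainder);
    });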

2 answers

I don't think you need to use node-lazy. Here is what I found in the Node docs:

Event: 'data'

 function (data) { } 

The data event emits either a Buffer (by default) or a string if setEncoding() was used.

So that means that if you call setEncoding() on your stream, your data event callback will receive a string parameter, and inside that callback you can call .pause() and .resume().

The pseudocode should look like this:

    stream.setEncoding('utf8');

    stream.addListener('data', function (line) {
      // pause stream
      stream.pause();
      // make async API call...
      makeAPICall(line, function () {
        // then resume to process next line
        stream.resume();
      });
    });

Although the docs do not explicitly state that the stream is read line by line, I assume that is the case for file streams. At least in other languages and on other platforms, text streams work that way, and I see no reason why Node's streams would be different.


Regarding Andrei Listochkin's answer:

You can use a module such as byline to get a separate data event for each line. It is a transform stream wrapped around the original stream, emitting a data event per line, which lets you pause after each one.

Unlike lazy, byline will not read the entire file into memory.

    var fs = require('fs');
    var byline = require('byline');

    var stream = fs.createReadStream('bigFile.txt');
    stream.setEncoding('utf8');

    // Comment out this line to see what the transform stream changes.
    stream = byline.createStream(stream);

    // Write each line to the console with a delay.
    stream.on('data', function (line) {
      // Pause until we're done processing this line.
      stream.pause();
      setTimeout(() => {
        console.log(line);

        // Resume processing.
        stream.resume();
      }, 200);
    });

Source: https://habr.com/ru/post/918331/

