Unable to populate a large chunk of MongoDB data with Node.js

I have been asked to import a large amount of meteorological data collected from many sites throughout the city. Each site has one computer with one folder, which is synchronized with a central server every 5 minutes, and a new file is created every day. So basically the structure is this. Each txt file is formatted like a CSV file: the first line holds the field names and the remaining lines hold numbers.

folder_on_server
          | __ site1 __ date1.txt
          |         | __ date2.txt
          |
          | __ site2 __ date1.txt
                    | __ date2.txt
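For illustration, one date.txt has roughly this shape (values and all column names except TheTime are invented; TheTime holds a spreadsheet-style serial timestamp):

TheTime,Temp,RH,WindSpeed
41943.00000,12.4,78,3.1
41943.00347,12.3,79,2.9
41943.00694,12.3,79,3.0
...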
I wrote a small Node.js application to import this data into MongoDB. Currently we only have 3 sites, but each site already has almost 900 txt files, and each file contains 24 * 12 = 288 lines (since data is recorded every 5 minutes). When I start the Node application, it crashes with a memory allocation error after reading about 100 files of the first folder.

I tried many ways to improve this:

  • Increasing the Node.js memory limit to 8 GB (see the example command after this list) => a little better, more files get read, but it still cannot move on to the next folder.
  • Setting processed objects to undefined after each _.forEach() pass => did not help.
  • Reading the folders one at a time (separate fs.readdir calls) => did not help either.
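(For reference, raising the Node.js memory limit as in the first point is usually done with V8's --max-old-space-size flag, which takes a value in megabytes; the script name below is only a placeholder.)

node --max-old-space-size=8192 import.js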

How should I structure this in Node.js so that all of the data can be imported without running out of memory?

Edit 1: the crash always happens after roughly 100 files, regardless of which folder the import starts from.


The trick is to stream everything: read each file as a stream, parse it as CSV on the fly, and insert the rows in batches instead of building everything up in memory first.

The script below does that for your folder layout: it walks the site sub-folders, pipes every file through a CSV parser, a small transform, a batching step and an unordered bulk insert into MongoDB.

tomongo.js:

var Async = require('async');
var Csv = require('csv-streamify');
var Es = require('event-stream');
var Fs = require('fs');
var Mapping = require('./folder2siteRef.json');
var MongoClient = require('mongodb').MongoClient;

var sourcePath = '/hnet/incoming/' + new Date().getFullYear();

Async.auto({
  db: function (callback) {
    console.log('opening db connection');
    MongoClient.connect('mongodb://localhost:27017/test3', callback);
  },
  subDirectory: function (callback) {
    // read the list of sub-folders; each one is a site
    Fs.readdir(sourcePath, callback);
  },
  loadData: ['db', 'subDirectory', function (callback, results) {
    // note: async 1.x task style - dependent tasks receive (callback, results)
    Async.each(results.subDirectory, load(results.db), callback);
  }],
  cleanUp: ['db', 'loadData', function (callback, results) {
    console.log('closing db connection');
    results.db.close(callback);
  }]
}, function (err) {
  console.log(err || 'Done');
});

var load = function (db) {
  return function (directory, callback) {
    var basePath = sourcePath + '/' + directory;
    Async.waterfall([
      function (callback) {
        Fs.readdir(basePath, callback); // array of files in a directory
      },
      function (files, callback) {
        console.log('loading ' + files.length + ' files from ' + directory);
        Async.each(files, function (file, callback) {
          Fs.createReadStream(basePath + '/' + file)
            .pipe(Csv({objectMode: true, columns: true}))
            .pipe(transform(directory))
            .pipe(batch(200))
            .pipe(insert(db).on('end', callback));
        }, callback);
      }
    ], callback);
  };
};

var transform = function (directory) {
  return Es.map(function (data, callback) {
    data.siteRef = Mapping[directory];
    // TheTime is a spreadsheet-style serial date (25569 = 1970-01-01), so this
    // converts it to Unix seconds and adds a fixed 6 * 3600 second offset
    data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600;
    callback(null, data);
  });
};

var insert = function (db) {
  return Es.map(
    function (data, callback) {
      // "data" is one batch of parsed rows; write it in a single bulk operation
      if (data.length) {
        var bulk = db.collection('hnet').initializeUnorderedBulkOp();
        data.forEach(function (doc) {
          bulk.insert(doc);
        });
        bulk.execute(callback);
      } else {
        callback();
      }
    }
  );
};

var batch = function (batchSize) {
  batchSize = batchSize || 1000;
  var batch = [];

  // collect incoming rows into arrays of batchSize before passing them downstream
  return Es.through(
    function write (data) {
      batch.push(data);
      if (batch.length === batchSize) {
        this.emit('data', batch);
        batch = [];
      }
    },
    function end () {
      // flush any remaining rows before signalling the end of the stream
      if (batch.length) {
        this.emit('data', batch);
        batch = [];
      }
      this.emit('end');
    }
  );
};

The heavy lifting in the tomongo.js script is done by streams; the async module is only used for control flow and error handling around them.
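For reference, the folder2siteRef.json that the script requires is nothing more than a map from folder names to site references, along these lines (the values are invented):

{
  "site1": "SITE_REF_001",
  "site2": "SITE_REF_002",
  "site3": "SITE_REF_003"
}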

I tested it against dummy data matching your description (3 x dirs, 900 x files, 288 x lines per file) and it ran through the lot, since only one batch of parsed rows (200 here) is held in memory at any time.
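For completeness, here is a minimal sketch of how dummy data of that shape could be generated (not the generator used for the test above; folder layout, column names and values are invented, and the recursive mkdir needs Node 10+):

// gen-dummy.js - 3 site folders x 900 files x 288 rows each, mimicking the layout in the question
var fs = require('fs');
var path = require('path');

var root = './dummy-data';
['site1', 'site2', 'site3'].forEach(function (site) {
  var dir = path.join(root, site);
  fs.mkdirSync(dir, { recursive: true });

  for (var d = 0; d < 900; d++) {
    var rows = ['TheTime,Temp,RH,WindSpeed'];
    for (var i = 0; i < 288; i++) {
      // 41000 + d is an arbitrary spreadsheet-style serial date; i / 288 spreads one day into 5-minute steps
      rows.push((41000 + d + i / 288).toFixed(5) + ',' +
                (10 + Math.random() * 5).toFixed(1) + ',' +
                Math.round(60 + Math.random() * 30) + ',' +
                (Math.random() * 10).toFixed(1));
    }
    fs.writeFileSync(path.join(dir, 'date' + d + '.txt'), rows.join('\n') + '\n');
  }
});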

Some useful resources on Node.js streams:

http://nodestreams.com - examples of working with Node.js streams.

The event-stream module used in the script above.


The problem is most likely that you read each file into memory in one go with .readFileSync() instead of streaming it with fs.createReadStream(). With a read stream you can consume the file in small chunks and emit one line at a time, so memory use stays flat however big the file is. A small line reader to do that:

linereader.js

var fs = require("fs");
var util = require("util");
var EventEmitter = require("events").EventEmitter;

// Streams a file and emits a "line" event for every line, so only a small
// buffer is ever held in memory.
function LineReader(path, splitOn) {

    var readStream = fs.createReadStream(path);
    var self = this;
    var lineNum = 0;
    var buff = "";
    var chunk;

    readStream.on("readable", function() {
        // pull small chunks off the stream instead of reading the whole file
        while ((chunk = readStream.read(100)) !== null) {
            buff += chunk.toString();
            var lines = buff.split(splitOn);

            // emit every complete line; keep the trailing partial line in the buffer
            for (var i = 0; i < lines.length - 1; i++) {
                self.emit("line", lines[i]);
                lineNum += 1;
            }
            buff = lines[lines.length - 1];
        }
    });
    readStream.on("close", function() {
        // flush whatever is left as the last line, then signal completion
        self.emit("line", buff);
        self.emit("close");
    });
    readStream.on("error", function(err) {
        self.emit("error", err);
    });
}
util.inherits(LineReader, EventEmitter);
module.exports = LineReader;

"" , . , async ( , ), , :

app.js

var LineReader = require("./linereader.js");
var async = require("async");

// "db" is assumed to be an already-open MongoDB connection
var paths = ["./text1.txt", "./text2.txt", "./path1/text3.txt"];
var reader;

// eachSeries processes one file at a time, so only a single read stream is open at once
async.eachSeries(paths, function(path, callback) {

    reader = new LineReader(path, /\n/g);

    reader.on("line", function(line) {
        var doc = turnTextIntoObject(line); // parse one CSV line into a document
        db.collection("mycollection").insert(doc);
    });
    reader.on("close", callback);
    reader.on("error", callback);
}, function(err) {
    // handle error and finish;
});
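One possible refinement, sketched here rather than taken from the answer: the insert() calls above are fire-and-forget, so a fast reader can queue up a large number of pending writes. A variant that collects each file's lines and issues a single insertMany() per file could look like this (turnTextIntoObject, the collection name and db are the same placeholders as above):

var LineReader = require("./linereader.js");
var async = require("async");

// assumes "db" is an already-open MongoDB connection, as in the snippet above
function importFile(path, callback) {
    var reader = new LineReader(path, /\n/g);
    var batch = [];
    var finished = false;

    function done(err) {
        if (finished) { return; }   // guard so "close" after "error" cannot fire the callback twice
        finished = true;
        callback(err);
    }

    reader.on("line", function (line) {
        batch.push(turnTextIntoObject(line));
    });
    reader.on("close", function () {
        if (batch.length === 0) { return done(); }
        // one bulk write per file: a few hundred small documents at a time stay well within memory
        db.collection("mycollection").insertMany(batch, done);
    });
    reader.on("error", done);
}

async.eachSeries(["./text1.txt", "./text2.txt", "./path1/text3.txt"], importFile, function (err) {
    // handle error and finish;
});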

Source: https://habr.com/ru/post/1667800/

