Insert a large csv file, 200'000+ lines, into MongoDB in NodeJS

I am trying to parse and insert a large csv file into MongoDB, but when the file exceeds 100'000 lines I get a bad response from the server. The files I need to insert are usually 200'000+ lines.

I tried a streaming bulk insert (insertMany) and Babyparse (Papaparse) to insert the file line by line, but with poor results.
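
Roughly, the bulk-insert variant I mean looks like this (a simplified sketch, not the exact code; Entry, csv and baby are the same names used in the route below, and the batch size of 1000 is arbitrary):

// Sketch of the batched insertMany attempt (simplified)
var batch = [];

baby.parseFiles(csv, {
    header: true,
    skipEmptyLines: true,
    step: function(results) {
        batch.push(results.data[0]);

        // Flush every 1000 rows so the whole file never sits in memory
        if (batch.length >= 1000) {
            Entry.insertMany(batch.splice(0, batch.length), function(err) {
                if (err) { console.error(err); }
            });
        }
    },
    complete: function() {
        // Insert whatever is left over
        if (batch.length > 0) {
            Entry.insertMany(batch, function(err) {
                if (err) { console.error(err); }
            });
        }
    }
});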

Node api:

router.post('/csv-upload/:id', multipartMiddleware, function(req, res) {

    // Post variables
    var fileId = req.params.id;
    var csv = req.files.files.path;

    // create a queue object with concurrency 5
    var q = async.queue(function(row, callback) {
        var entry = new Entry(row);
        entry.save();
        callback();
    }, 5);

    baby.parseFiles(csv, {
        header: true, // Includes header in JSON
        skipEmptyLines: true,
        fastMode: true,
        step: function(results, parser) {
            results.data[0].id = fileId;

            q.push(results.data[0], function (err) {
                if (err) { throw err; }
            });
        },
        complete: function(results, file) {
            console.log("Parsing complete:", results, file);
            q.drain = function() {
                console.log('All items have been processed');
                res.send("Completed!");
            };
        }
    });
});

This streaming approach results in the POST request failing with ERR_EMPTY_RESPONSE.

Not sure if I am using async.queue correctly.

Is there a better, more efficient way to do this, or am I doing something wrong?
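
From the async docs, my understanding is that the worker should only signal completion once the save has actually finished, so presumably the queue part should look more like this (untested sketch):

// Sketch: call the queue callback only after the save completes,
// so the concurrency limit of 5 actually throttles the writes
var q = async.queue(function(row, callback) {
    var entry = new Entry(row);
    entry.save(function(err) {
        callback(err); // pass any save error back to the queue
    });
}, 5);

q.drain = function() {
    console.log('All items have been processed');
};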

Express server:

// Dependencies
var express = require('express');
var path = require('path');
var bodyParser = require('body-parser');
var routes = require('./server/routes');
var mongoose = require("mongoose");
var babel = require("babel-core/register");
var compression = require('compression');
var PORT = process.env.PORT || 3000;
// Include the cluster module
var cluster = require('cluster');

mongoose.connect(process.env.MONGOLAB_URI || 'mongodb://localhost/routes');

// Code to run if we're in the master process
if (cluster.isMaster) {

    // Count the machine CPUs
    var cpuCount = require('os').cpus().length;

    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
        cluster.fork();
    }

// Code to run if we're in a worker process
} else {
    // Express
    var app = express();

    app.use(bodyParser.json({limit: '50mb'}));
    app.use(bodyParser.urlencoded({limit: '50mb', extended: true}));

    // Compress responses
    app.use(compression());

    // Used for production build
    app.use(express.static(path.join(__dirname, 'public')));

    routes(app);

    // Routes
    app.use('/api', require('./server/routes/api'));

    app.all('/*', function(req, res) {
        res.sendFile(path.join(__dirname, 'public/index.html'));
    });

    // Start server
    app.listen(PORT, function() {
        console.log('Server ' + cluster.worker.id + ' running on ' + PORT);
    });
}

Answer:

As an alternative, for loading a csv into mongo there is a dedicated command-line tool:

mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline 

mongoimport comes with the MongoDB installation itself (nothing extra to install, you just run it from the shell).

And you can call it from node like this:

var exec = require('child_process').exec;
var cmd = 'mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline';

exec(cmd, function(error, stdout, stderr) {
  // do whatever you need during the callback
});

This way you don't have to parse the file and insert the rows yourself in node.
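
Dropped into the route from the question, that could look roughly like this (just a sketch: db_name and collection_name are placeholders, and I am using child_process.spawn with an argument array instead of exec so the uploaded file path does not need shell escaping):

var spawn = require('child_process').spawn;

router.post('/csv-upload/:id', multipartMiddleware, function(req, res) {
    var csvPath = req.files.files.path;

    // Placeholder database and collection names - adjust to your setup
    var args = ['-d', 'db_name', '-c', 'collection_name',
                '--type', 'csv', '--file', csvPath, '--headerline'];

    var child = spawn('mongoimport', args);

    child.on('close', function(code) {
        if (code === 0) {
            res.send('Completed!');
        } else {
            res.status(500).send('mongoimport exited with code ' + code);
        }
    });
});

Note that this imports the file as-is, so tagging every row with the fileId from the URL would have to happen in a separate step (for example a collection-wide update after the import).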

Some tips:

- Move the upload handling and the import itself out of the request/response path, so the request is not blocked while the file is processed.

- Keep in mind that the request can time out before a large import finishes (often after about 60 seconds, depending on the setup), so respond to the client early.

- Notify the client about the result through another channel, for example socket.io (available on npm), instead of keeping the response open; see the sketch after this list.

- If you need to transform each row before or after the import, this alone won't do it; mongoimport loads the file as-is.
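
Combining the last two tips, and assuming a socket.io server instance (io) is already set up, which it is not in the posted server code, the route could acknowledge the upload immediately and push a notification when the import finishes (again, just a sketch):

var spawn = require('child_process').spawn;

router.post('/csv-upload/:id', multipartMiddleware, function(req, res) {
    // Acknowledge right away so the request does not sit open for minutes
    res.status(202).send('Import started');

    var args = ['-d', 'db_name', '-c', 'collection_name', '--type', 'csv',
                '--file', req.files.files.path, '--headerline'];

    spawn('mongoimport', args).on('close', function(code) {
        // io is assumed to be an existing socket.io server instance
        io.emit('import-finished', { fileId: req.params.id, ok: code === 0 });
    });
});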

In short:

If you just need the data in MongoDB as-is, this is the simplest and fastest option. If you need per-row processing before/after the insert, you are back to streaming the file yourself and inserting in batches.

