Mass increase in MongoDB using mongoose

Is it possible to perform massive upserts with mongoose? So, basically having an array and inserting each element if it does not exist, or update it if it exists? (I use customs _ids)

When I use .insert , MongoDB returns an E11000 error for duplicate keys (which need to be updated). Inserting a few new documents works fine:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });

Using .save returns an error that the parameter should be the only document:

Users.save(data, function(err){
   ...
}

This answer suggests that there is no such option, however, it is specific to C #, and also for 3 years now. Therefore, I was wondering if there is an opportunity to do this with the help of mongoose?

Thank!

+23
5

"" , , . MongoDB, 2.6, " API " , . , , " " .

" " "" . .collection , " " "node ", :

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

, , " " , "", . , "", .

, , - . , , .

+21

(1000), @neil-lunn. . Promise:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

, , , , , ( Promises .) . , . eztv, eztv- .

+17

db.collection, .. xxx : initializeOrderedBulkOp()

mongoose. -, mongoose mongo db.collection.

npm install mongoose

.

+1

, . -, 10000 4 . , socketTimeoutMS connectTimeoutMS mongoose , , - . , @neil lunn for. , , ,

let BATCH_SIZE = 500
Array.prototype.chunk = function (groupsize) {
    var sets = [];
    var chunks = this.length / groupsize;

    for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) {
        sets[i] = this.slice(j, j + groupsize);
    }

    return sets;
}

function upsertDiscountedProducts(products) {

    //Take the input array of products and divide it into chunks of BATCH_SIZE

    let chunks = products.chunk(BATCH_SIZE), current = 0

    console.log('Number of chunks ', chunks.length)

    let bulk = models.Product.collection.initializeUnorderedBulkOp();

    //Get the current time as timestamp
    let timestamp = new Date(),

        //Keep track of the number of items being looped
        pendingCount = 0,
        inserted = 0,
        upserted = 0,
        matched = 0,
        modified = 0,
        removed = 0,

        //If atleast one upsert was performed
        upsertHappened = false;

    //Call the load function to get started
    load()
    function load() {

        //If we have a chunk to process
        if (current < chunks.length) {
            console.log('Current value ', current)

            for (let i = 0; i < chunks[current].length; i++) {
                //For each item set the updated timestamp to the current time
                let item = chunks[current][i]

                //Set the updated timestamp on each item
                item.updatedAt = timestamp;

                bulk.find({ _id: item._id })
                    .upsert()
                    .updateOne({
                        "$set": item,

                        //If the item is being newly inserted, set a created timestamp on it
                        "$setOnInsert": {
                            "createdAt": timestamp
                        }
                    })
            }

            //Execute the bulk operation for the current chunk
            bulk.execute((error, result) => {
                if (error) {
                    console.error('Error while inserting products' + JSON.stringify(error))
                    next()
                }
                else {

                    //Atleast one upsert has happened
                    upsertHappened = true;
                    inserted += result.nInserted
                    upserted += result.nUpserted
                    matched += result.nMatched
                    modified += result.nModified
                    removed += result.nRemoved

                    //Move to the next chunk
                    next()
                }
            })



        }
        else {
            console.log("Calling finish")
            finish()
        }

    }

    function next() {
        current++;

        //Reassign bulk to a new object and call load once again on the new object after incrementing chunk
        bulk = models.Product.collection.initializeUnorderedBulkOp();
        setTimeout(load, 0)
    }

    function finish() {

        console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed)

        //If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert
        if (upsertHappened) {
            console.log("Calling remove")
            remove()
        }


    }

    /**
     * Remove all the items that were not updated in the recent upsert or those items with a discount of 0
     */
    function remove() {

        models.Product.remove(
            {
                "$or":
                [{
                    "updatedAt": { "$lt": timestamp }
                },
                {
                    "discount": { "$eq": 0 }
                }]
            }, (error, obj) => {
                if (error) {
                    console.log('Error while removing', JSON.stringify(error))
                }
                else {
                    if (obj.result.n === 0) {
                        console.log('Nothing was removed')
                    } else {
                        console.log('Removed ' + obj.result.n + ' documents')
                    }
                }
            }
        )
    }
}
0

Mongoose, upsertMany upsert .

An additional advantage of using this plugin over the initialization of your own mass op in the base collection is that this plugin first converts your data to the Mongoose model, and then back to simple objects before recovery. This ensures that Mongoose schema validation is applied and the data is depopulated and suitable for raw insertion.

https://github.com/meanie/mongoose-upsert-many https://www.npmjs.com/package/@meanie/mongoose-upsert-many

Hope this helps!

0
source

Source: https://habr.com/ru/post/1687357/


All Articles