Creating synchronous queries using node-mysql

I am trying to make one mysql query lead to another and not be considered complete until all of its child queries are complete. For example, I start with a single select, stream its rows, and run follow-up queries from each row of the result. This works with callbacks, but eventually I run out of memory, so I would like to slow the process down and work in batches. Because everything runs asynchronously, though, I cannot keep the steps in sync and close the connection only after all rows have been processed.

Here is an example:

var query = conn.query('select id from table1 limit 10');

query.on('result', function(row){
    console.log('query1', row);
    var query2 = conn.query('select id from books where id  = ? ', [row.id]);
    query2.on('result', function(row2){
        console.log('query2', row2);
        var query3 = conn.query('insert into test (id) values (?)', [row2.id]);
        query3.on('result', function(row3){
            console.log(row3);
        });
    });
});

query.on('end', function(){
    conn.end();
});

This is a simplified example; in my real case there are 3 levels of queries like this, and the data I am inserting comes from parsed xml. Is there a better way to structure this?

Thanks!


You can do this with the async module:

var async = require("async");
// connection instance
var conn;

// here goes task serving logic
// if any async function should be finished before drain callback, push them into q
var solvers = {
    query: function(q, task, row){
        console.log('query1', row);
        q.push({
            solver: "query2",
            req: "select id from books where id = ?",
            reqArgs: [row.id]
        });
    },
    query2: function(q, task, row){
        console.log('query2', row);
        q.push({
            solver: "query3",
            req: "insert into test (id) values (?)",
            reqArgs: [row.id]
        });
    },
    query3: function(q, task, row){
        console.log(row);
    }
}

// here is a queue of tasks
var q = async.queue(function(task, cb){
    var query = conn.query(task.req, task.reqArgs);
    query.on("end", cb);
    query.on("result",function(row){
        solvers[task.solver](q, task, row);
    });
}, 2); // limit of parallel queries

// when every request has reached "end"
q.drain = function(){
    conn.end();
    // continue from here
};

// initial task
q.push({
    solver: "query",
    req: "select id from table1 limit 10",
    reqArgs: []
});
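
One caveat: in newer releases of the async library (v3 and later), drain is registered with a method call instead of a property assignment, so on those versions the drain hookup above would look more like this:

// async v3+ style: pass the callback to q.drain() instead of assigning it
q.drain(function(){
    conn.end();
    // continue from here
});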

By the way, since in this example you only copy IDs from one table into another, the whole chain could probably be collapsed into a single INSERT ... SELECT statement. I assume your real case is more involved than that.
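
If that simpler route does apply, a minimal sketch using the table and column names from the question could look like this:

// one statement instead of a query per row; table/column names taken from the question
conn.query(
    'insert into test (id) ' +
    'select b.id from books b ' +
    'join (select id from table1 limit 10) t on b.id = t.id',
    function(err, result){
        if (err) throw err;
        console.log('inserted rows:', result.affectedRows);
    }
);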


@glukki, thanks, I went with async. It "chomped and chewed" through the data 100 records at a time, getting through about 1.2M rows in roughly 10 minutes. Here is what I ended up with, in case it is useful to someone else. Thanks!
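
The snippet relies on a few things set up elsewhere in the script: the mysql connection conn, a pool, the async module, and an xmlParser (the _ and $ fields in the parsed result suggest xml2js). The setup presumably looks roughly like this, with placeholder connection options:

// assumed setup for the snippet below; connection options are placeholders
var mysql = require('mysql');
var async = require('async');
var xml2js = require('xml2js');

var xmlParser = new xml2js.Parser();   // provides parseString(xml, callback)
var conn = mysql.createConnection({ /* host, user, password, database */ });
var pool = mysql.createPool({ /* host, user, password, database */ });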

function populateMesh(row, callback){    

    xmlParser.parseString('<root>'+row.mesh_heading_list+'</root>', function(err, result){

        var q2 = async.queue(function (task, cb) {

            pool.getConnection(function(err, cnx){
                cnx.query('INSERT INTO abstract_mesh (mesh_id, abstract_id, major_topic) SELECT mesh_descriptor.id, ?, ? FROM mesh_descriptor WHERE mesh_descriptor.name = ?', [task.id, task.majorTopic, task.descriptorName], function(err, result){
                    if (err) {throw err;}
                    cnx.release();
                    cb();
                });
            });

        }, 50);

        q2.drain = function() {
            //console.log('all mesh processed');
            callback();
        }

        if(!(result.root instanceof Object)){
            //console.log('its not obj!', row.id);
            q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Null'}, function (err) {});
        }

        for(var i in result.root.MeshHeading){
//            console.log('in loop',result.root.MeshHeading[i].DescriptorName);
            if(typeof result.root.MeshHeading[i].DescriptorName === 'undefined'){
                q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Emergency'}, function(err){});
            }

            for(var j in result.root.MeshHeading[i].DescriptorName){

                var descriptorName = result.root.MeshHeading[i].DescriptorName[j]._;
                var majorTopic = result.root.MeshHeading[i].DescriptorName[j].$.MajorTopicYN;

                q2.push({id: row.id, majorTopic: majorTopic, descriptorName: descriptorName}, function (err) {});

            }
        }
    });       

}


// here goes task serving logic
// if any async function should be finished before drain callback, push them into q
var q = async.queue(function (row, callback) {
    console.log('got id: ' + row.id);
    populateMesh(row, function(){
        callback();
    });
}, 10);

q.drain = function() {
    console.log('all items have been processed');
    conn.end(function(err){
        console.log('connection ended');
    });
    pool.end(function(err){
        console.log('pool closed');
    });
};

var truncate = conn.query('truncate abstract_mesh');

var select = conn.query('SELECT id, mesh_heading_list FROM pubtbl');

select.on('result', function(result){
//    console.log(result);
    q.push(result, function (err) {
        //console.log('finished processing row');
    });
});

In my opinion, the cleanest way to do this is with the synchronize module.

Install it from npm and require it:

var sync = require('synchronize');

Then wrap your logic in a fiber:

sync.fiber(function() { /* put your logic here */ });

An example with mysql:

var express = require('express');
var bodyParser = require('body-parser');
var mysql = require('mysql');
var sync = require('synchronize');

var db = mysql.createConnection({
    host     : 'localhost',
    user     : 'user',
    password : 'password',
    database : 'database'
});

db.connect(function(err) {
    if (err) {
        console.error('error connecting: ' + err.stack);
        return;
    }
});

function saveSomething() {
    var post = { /* column values for the mainTable row */ };
    // no callback here; sync.await blocks the fiber and the result ends up in "query"
    var query = sync.await(db.query('INSERT INTO mainTable SET ?', post, sync.defer()));
    var newId = query.insertId;
    post = {foreignKey: newId};
    // this query can stay async, because nothing below depends on its result
    db.query('INSERT INTO subTable SET ?', post, function(err, result) {
        if (err) throw err;
    });
}

Call saveSomething() from inside a sync.fiber block and the first insert will have finished (and its insertId will be available) before the second one runs. Another option would be to use promises.

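Wrapped in a fiber as described, the call itself would look roughly like this:

sync.fiber(function() {
    // inside the fiber, sync.await makes the first INSERT finish before saveSomething() returns
    saveSomething();
    db.end();   // end() waits for any still-queued queries before closing the connection
});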

Source: https://habr.com/ru/post/1543695/

