Using cluster in a Node module

UPDATE: Even if this particular scenario is unrealistic (as the comments suggest), I'm still interested in how you can write a module that uses cluster without re-running the parent process every time.


I am trying to write a Node.js module called mass-request, which speeds up a large number of HTTP requests by farming them out to child processes.

My hope is that it works like this from the outside.

 var mr = require("mass-request"), scraper = mr(); for (var i = 0; i < my_urls_to_visit.length; i += 1) { scraper.add(my_urls_to_visit[i], function(resp) { // do something with response } } 

To get started, I put together a skeleton for the mass-request module.

 var cluster = require("cluster"), numCPUs = require("os").cpus().length; module.exports = function() { console.log("hello from mass-request!"); if (cluster.isMaster) { for (var i = 0; i < numCPUs; i += 1) { var worker = cluster.fork(); } return { add: function(url, cb) {} } } else { console.log("worker " + process.pid + " is born!"); } } 

Then I test it in a test script like this:

    var m = mr();
    console.log("hello from test.js!", m);

I expected to see "hello from mass-request!" four times (which I do). To my surprise, I also see "hello from test.js!" four times. Clearly I do not understand how cluster.fork() works. Does it re-run the whole process, not just the function that first calls it?

If so, how can cluster be used in a module without burdening the person who uses that module with messy multi-process logic?

+6
2 answers

I believe what you are looking for is setupMaster.

From the docs:

cluster.setupMaster([settings])

  • settings Object
    • exec String file path to the worker file. (Default = process.argv[1])
    • args Array of string arguments passed to the worker. (Default = process.argv.slice(2))
    • silent Boolean whether or not to send output to the parent's stdio. (Default = false)

setupMaster is used to change the default fork() behavior. Once called, the settings will be present in cluster.settings.

Using the exec property, you can start your workers from a different file than the one the caller ran.

Important: as the docs state, this can only be called once. If your module depends on this behavior, then the caller of your module cannot also use cluster themselves, or it all falls apart.
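As an aside, one way a module could defend against that clash is to check cluster.settings before calling setupMaster and refuse to overwrite an existing configuration. This is purely a defensive sketch of mine, not something the docs prescribe:

    var cluster = require("cluster"),
        path = require("path"),
        workerPath = path.join(__dirname, "worker.js");

    // sketch only: if the calling application has already pointed cluster at
    // its own worker file, bail out rather than silently overwrite its settings
    if (cluster.settings && cluster.settings.exec && cluster.settings.exec !== workerPath) {
        throw new Error("mass-request: cluster has already been configured by the caller");
    }

    cluster.setupMaster({ exec: workerPath });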

For instance:

index.js

 var cluster = require("cluster"), path = require("path"), numCPUs = require("os").cpus().length; console.log("hello from mass-request!"); if (cluster.isMaster) { cluster.setupMaster({ exec: path.join(__dirname, 'worker.js') }); for (var i = 0; i < numCPUs; i += 1) { var worker = cluster.fork(); } return { add: function (url, cb) { } } } else { console.log("worker " + process.pid + " is born!"); } 

worker.js

 console.log("worker " + process.pid + " is born!"); 

Output

    node index.js
    hello from mass-request!
    worker 38821 is born!
    worker 38820 is born!
    worker 38822 is born!
    worker 38819 is born!
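The add() method in the skeleton above is still empty. A minimal sketch of how it could dispatch work over the built-in cluster IPC channel (worker.send() in the master, process.on('message') / process.send() in the worker) might look like the following. The request-id bookkeeping, the round-robin dispatch, and the http.get worker are my own illustration, not part of the answer above, and http.get here only handles plain http:// URLs.

    // sketch only: inside the isMaster branch of index.js
    var workers = [],
        pending = {},   // callbacks keyed by request id
        nextId = 0;

    for (var i = 0; i < numCPUs; i += 1) {
        var worker = cluster.fork();
        worker.on("message", function (msg) {
            // a worker finished a URL; run its callback and forget it
            if (pending[msg.id]) {
                pending[msg.id](msg.body);
                delete pending[msg.id];
            }
        });
        workers.push(worker);
    }

    return {
        add: function (url, cb) {
            var id = nextId++;
            pending[id] = cb;
            // naive round-robin dispatch to a worker
            workers[id % workers.length].send({ id: id, url: url });
        }
    };

And the matching worker side:

    // sketch only: worker.js fetching plain http:// URLs with the core http module
    var http = require("http");

    process.on("message", function (msg) {
        http.get(msg.url, function (res) {
            var body = "";
            res.on("data", function (chunk) { body += chunk; });
            res.on("end", function () {
                process.send({ id: msg.id, body: body });
            });
        });
    });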
+4

While it is true that the asynchronous nature of node.js is what makes it awesome, it still runs in a single thread on the server, in a single event loop. "Multi-threading" a node.js application with cluster lets you fork child processes of the application, which allows you to make better use of a multi-core server. I built a game server architecture a while back that used cluster and zmq (ZeroMQ) for multi-processing and let the processes easily send messages back and forth on various channels. I've simplified that architecture in the example below to hopefully help illustrate how multi-process node.js can be put together. I apologize if it's a little rough; it was years ago and I was relatively new to node at the time ;)

Ideally you would not cram everything for master/child into one script, but I figured this was the easiest way to let you copy/paste/run :)

As you mentioned in your comment, my original example showed clustering well, but not in a way that matched your specific use case of dispatching work. I didn't have much time, so I adapted my example to fit your needs fairly quickly. Take a look:

mass-request.js

    var cluster = require('cluster');
    var zmq = require('zmq');

    module.exports = {
        _childId : null,
        _urls : [],
        _threadCount : 1,
        _readyThreads : 0,
        _callbacks : {},

        zmqReceive : null,  //the socket we receive on for this thread
        zmqMaster : null,   //the socket to the master
        zmqChildren : {},   //an object storing the sockets for the children

        setThreads : function( threadCount ) {
            this._threadCount = threadCount;
        },

        add : function( url , cb ) {
            this._urls.push( { url: url, cb: cb } );
        },

        run : function() {
            if( cluster.isMaster ) {
                this._masterThread();
            } else {
                this._childThread();
            }
        },

        _masterThread : function() {
            console.log( 'Master Process Starting Up' );

            this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://master.ipc' );

            //bind handler for messages coming into this process, using a closure
            //so we can access the massRequest object inside the callback
            ( function( massRequest ) {
                this.zmqReceive.on( 'message' , function( msg ) {
                    msg = JSON.parse(msg);

                    //was this an online notification?
                    if( msg && msg.status == 'Online' ) {
                        massRequest._threadReady();
                        return; //we're done
                    }

                    if( msg && msg.html ) {
                        //this was a response from a child, call the callback for it
                        massRequest._callbacks[ msg.sender ].call( massRequest , msg.html );
                        //send the child another URL
                        massRequest._sendUrlToChild( msg.sender );
                    }
                } );
            }).call( this , this );

            //fork the child processes and set up the sending sockets for them
            for( var i = 0; i < this._threadCount; ++i ) {
                //set up the sending socket
                this.zmqChildren[i] = zmq.socket('push').connect( 'ipc://child_' + i + '.ipc' );
                //fork the process and pass it an id
                cluster.fork( { _childId: i } );
            }
        },

        _sendUrlToChild : function( child ) {
            //if there are no urls left, return (this would also be a good place
            //to send the child a message telling it to exit gracefully)
            if( !this._urls.length ) return;

            //grab a url to process
            var item = this._urls.pop();

            //set the callback for the child
            this._callbacks[child] = item.cb;

            this.zmqChildren[child].send( JSON.stringify( { url: item.url } ) );
        },

        _processUrls : function() {
            for( var i = 0; i < this._threadCount; ++i ) {
                this._sendUrlToChild( i );
            }
        },

        _threadReady : function() {
            if( ++this._readyThreads >= this._threadCount ) {
                //all threads are ready, send out urls to start the mayhem
                console.log( 'All threads online, starting URL processing' );
                this._processUrls();
            }
        },

        _childProcessUrl : function( url ) {
            console.log( 'Child Process ' + this.childId + ' Handling URL: ' + url );

            //do something here to scrape your content however you see fit
            var html = 'HTML';

            this.zmqMaster.send( JSON.stringify( { sender: this.childId, html: html } ) );
        },

        _childThread : function() {
            //get the child id that was passed from cluster
            this.childId = process.env._childId;
            console.log( 'Child Process ' + this.childId + ' Starting Up' );

            //bind the pull socket to receive messages to this process
            this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://child_' + this.childId + '.ipc' );

            //bind the push socket to send to the master
            this.zmqMaster = zmq.socket('push').connect('ipc://master.ipc');

            //bind handler for messages coming into this process
            ( function( massRequest ) {
                this.zmqReceive.on( 'message' , function( msg ) {
                    msg = JSON.parse(msg);
                    console.log( 'Child ' + massRequest.childId + ': ' + JSON.stringify(msg) );

                    //handle the url
                    if( msg && msg.url ) massRequest._childProcessUrl( msg.url );
                } );
            }).call( this , this );

            //let the master know we're done setting up
            this.zmqMaster.send( JSON.stringify( { sender: this.childId, status: 'Online' } ) );
        }
    };

demo.js

    var mr = require( './mass-request.js' );

    mr.setThreads( 4 );

    mr.add( 'http://foo.com' , function( resp ) { console.log( 'http://foo.com is done' ); } );
    mr.add( 'http://bar.com' , function( resp ) { console.log( 'http://bar.com is done' ); } );
    mr.add( 'http://alpha.com' , function( resp ) { console.log( 'http://alpha.com is done' ); } );
    mr.add( 'http://beta.com' , function( resp ) { console.log( 'http://beta.com is done' ); } );
    mr.add( 'http://theta.com' , function( resp ) { console.log( 'http://theta.com is done' ); } );
    mr.add( 'http://apples.com' , function( resp ) { console.log( 'http://apples.com is done' ); } );
    mr.add( 'http://oranges.com' , function( resp ) { console.log( 'http://oranges.com is done' ); } );

    mr.run();

Put those in one folder and run node demo.js

I should also point out that since the base of this was pulled from one of my other projects that used 0MQ (http://zeromq.org/), you will need it installed, along with the node.js bindings for it (https://github.com/JustinTulloss/zeromq.node): npm install zmq. The cluster module ships with node. You can of course swap ZMQ for any other inter-process communication method you prefer; it just happens to be the one I'm familiar with and have used.

Overview: the master thread (i.e. the script that calls the run() method) spins up X children (configurable via setThreads). The children report back to the master over ZeroMQ sockets when they finish initializing. Once all the threads are ready, the master sends URLs to the children so they can go off and fetch the HTML. Each child returns its HTML to the master, which passes it to the appropriate callback for that URL and then sends that child another URL. While this is not an ideal solution, the callbacks still all run in the master thread and can become a bottleneck, because you cannot easily move them to another thread: they may contain closures/variables/etc. that would not work properly outside the parent process without some mechanism for sharing objects.

Anywho, if you run my little demo here, you will see 4 threads "handling" URLs (they just don't actually fetch them, for simplicity).
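If you did want the children to actually fetch the pages instead of returning the 'HTML' placeholder, _childProcessUrl is where that would go. This is a rough sketch of mine, assuming plain http:// URLs and the zmq wiring above, with error handling omitted:

    // sketch only: a fetching version of _childProcessUrl for mass-request.js
    _childProcessUrl : function( url ) {
        var self = this;
        console.log( 'Child Process ' + this.childId + ' Handling URL: ' + url );

        require('http').get( url , function( res ) {
            var html = '';
            res.on( 'data' , function( chunk ) { html += chunk; } );
            res.on( 'end' , function() {
                //send the collected HTML back to the master, same shape as before
                self.zmqMaster.send( JSON.stringify( { sender: self.childId, html: html } ) );
            } );
        } );
    },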

Hope this helps;)

+3
