We have a Node.js script that runs a socket.io server whose clients receive messages consumed from a RabbitMQ queue. We recently moved to Amazon AWS, and RabbitMQ is now a cluster of two machines (redundant instances). The AMQP connection drops from time to time. This is a limitation of a high-availability environment with redundant virtual machines, and we have to live with it. When a reconnection attempt is made, DNS chooses which instance to connect to. Because the cluster replicates its data, it doesn't matter which instance we connect to.
The problem is that no reconnection attempt is ever made. After the connection is lost, amqp.node does not seem to notice, even after a while. Meanwhile, users stop receiving messages, and the socket.io server simply stops accepting new connections.
We set an AMQP heartbeat of 55 seconds in the RabbitMQ URL (not to be confused with the socket.io heartbeat timeout). We also listen for the "error" and "close" events through the amqp.node callback API, but they are apparently never emitted. The queues expect consumed messages to be processed. We want the Node script to detect the lost connection and terminate itself, so that the environment automatically starts a new process and re-establishes the connection.
Here is the code. Maybe we are using the amqp.node callback API incorrectly, or something else is wrong.
var express = require('express');
var app = express(); // was an implicit global
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];

// ANSI colour escape sequences for console output. '\x1b' is the same ESC
// character as the legacy octal '\033', but is also valid in strict-mode code.
var red = '\x1b[31m';
var blue = '\x1b[34m';
var green = '\x1b[32m';
var magenta = '\x1b[35m';
var orange = '\x1b[43m'; // was an implicit global
var reset = '\x1b[0m';

var queue = 'ha.atualizacao_mobile';
// Amazon. Placeholder credentials/host (stray whitespace from the paste removed).
// The `heartbeat` query parameter requests a 55-second AMQP heartbeat.
var urlRabbit = 'amqp://login:password@host?heartbeat=55';
var amqp = require('amqplib/callback_api');
var debug = true;

console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");
console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");

io.sockets.on('connection', function (socket) {
  socket.on('error', function (exc) {
    console.log(orange + "Ignoring exception: " + exc + reset);
  });
  socket.on('send-indice', function (data) {
    // Some business logic
  });
  socket.on('disconnect', function () {
    // Some business logic
  });
});

function updatecli(data) {
  // Some business logic
}

/**
 * Log why the script is giving up and exit with a non-zero status.
 *
 * A non-zero status marks the exit as a failure, so the hosting environment
 * starts a fresh process. That process opens a new AMQP connection, letting
 * DNS pick whichever RabbitMQ node is alive.
 *
 * @param {string} reason - human-readable cause, written to the console.
 */
function terminate(reason) {
  console.log(red + "Terminating: " + reason + reset);
  process.exit(1);
}

amqp.connect(urlRabbit, null, function (err, conn) {
  if (err !== null) {
    console.log("Error creating connection: " + err);
    // Previously this only returned, leaving socket.io up with no consumer:
    // clients stayed connected but never received another message.
    return terminate("could not connect to RabbitMQ");
  }
  conn.on('error', function (err) {
    // Only logged here. A connection error is followed by 'close', which is
    // where the process exits.
    console.log("Generated event 'error': " + err);
  });
  conn.on('close', function () {
    console.log("Connection closed.");
    terminate("AMQP connection closed");
  });
  processRabbitConnection(conn, function () {
    conn.close();
  });
});

/**
 * Open a channel on `conn`, make sure `queue` exists and consume it, passing
 * each parsed JSON message to updatecli() and then acknowledging it.
 *
 * @param {object} conn - an open amqplib (callback API) connection.
 * @param {function} finalize - called when channel/queue/consumer setup
 *   fails; the caller uses it to close the connection.
 */
function processRabbitConnection(conn, finalize) {
  conn.createChannel(function (err, channel) {
    if (err != null) {
      console.log("Error creating channel: " + err);
      return finalize();
    }
    // The connection can stay open after the channel has been closed (e.g. by
    // the broker). Without these listeners, consumption would stop silently.
    channel.on('error', function (err) {
      console.log("Channel error: " + err);
    });
    channel.on('close', function () {
      terminate("AMQP channel closed");
    });
    channel.assertQueue(queue, null, function (err, ok) {
      if (err !== null) {
        console.log("Error asserting queue " + queue + ": " + err);
        return finalize();
      }
      channel.consume(queue, function (msg) {
        if (msg === null) {
          // amqplib passes null when RabbitMQ cancels the consumer (e.g. the
          // queue was deleted); no further deliveries will arrive.
          return terminate("consumer cancelled by RabbitMQ");
        }
        try {
          // msg.content is a Buffer; decode it explicitly before parsing.
          var dataObj = JSON.parse(msg.content.toString());
          if (debug === true) {
            //console.log(dataObj);
          }
          updatecli(dataObj);
        } catch (err) {
          console.log("Error in JSON: " + err);
        }
        // Malformed messages are acknowledged too, so they are not redelivered.
        channel.ack(msg);
      }, null, function (err, ok) {
        if (err !== null) {
          console.log("Error consuming message: " + err);
          return finalize();
        }
      });
    });
  });
}

serverio.listen(9128, function () {
  console.log('Server: Socket IO Online - Port: 9128 - ' + new Date());
});