Amqp.node will not detect connection failure

We have a node.js script that runs a socket.io server whose clients consume messages from a RabbitMQ queue. We recently switched to Amazon AWS, and RabbitMQ is now a cluster of two machines (redundant instances). The AMQP connection is lost from time to time (this is a limitation of a high-availability environment with redundant virtual machines, and we must deal with it), and when an attempt to reconnect is made, DNS chooses which instance to connect to (this is a data replication cluster, so it doesn't matter which instance we connect to).

The problem is that the attempt to reconnect is never made; after a while when the connection is lost, amqp.node does not seem to notice that the connection was lost. In addition, users stop receiving messages, and the socket.io server simply stops accepting new connections.

We have a heartbeat timeout of 55 seconds (not to be confused with the socket.io heartbeat timeout) set in the RabbitMQ URL, and we listen for the "error" and "close" events with the amqp.node callback API, but they are apparently never emitted. Queues expect consumed messages to be processed. We want the node script to detect the lost connection and terminate by itself, so that the environment will automatically start a new process and establish the connection again.

Here is the code, maybe we are doing something wrong with the amqp.node callback API or something else.

// Socket.IO server whose clients are fed from a RabbitMQ queue via amqplib's
// callback API. The script exits when the AMQP connection emits 'close', so
// that the hosting environment can start a fresh process.

var express = require('express');
var app = express(); // declared with var: the bare assignment created an implicit global
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];

// ANSI escape sequences used to colour console output. Written as '\x1b'
// (same character as the octal '\033') because octal escapes are a syntax
// error in strict mode and in ES modules.
var red, blue, green, magenta, orange, reset;
red = '\x1b[31m';
blue = '\x1b[34m';
green = '\x1b[32m';
magenta = '\x1b[35m';
orange = '\x1b[43m';
reset = '\x1b[0m';

var queue = 'ha.atualizacao_mobile';
// The heartbeat query parameter asks for a 55-second AMQP heartbeat.
var urlRabbit = 'amqp://login: password@host ?heartbeat=55'; // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;

// Socket.IO heartbeat settings (unrelated to the AMQP heartbeat above).
console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");
console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");

io.sockets.on('connection', function(socket) {
    // Socket-level errors are logged and otherwise ignored.
    socket.on('error', function (exc) {
        console.log(orange + "Ignoring exception: " + exc + reset);
    });

    socket.on('send-indice', function (data) {
        // Some business logic
    });

    socket.on('disconnect', function () {
        // Some business logic
    });
});

/**
 * Handles one parsed message taken from the RabbitMQ queue.
 *
 * @param {*} data - Result of JSON.parse on the message content.
 */
function updatecli(data) {
    // Some business logic
}

amqp.connect(urlRabbit, null, function(err, conn) {
    if (err !== null) {
        // NOTE(review): a failed initial connect is only logged; the process is
        // not terminated here, unlike the 'close' handler below.
        return console.log("Error creating connection: " + err);
    }

    conn.on('error', function(err) {
        console.log("Generated event 'error': " + err);
    });

    // Terminate the process once the connection reports it has closed.
    conn.on('close', function() {
        console.log("Connection closed.");
        process.exit();
    });

    processRabbitConnection(conn, function() {
        conn.close();
    });
});

/**
 * Opens a channel on the connection, asserts the queue and starts consuming
 * it. Each non-null message is JSON-parsed, handed to updatecli, and then
 * acknowledged (also when parsing or handling throws).
 *
 * @param {Object} conn - Connection passed to the amqp.connect callback.
 * @param {Function} finalize - Called with no arguments when creating the
 *     channel, asserting the queue, or registering the consumer fails.
 */
function processRabbitConnection(conn, finalize) {
    conn.createChannel(function(err, channel) {
        if (err != null) {
            console.log("Error creating channel: " + err);
            return finalize();
        }

        channel.assertQueue(queue, null, function(err, ok) {
            if (err !== null) {
                console.log("Error asserting queue " + queue + ": " + err);
                return finalize();
            }

            channel.consume(queue, function (msg) {
                if (msg !== null) {
                    try {
                        var dataObj = JSON.parse(msg.content);
                        if (debug == true) {
                            //console.log(dataObj);
                        }
                        updatecli(dataObj);
                    } catch(err) {
                        console.log("Error in JSON: " + err);
                    }
                    // Acknowledge regardless of whether handling succeeded.
                    channel.ack(msg);
                }
            }, null, function(err, ok) {
                if (err !== null) {
                    console.log("Error consuming message: " + err);
                    return finalize();
                }
            });
        });
    });
}

serverio.listen(9128, function () {
    console.log('Server: Socket IO Online - Port: 9128 - ' + new Date());
});
+6
source share
1 answer

Apparently, the problem is resolved. The cause was the heartbeat being close to 60 seconds. It conflicts with the RabbitMQ load balancer, which checks every minute or so whether data has passed through the connection (if no data has passed, it terminates the connection). The AMQP connection then stops receiving messages, and the library does not appear to react. To avoid this situation, a lower heartbeat is required (for example, 30 seconds).

+7
source

Source: https://habr.com/ru/post/980519/


All Articles