Akka Flow Stops When AMQP Server Restarts

I have a very strange problem using Alpakka's AMQP connector and Akka streams.

When my RabbitMQ message broker restarts, the source appears to restart as well. After the restart, however, the stream never terminates, and messages are lost in a section further downstream. If the AMQP server is started before my Akka application, everything works fine, but not the other way around.

This is how I initialize my AmqpSource:

val amqpMessageSource = builder.add {
  val amqpSource = AmqpSource(
    NamedQueueSourceSettings(connectionDetails, amqpInMessageQueue).withDeclarations(queueDeclaration),
    bufferSize = 10
  ).map { message =>
    fromIncomingMessage(message)
  }.initialDelay(5.seconds)
  // -1 means retry indefinitely; every retry re-materializes amqpSource,
  // so its initialDelay makes each attempt wait 5 seconds
  amqpSource.recoverWithRetries(-1, { case _ => amqpSource })
}
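
To make the retry mechanism in the snippet above easier to reason about, here is a minimal sketch that uses plain Akka Streams without any AMQP connection. It assumes Akka 2.6 or later, where the materializer is derived from the implicit ActorSystem. The names RecoverWithRetriesSketch, flakySource and run are hypothetical, and the failing iterator is only a stand-in for a dropped broker connection. It does not reproduce the problem described here; it only shows that recoverWithRetries reacts to a failure signalled by the upstream source and then materializes the given source again, including its initialDelay.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object RecoverWithRetriesSketch {

  // Hypothetical stand-in for the AMQP source. Every materialization counts
  // as one "connection attempt"; the first two attempts fail before the
  // third element is emitted, the third attempt completes normally.
  def flakySource(attempts: AtomicInteger): Source[Int, NotUsed] =
    Source.fromIterator { () =>
      val attempt = attempts.incrementAndGet()
      Iterator(1, 2, 3).map { n =>
        if (attempt < 3 && n == 3)
          throw new RuntimeException(s"connection lost on attempt $attempt")
        n
      }
    }

  // Same shape as the snippet in the question: the delayed source is reused
  // as its own recovery source, so each retry waits for initialDelay first.
  def run()(implicit system: ActorSystem): Seq[Int] = {
    val attempts = new AtomicInteger(0)
    val delayed = flakySource(attempts).initialDelay(100.millis)
    val resilient =
      delayed.recoverWithRetries(-1, { case _: RuntimeException => delayed })
    Await.result(resilient.runWith(Sink.seq), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try println(run())
    finally system.terminate()
  }
}
```

Elements emitted before a failure still reach the downstream stages, and the retry only starts once the source actually fails. If the real source does not signal a failure when the broker goes away, recoverWithRetries never gets a chance to run.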

I also tried removing the section where the problem occurs and wiring the source directly to the part of the stream that matters for my example. The result is even stranger: in that case, the AMQP client no longer reads messages from RabbitMQ at all.

Source: https://habr.com/ru/post/1679113/