How to make my ActiveMQ broker drop offline durable subscribers

We have an ActiveMQ broker that very different clients connect to using JMS, AMQP, and MQTT. For a reason we have not yet figured out, a certain set of MQTT clients often (not always) subscribes durably. This is a test environment in which clients are added and removed quite often, the latter sometimes by pulling the plug or rebooting an embedded device, so they cannot unsubscribe properly. The effect (IIUC) is that the broker accumulates “offline durable subscriptions” for devices it may never see again (I can see them under http://my_broker:8161/admin/subscribers.jsp), keeping messages on those topics forever, until it finally breaks down under its own memory footprint.

The underlying problem is that these clients subscribe durably, and we need to find out why. However, it was also decided that clients doing this (unintentionally) should not be able to bring the broker to a halt, so we also need to solve this problem in its own right.

I found that there are timeout settings for offline durable subscriptions and put them into our broker configuration (the last two attributes):

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="my_broker" dataDirectory="${activemq.data}" useJmx="true" advisorySupport="false" persistent="false" offlineDurableSubscriberTimeout="1800000" offlineDurableSubscriberTaskSchedule="60000"> 

If I understand correctly, the above should make the broker check every minute and drop clients it has not seen for half an hour. However, contrary to the documentation, this does not seem to work: a consumer that I subscribed and then unplugged days ago is still visible in the list of offline durable subscribers, the broker's memory footprint keeps growing, and when I delete subscribers manually in the broker's web interface, I can see the memory footprint go down.

So here are my questions:

  • What determines whether an MQTT subscription to an ActiveMQ broker is durable?
  • What am I doing wrong in the ActiveMQ settings for the timeout that drops offline durable subscriptions?
1 answer

I extracted the relevant code (doCleanup()) that removes timed-out durable subscriptions.

On success, it logs:

  LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 

On failure, it logs:

  LOG.error("Failed to remove inactive durable subscriber", e); 

Look for the log lines above in your log file and match them against the subscriber details you observed in the admin subscribers.jsp viewer. If neither line is printed, the subscriptions may be remaining active for some reason, or you may have stumbled upon a bug.
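If reading the log by hand is tedious, the check can be scripted. The following is only a minimal sketch: the class name CleanupLogScan is made up for illustration and is not part of ActiveMQ, and the log file location is an assumption that you pass in as an argument. It simply keeps the lines that contain either of the two messages quoted above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ: filters broker log lines for
// the two messages that doCleanup() writes.
final class CleanupLogScan {
    static final String REMOVED = "Destroying durable subscriber due to inactivity";
    static final String FAILED = "Failed to remove inactive durable subscriber";

    // Returns, in order, the lines that contain either cleanup message.
    static List<String> cleanupLines(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (String line : lines) {
            if (line.contains(REMOVED) || line.contains(FAILED)) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java CleanupLogScan <path-to-broker-log>");
            return;
        }
        // The path is an assumption: pass wherever your broker writes its log.
        List<String> hits = cleanupLines(Files.readAllLines(Paths.get(args[0])));
        if (hits.isEmpty()) {
            System.out.println("No cleanup messages found.");
        } else {
            hits.forEach(System.out::println);
        }
    }
}
```

An empty result after the timeout has clearly elapsed would point at the second case described above: the cleanup never considered the subscription eligible for removal.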

Also, could you try removing the underscore (_) from the broker name, if you can? The manual mentions problems with underscores in broker names.

The code:

  public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
                     TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
      super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
      if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1
              && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
          this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
          this.cleanupTask = new TimerTask() {
              @Override
              public void run() {
                  doCleanup();
              }
          };
          this.cleanupTimer.schedule(cleanupTask,
                  broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),
                  broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
      }
  }

  public void doCleanup() {
      long now = System.currentTimeMillis();
      for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
          DurableTopicSubscription sub = entry.getValue();
          if (!sub.isActive()) {
              long offline = sub.getOfflineTimestamp();
              if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
                  LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
                  try {
                      RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
                      info.setClientId(entry.getKey().getClientId());
                      info.setSubscriptionName(entry.getKey().getSubscriptionName());
                      ConnectionContext context = new ConnectionContext();
                      context.setBroker(broker);
                      context.setClientId(entry.getKey().getClientId());
                      removeSubscription(context, info);
                  } catch (Exception e) {
                      LOG.error("Failed to remove inactive durable subscriber", e);
                  }
              }
          }
      }
  }

  // The toString method for DurableTopicSubscription class
  @Override
  public synchronized String toString() {
      return "DurableTopicSubscription-" + getSubscriptionKey()
              + ", id=" + info.getConsumerId()
              + ", active=" + isActive()
              + ", destinations=" + durableDestinations.size()
              + ", total=" + getSubscriptionStatistics().getEnqueues().getCount()
              + ", pending=" + getPendingQueueSize()
              + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
              + ", inflight=" + dispatched.size()
              + ", prefetchExtension=" + getPrefetchExtension();
  }
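
To make the conditions in doCleanup() easier to reason about, here is a minimal standalone sketch that restates them as a single predicate. The class and method names (OfflineCleanupCheck, wouldBeRemoved) are made up for illustration and do not exist in ActiveMQ; the timeout value is the one from the question.

```java
// Hypothetical stand-in for the checks inside doCleanup(); it uses no
// ActiveMQ classes and only mirrors the logic of the excerpt above.
final class OfflineCleanupCheck {

    // True when doCleanup() would try to remove the subscription: it must be
    // inactive, have a recorded offline timestamp (not -1), and have been
    // offline for at least timeoutMillis.
    static boolean wouldBeRemoved(boolean active, long offlineTimestamp, long now, long timeoutMillis) {
        if (active) {
            return false;
        }
        return offlineTimestamp != -1 && now - offlineTimestamp >= timeoutMillis;
    }

    public static void main(String[] args) {
        long timeout = 1_800_000L; // offlineDurableSubscriberTimeout from the question (30 minutes)
        long now = System.currentTimeMillis();
        long twoDaysAgo = now - 2L * 24 * 60 * 60 * 1000;

        // Inactive and offline for two days: eligible for removal.
        System.out.println(wouldBeRemoved(false, twoDaysAgo, now, timeout)); // true
        // Still reported as active: never removed, however old it is.
        System.out.println(wouldBeRemoved(true, twoDaysAgo, now, timeout));  // false
        // No offline timestamp recorded (-1): never removed.
        System.out.println(wouldBeRemoved(false, -1L, now, timeout));        // false
    }
}
```

Read this way, a subscription that survives for days with these settings either still reports active=true or has an offline timestamp of -1. The toString() output in the log line and the subscribers.jsp view should help tell which of the two applies.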

Source: https://habr.com/ru/post/1012223/

