I extracted the appropriate code ( doCleanup()
), which removes the lengthy subscriptions.
If successful, he performs:
LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
In the event of a failure, it performs:
LOG.error("Failed to remove inactive durable subscriber", e);
Look above the log line in your log file and compare it with the details that you observed with the /users.jsp viewer admin. If it does not print any of the lines, the subscriptions may be left active
for some reason or you may stumble upon an error.
Also, can you try to remove the underscore (_) in the broker name if you can? The guide talks about problems with underscores in broker names.
The code:
public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); this.cleanupTask = new TimerTask() { @Override public void run() { doCleanup(); } }; this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); } } public void doCleanup() { long now = System.currentTimeMillis(); for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { DurableTopicSubscription sub = entry.getValue(); if (!sub.isActive()) { long offline = sub.getOfflineTimestamp(); if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { LOG.info("Destroying durable subscriber due to inactivity: {}", sub); try { RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); info.setClientId(entry.getKey().getClientId()); info.setSubscriptionName(entry.getKey().getSubscriptionName()); ConnectionContext context = new ConnectionContext(); context.setBroker(broker); context.setClientId(entry.getKey().getClientId()); removeSubscription(context, info); } catch (Exception e) { LOG.error("Failed to remove inactive durable subscriber", e); } } } } } // The toString method for DurableTopicSubscription class @Override public synchronized String toString() { return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); }