How to programmatically check if NServiceBus has finished processing all messages

As part of the effort to automate the start / stop of some of our NServiceBus services, I would like to know when the service finished processing all the messages in the input queue.

The problem is that although the NServiceBus service is running, my C # code is reporting one smaller message than it actually is . Therefore, he believes that the queue is empty when another message remains. If the service is stopped, it reports the "correct" number of messages. This is confusing because when I check the queues myself using the Private Queues view in Computer Management, it displays the “correct” number.

I am using a variant of the following C # code to find the number of posts:

var queue = new MessageQueue(path); return queue.GetAllMessages().Length; 

I know this will happen terribly when there are a lot of messages. The queues that I check should only have a few messages at a time.

I looked at other questions , but did not find the help I needed.

Any insights or suggestions would be appreciated!

Update:. I should have mentioned that this service is behind the distributor, who disconnects before trying to disable this service. Therefore, I am sure that new messages will not be added to the service entry queue.

+4
source share
2 answers

WMI was the answer! Here is the first pass in the code. This undoubtedly could be improved.

 public int GetMessageCount(string queuePath) { const string query = "select * from Win32_PerfRawData_MSMQ_MSMQQueue"; var query = new WqlObjectQuery(query); var searcher = new ManagementObjectSearcher(query); var queues = searcher.Get(); foreach (ManagementObject queue in queues) { var name = queue["Name"].ToString(); if (AreTheSameQueue(queuePath, name)) { // Depending on the machine (32/64-bit), this value is a different type. // Casting directly to UInt64 or UInt32 only works on the relative CPU architecture. // To work around this run-time unknown, convert to string and then parse to int. var countAsString = queue["MessagesInQueue"].ToString(); var messageCount = int.Parse(countAsString); return messageCount; } } return 0; } private static bool AreTheSameQueue(string path1, string path2) { // Tests whether two queue paths are equivalent, accounting for differences // in case and length (if one path was truncated, for example by WMI). string sanitizedPath1 = Sanitize(path1); string sanitizedPath2 = Sanitize(path2); if (sanitizedPath1.Length > sanitizedPath2.Length) { return sanitizedPath1.StartsWith(sanitizedPath2); } if (sanitizedPath1.Length < sanitizedPath2.Length) { return sanitizedPath2.StartsWith(sanitizedPath1); } return sanitizedPath1 == sanitizedPath2; } private static string Sanitize(string queueName) { var machineName = Environment.MachineName.ToLowerInvariant(); return queueName.ToLowerInvariant().Replace(machineName, "."); } 
+1
source

The fact is that this is actually not a “one smaller message”, but rather depends on the number of messages that are currently processed by the endpoint, which in a multi-threaded process can reach the number of threads.

There is also a problem with client processes that continue to send messages in the same queue.

Probably the only "right" way to handle this is to count messages several times with a delay between them, and if the number remains zero for a certain number of attempts that you can count, the queue is empty.

+2
source

Source: https://habr.com/ru/post/1437597/


All Articles