Proper use of message queue in 2008R2

I am not a programmer, but I try to help them by giving them some guidance. We no longer have experience in msmq. We are trying to use this to integrate some features with a planning application.

The scheduler application starts the task by creating a webcam using a custom DLL. DLL calls weburl. The web application will complete its task and send updates to the website about the task in progress. The website writes a message to the queue. A DLL, called a site, controls the queue for messages with a label that has been assigned to this job. When he receives a status message, he closes.

We receive the following message every few hours. We use this method at almost 100 jobs per hour. In the code below, jobid matches the label for the message in the message queue. Each job produces a jobid at the beginning and will use this as a label for each message it sends to msmq for this job.

System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor. at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType) at System.Messaging.MessageEnumerator.get_Current() 

Here is the code for it.

  while ( running ) { // System.Console.WriteLine( "Begin Peek" ); messageQueue.Peek(); //System.Console.WriteLine( "End Peek" ); messageQueue.MessageReadPropertyFilter.SetAll(); using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() ) { enumerator.Reset(); while ( enumerator.MoveNext() ) { Message msg = enumerator.Current; if ( msg.Label.Equals( this.jobid ) ) { StringBuilder sb = new StringBuilder(); /* try { sb.Append( "Message Source: " ); //sb.Append( msg.SourceMachine ); sb.Append( " Sent: " ); sb.Append( msg.SentTime ); sb.Append( " Label " ); sb.Append( msg.Label ); sb.Append( " ID: " ); sb.Append( msg.Id ); sb.Append( " CorrelationID: " ); sb.Append( msg.CorrelationId ); sb.Append( " Body Type: " ); sb.Append( msg.BodyType ); } catch ( Exception ) { throw; } finally { System.Console.WriteLine( sb.ToString() ); } */ //System.Console.WriteLine( "Receiving Message started" ); using ( Message message = messageQueue.ReceiveById( msg.Id ) ) { //System.Console.WriteLine( "Receiving Message Complete" ); //sb = new StringBuilder(); string bodyText = string.Empty; try { System.IO.StringWriter sw = new System.IO.StringWriter( sb ); System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream ); while ( !sr.EndOfStream ) { sw.WriteLine( sr.ReadLine() ); } sr.Close(); sw.Close(); bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) ); int indx = bodyText.IndexOf( ',' ); string tokens = bodyText.Substring( indx + 1 ); indx = tokens.IndexOf( ',' ); string command = tokens.Substring( 0, indx ); tokens = tokens.Substring( indx + 1 ); if ( command.Equals( COMMAND_STARTED ) ) { System.Console.WriteLine( "STARTED " + tokens ); } else if ( command.Equals( COMMAND_UPDATE ) ) { System.Console.WriteLine( tokens ); } else if ( command.Equals( COMMAND_ENDED_OK ) ) { System.Console.WriteLine( tokens ); System.Console.WriteLine( "WEBJOB: Success" ); finalResults = new FinalResults( 0, 0, "Success" ); running = false; } else if ( command.Equals( COMMAND_ENDED_WARNING ) ) { System.Console.WriteLine( tokens ); System.Console.WriteLine( "WEBJOB: Warning Issued" ); finalResults = new FinalResults( 1, 1, "Warning" ); running = false; } else if ( command.Equals( COMMAND_ENDED_FAIL ) ) { System.Console.WriteLine( tokens ); System.Console.WriteLine( "WEBJOB: Failure" ); finalResults = new FinalResults( 2, 16, "Failure" ); running = false; } } catch ( Exception ) { throw; } finally { //System.Console.WriteLine( "Body: " + bodyText ); } } } } } } return finalResults; } MessageQueue messageQueue = null; string webServiceURL = ""; Dictionary<string, string> parms = new Dictionary<string, string>(); string jobid = "NONE"; 
+4
source share
3 answers

kprobst explains what happens. Even if you see that this particular message is in the queue, if another application (or another instance of the same application) selects (any) message from this queue, this will invalidate the cursor.

Actually, this code is not intended to work if several processes are loaded from the same queue.

+3
source

This usually means that the message you receive () is deleted by something else before the receive operation can be completed. Another application or another thread in the same process as your code, using a different queue link.

Is it possible that you can simultaneously run two instances of the processor code (I think this is a console application)? On the same or different machines? Or some other application or tool that removes messages from the queue?

There used to be a bug in one of the preliminary versions of .NET 2.0 that would cause this under some stressful conditions, but as far as I remember, it was fixed before they were sent.

+3
source

This does not work due to the concurrency problem in the internal ReceiveCurrent MessageQueue method. The exception stack trace shows the call that occurred in the enumerator. The current row and exception occurred in ReceiveCurrent. Enumerator.Current calls ReceiveCurrent with the "peek" option. You may ask, what I also had when I came across the same problem, how can peek with the error "Message already received"? Is he just trying to peek into the next message that has not yet been received? The answer lies in the ReceiveCurrent code, which is available for viewing here: https://referencesource.microsoft.com/#System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references

ReceiveCurrent first calls a StaleSafeReceive call to view the next message. But if this call returns that it takes more memory to receive the whole message (line with "while (MessageQueue.IsMemoryError (status)" in its source code), it allocates the necessary memory and makes another call to StaleSafeReceive to receive the message. This is a very classic pattern use of the Win32 API because it ends up being C.

The problem is that if another process or thread is running between the first and second calls of StaleSafeReceive inside ReceiveCurrent, that is, it removes this message from the queue, the second call causes this exact exception. And so the operation "look". Please note that this can be any message that is checked by the enumerator, throwing an exception, and not the message that it is looking for. This explains why a message with this job ID still exists inside the queue after the exception is thrown and the method fails.

What can be done is to protect the enumerator.Current call with try catch and, if that particular exception is caught, just continue the listing with the next available message in the queue.

I used a Cursor object, not an enumerator, but it ran into the same problem. But using the cursor there is another way to reduce the risk of this event, that is, when scanning / viewing a message, you need to disable all unnecessary properties of the current Queue MessagePropertyFilter object, especially the Body property. Since the body usually does not need to be taken during prying, but most often the body of the message causes the memory to be reallocated and requires a second call to StaleSafeReceive inside ReceiveCurrent. However, a catch attempt for this exception is also needed when using the cursor directly with peek.

+1
source

Source: https://habr.com/ru/post/1345002/


All Articles