Thread-safe message queue with multiple threads

Here is basically what I have:

I have thread A that periodically checks messages and processes them.

Threads B and C need to send messages to A.

The problem occurs when B or C (or both) try to send a message to A while A is processing a message and therefore accessing the queue.

How is this problem usually resolved?

Thanks.

2 answers

This is usually resolved with a mutex or another thread synchronization mechanism.

If you are working on Windows, MFC provides the CMutex class for this problem.

If you are working on a POSIX system, the POSIX API provides the functions pthread_mutex_lock, pthread_mutex_unlock and pthread_mutex_trylock.
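To make the POSIX calls concrete, here is a minimal sketch in C++. The names queueMutex, messages, sendMessage and pollMessage are made up for illustration, and the message type is assumed to be a plain int. Note that the pthread functions take a pointer to the mutex. Since thread A only checks the queue periodically, this sketch uses pthread_mutex_trylock on the receiving side so that A can skip a check instead of blocking; that is one possible design, not the only one.

```cpp
#include <pthread.h>
#include <queue>

// Shared state (hypothetical names): one mutex guarding one queue.
static pthread_mutex_t queueMutex = PTHREAD_MUTEX_INITIALIZER;
static std::queue<int> messages;  // assumed message type: int

// Sender side (threads B and C): block until the mutex is free.
void sendMessage(int msg) {
    pthread_mutex_lock(&queueMutex);
    messages.push(msg);
    pthread_mutex_unlock(&queueMutex);
}

// Receiver side (thread A): non-blocking check.
// Returns false if the mutex is held by someone else or the queue is empty.
bool pollMessage(int& out) {
    if (pthread_mutex_trylock(&queueMutex) != 0) {
        return false;  // mutex busy (or an error); try again on the next check
    }
    bool got = !messages.empty();
    if (got) {
        out = messages.front();
        messages.pop();
    }
    pthread_mutex_unlock(&queueMutex);
    return got;
}
```

If A should wait for the queue rather than skip, replacing pthread_mutex_trylock with pthread_mutex_lock in pollMessage is enough.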

Some basic pseudo-code will be convenient for demonstrating their use in your case:

    pthread_mutex_t mutex;   *or*   CMutex mutex;
    Q queue;
    // Both the mutex and the queue are shared state. Whether they are
    // global variables or passed in as parameters, all threads must
    // use the same instances.

    int threadA(/* params */) {
        while (threadAStillRunning) {
            // perform some non-critical actions ...
            pthread_mutex_lock(&mutex);   *or*   mutex.Lock();
            // perform critical actions ...
            msg = queue.receiveMessage();
            pthread_mutex_unlock(&mutex);   *or*   mutex.Unlock();
            // perform more non-critical actions
        }
    }

    int threadBorC(/* params */) {
        while (threadBorCStillRunning) {
            // perform some non-critical actions ...
            pthread_mutex_lock(&mutex);   *or*   mutex.Lock();
            // perform critical actions ...
            queue.sendMessage(a_msg);
            pthread_mutex_unlock(&mutex);   *or*   mutex.Unlock();
        }
    }

For all three threads, access to the queue depends on acquiring the mutex: a thread simply blocks and waits until the mutex becomes available. This prevents two threads from touching the queue at the same time.
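For a version that actually compiles, here is a rough sketch of the same idea using std::mutex and std::lock_guard from the C++ standard library instead of the pthread or MFC calls (it assumes C++17 for std::optional). The names MessageQueue, send, tryReceive and runDemo are hypothetical, and messages are assumed to be strings. Thread A polls the queue, which matches the "periodically checks" behaviour in the question.

```cpp
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical class: every access to the queue goes through one mutex.
class MessageQueue {
public:
    // Called by the sending threads (B and C).
    void send(std::string msg) {
        std::lock_guard<std::mutex> lock(mutex_);  // unlocked at scope exit
        queue_.push(std::move(msg));
    }

    // Called by the receiving thread (A). Returns std::nullopt when the
    // queue is empty instead of waiting for a message.
    std::optional<std::string> tryReceive() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return std::nullopt;
        std::string msg = std::move(queue_.front());
        queue_.pop();
        return msg;
    }

private:
    std::mutex mutex_;
    std::queue<std::string> queue_;
};

// Starts A as a polling receiver and B and C as senders.
// Returns the number of messages A processed.
std::size_t runDemo(std::size_t perSender) {
    MessageQueue queue;
    std::size_t received = 0;  // written only by thread A, read after join

    auto sender = [&queue, perSender](const std::string& name) {
        for (std::size_t i = 0; i < perSender; ++i)
            queue.send(name + ":" + std::to_string(i));
    };

    std::thread a([&queue, &received, perSender] {
        while (received < 2 * perSender) {
            if (queue.tryReceive()) ++received;
            else std::this_thread::yield();  // nothing to do yet
        }
    });
    std::thread b(sender, "B");
    std::thread c(sender, "C");

    b.join();
    c.join();
    a.join();
    return received;
}
```

Keeping the mutex private inside the class means callers cannot forget to lock it. If A should sleep until a message arrives rather than poll, a std::condition_variable could be added on top of this.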


If you are not on Windows, or you want something cross-platform in C++, try the message queue from the ACE library.

 ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue; 

Following the examples that ship with the ACE library, you can place a message in the queue like this:

    ACE_NEW_RETURN (mb,
                    ACE_Message_Block (rb.size (),
                                       ACE_Message_Block::MB_DATA,
                                       0,
                                       buffer),
                    0);
    mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
    mb->wr_ptr (rb.size ());

    ACE_DEBUG ((LM_DEBUG,
                "enqueueing message of size %d\n",
                mb->msg_priority ()));

    // Enqueue in priority order.
    if (msg_queue->enqueue_prio (mb) == -1)
      ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));

To get a message from the queue:

    ACE_Message_Block *mb = 0;

    // dequeue_head() returns -1 on failure, so only use mb on success.
    if (msg_queue->dequeue_head (mb) != -1)
      {
        int length = ACE_Utils::truncate_cast<int> (mb->length ());

        if (length > 0)
          ACE_OS::puts (mb->rd_ptr ());

        // Free up the buffer memory and the Message_Block.
        ACE_Allocator::instance ()->free (mb->rd_ptr ());
        mb->release ();
      }

The advantage is that the synchronization primitive is a template parameter of the queue, so you can change it without changing much code.
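To illustrate that idea without depending on ACE, here is a small sketch in standard C++ where the lock type is a template parameter. This is not how ACE implements it; BasicQueue, NullMutex, ThreadSafeQueue and SingleThreadQueue are hypothetical names, and the message type is assumed to be int. NullMutex plays roughly the role that ACE_NULL_SYNCH plays in ACE (no locking at all), and std::mutex stands in for ACE_MT_SYNCH.

```cpp
#include <deque>
#include <mutex>

// A lock that does nothing, for code that runs on a single thread.
struct NullMutex {
    void lock() {}
    void unlock() {}
};

// Queue whose locking strategy is chosen by the Mutex template parameter.
template <typename Mutex>
class BasicQueue {
public:
    void enqueue(int value) {
        std::lock_guard<Mutex> guard(mutex_);
        items_.push_back(value);
    }

    // Returns false if the queue is empty; otherwise stores the head in out.
    bool dequeue(int& out) {
        std::lock_guard<Mutex> guard(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    Mutex mutex_;
    std::deque<int> items_;
};

// Switching the primitive is a one-line change at the point of use.
using ThreadSafeQueue = BasicQueue<std::mutex>;
using SingleThreadQueue = BasicQueue<NullMutex>;
```

The rest of the code calls enqueue and dequeue the same way whichever alias is used.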


Source: https://habr.com/ru/post/899385/

