Common queue in C ++

I just get packets from the network and inject them into one thread, and then consume these packets (Dequeue) in another thread.

So, I decided to use an additional library to create a shared queue based on https://www.quantnet.com/cplusplus-multithreading-boost/

template <typename T> class SynchronisedQueue { private: std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond;// The condition to wait for public: // Add data to the queue and notify others void Enqueue(const T& data) { // Acquire lock on the queue boost::unique_lock<boost::mutex> lock(m_mutex); // Add the data to the queue m_queue.push(data); // Notify others that data is ready m_cond.notify_one(); } // Lock is automatically released here // Get data from the queue. Wait for data if not available T Dequeue() { // Acquire lock on the queue boost::unique_lock<boost::mutex> lock(m_mutex); // When there is no data, wait till someone fills it. // Lock is automatically released in the wait and obtained // again after the wait while (m_queue.size()==0) m_cond.wait(lock); // Retrieve the data from the queue T result=m_queue.front(); m_queue.pop(); return result; } // Lock is automatically released here }; 

The problem is that, without receiving any data, the Dequeue () method blocks my consumer chain, and when I want to stop using the consumer thread I cannot finish it or stop it sometimes.

What is the suggested way to break the Dequeue () lock so that I can safely terminate the thread that is consuming packets? Any ideas?

PS: website https://www.quantnet.com/cplusplus-multithreading-boost/ use "boost :: this_thread :: interruption_point ();" to stop the consumer flow ... Due to my outdated code structure this is not possible for me ...

Based on the answer, I am updating Shared Queue as follows:

 #include <queue> #include <boost/thread.hpp> template <typename T> class SynchronisedQueue { public: SynchronisedQueue() { RequestToEnd = false; EnqueueData = true; } void Enqueue(const T& data) { boost::unique_lock<boost::mutex> lock(m_mutex); if(EnqueueData) { m_queue.push(data); m_cond.notify_one(); } } bool TryDequeue(T& result) { boost::unique_lock<boost::mutex> lock(m_mutex); while (m_queue.empty() && (! RequestToEnd)) { m_cond.wait(lock); } if( RequestToEnd ) { DoEndActions(); return false; } result= m_queue.front(); m_queue.pop(); return true; } void StopQueue() { RequestToEnd = true; Enqueue(NULL); } int Size() { boost::unique_lock<boost::mutex> lock(m_mutex); return m_queue.size(); } private: void DoEndActions() { EnqueueData = false; while (!m_queue.empty()) { m_queue.pop(); } } std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond; // The condition to wait for bool RequestToEnd; bool EnqueueData; }; 

And here is my test drive:

 #include <iostream> #include <string> #include "SynchronisedQueue.h" using namespace std; SynchronisedQueue<int> MyQueue; void InsertToQueue() { int i= 0; while(true) { MyQueue.Enqueue(++i); } } void ConsumeFromQueue() { while(true) { int number; cout << "Now try to dequeue" << endl; bool success = MyQueue.TryDequeue(number); if(success) { cout << "value is " << number << endl; } else { cout << " queue is stopped" << endl; break; } } cout << "Que size is : " << MyQueue.Size() << endl; } int main() { cout << "Test Started" << endl; boost::thread startInsertIntoQueue = boost::thread(InsertToQueue); boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue); boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds MyQueue.StopQueue(); int endMain; cin >> endMain; return 0; } 

At the moment it works ... Based on new offers:

i changes the Stop method as:

 void StopQueue() { boost::unique_lock<boost::mutex> lock(m_mutex); RequestToEnd = true; m_cond.notify_one(); } 
+6
source share
3 answers

2 simple solutions to end the stream:

  • send the final message to the queue.
  • add another condition to the condition variable to complete the command

     while(queue.empty() && (! RequestToEnd)) m_cond.wait(lock); if (RequestToEnd) { doEndActions(); } else { T result=m_queue.front(); m_queue.pop(); return result; } 
+4
source

First, do you really need to stop the flow? If not, do not do this.

If you need it, just queue up his suicide pill. Usually I send NULL to T. The thread checks T and, if NULL, clears, returns and so dies.

In addition, you may need to clear the queue first by deleting and deleting () all elements.

+2
source

Another option to consider is not to infinitely block threads. In other words, add a timeout to your blocking calls, for example:

  bool TryDequeue(T& result, boost::chrono::milliseconds timeout) { boost::unique_lock<boost::mutex> lock(m_mutex); boost::chrono::system_clock::time_point timeLimit = boost::chrono::system_clock::now() + timeout; while (m_queue.empty()) { if (m_cond.wait_until(lock, timeLimit) == boost::condition_variable::cv_status::timeout) { return false; } } result = m_queue.front(); m_queue.pop(); return true; } 

Then in your stream just specify a variable to indicate if the stream is working (I took the liberty to make your consumer into a class):

 class Consumer { public: boost::shared_ptr<Consumer> createConsumer() { boost::shared_ptr<Consumer> ret(new Consumer()); ret->_consumeFromQueue = boost::thread(&Consumer::ConsumeFromQueue, ret.get()); return ret; } protected: Consumer() : _threadRunning(true) { } ~Consumer() { _threadRunning = false; _consumeFromQueue.join(); } void ConsumeFromQueue() { while(_threadRunning == true) { int number; cout << "Now try to dequeue" << endl; bool success = MyQueue.TryDequeue(number); if(success) { cout << "value is " << number << endl; } else { cout << " queue is stopped" << endl; break; } } cout << "Que size is : " << MyQueue.Size() << endl; } bool _threadRunning; boost::thread _consumeFromQueue; } 

There is no need to hack your queue class so that it can be used in a thread, give it normal interfaces with timeouts, and then use it appropriately based on usage.

I will talk in more detail about why this is a good template for threads here:

http://blog.chrisd.info/how-to-run-threads/

0
source

Source: https://habr.com/ru/post/913122/


All Articles