Common queue in C ++

I just get packets from the network and inject them into one thread, and then consume these packets (Dequeue) in another thread.

So, I decided to use an additional library to create a shared queue based on

template <typename T> class SynchronisedQueue { private: std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond;// The condition to wait for public: // Add data to the queue and notify others void Enqueue(const T& data) { // Acquire lock on the queue boost::unique_lock<boost::mutex> lock(m_mutex); // Add the data to the queue m_queue.push(data); // Notify others that data is ready m_cond.notify_one(); } // Lock is automatically released here // Get data from the queue. Wait for data if not available T Dequeue() { // Acquire lock on the queue boost::unique_lock<boost::mutex> lock(m_mutex); // When there is no data, wait till someone fills it. // Lock is automatically released in the wait and obtained // again after the wait while (m_queue.size()==0) m_cond.wait(lock); // Retrieve the data from the queue T result=m_queue.front(); m_queue.pop(); return result; } // Lock is automatically released here }; 

The problem is that, without receiving any data, the Dequeue () method blocks my consumer chain, and when I want to stop using the consumer thread I cannot finish it or stop it sometimes.

What is the suggested way to break the Dequeue () lock so that I can safely terminate the thread that is consuming packets? Any ideas?

PS: website use "boost :: this_thread :: interruption_point ();" to stop the consumer flow ... Due to my outdated code structure this is not possible for me ...

Based on the answer, I am updating Shared Queue as follows:

 #include <queue> #include <boost/thread.hpp> template <typename T> class SynchronisedQueue { public: SynchronisedQueue() { RequestToEnd = false; EnqueueData = true; } void Enqueue(const T& data) { boost::unique_lock<boost::mutex> lock(m_mutex); if(EnqueueData) { m_queue.push(data); m_cond.notify_one(); } } bool TryDequeue(T& result) { boost::unique_lock<boost::mutex> lock(m_mutex); while (m_queue.empty() && (! RequestToEnd)) { m_cond.wait(lock); } if( RequestToEnd ) { DoEndActions(); return false; } result= m_queue.front(); m_queue.pop(); return true; } void StopQueue() { RequestToEnd = true; Enqueue(NULL); } int Size() { boost::unique_lock<boost::mutex> lock(m_mutex); return m_queue.size(); } private: void DoEndActions() { EnqueueData = false; while (!m_queue.empty()) { m_queue.pop(); } } std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond; // The condition to wait for bool RequestToEnd; bool EnqueueData; }; 

And here is my test drive:

 #include <iostream> #include <string> #include "SynchronisedQueue.h" using namespace std; SynchronisedQueue<int> MyQueue; void InsertToQueue() { int i= 0; while(true) { MyQueue.Enqueue(++i); } } void ConsumeFromQueue() { while(true) { int number; cout << "Now try to dequeue" << endl; bool success = MyQueue.TryDequeue(number); if(success) { cout << "value is " << number << endl; } else { cout << " queue is stopped" << endl; break; } } cout << "Que size is : " << MyQueue.Size() << endl; } int main() { cout << "Test Started" << endl; boost::thread startInsertIntoQueue = boost::thread(InsertToQueue); boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue); boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds MyQueue.StopQueue(); int endMain; cin >> endMain; return 0; } 

At the moment it works ... Based on new offers:

i changes the Stop method as:

 void StopQueue() { boost::unique_lock<boost::mutex> lock(m_mutex); RequestToEnd = true; m_cond.notify_one(); } 
source share
3 answers

2 simple solutions to end the stream:

  • send the final message to the queue.
  • add another condition to the condition variable to complete the command

     while(queue.empty() && (! RequestToEnd)) m_cond.wait(lock); if (RequestToEnd) { doEndActions(); } else { T result=m_queue.front(); m_queue.pop(); return result; } 

First, do you really need to stop the flow? If not, do not do this.

If you need it, just queue up his suicide pill. Usually I send NULL to T. The thread checks T and, if NULL, clears, returns and so dies.

In addition, you may need to clear the queue first by deleting and deleting () all elements.


Another option to consider is not to infinitely block threads. In other words, add a timeout to your blocking calls, for example:

  bool TryDequeue(T& result, boost::chrono::milliseconds timeout) { boost::unique_lock<boost::mutex> lock(m_mutex); boost::chrono::system_clock::time_point timeLimit = boost::chrono::system_clock::now() + timeout; while (m_queue.empty()) { if (m_cond.wait_until(lock, timeLimit) == boost::condition_variable::cv_status::timeout) { return false; } } result = m_queue.front(); m_queue.pop(); return true; } 

Then in your stream just specify a variable to indicate if the stream is working (I took the liberty to make your consumer into a class):

 class Consumer { public: boost::shared_ptr<Consumer> createConsumer() { boost::shared_ptr<Consumer> ret(new Consumer()); ret->_consumeFromQueue = boost::thread(&Consumer::ConsumeFromQueue, ret.get()); return ret; } protected: Consumer() : _threadRunning(true) { } ~Consumer() { _threadRunning = false; _consumeFromQueue.join(); } void ConsumeFromQueue() { while(_threadRunning == true) { int number; cout << "Now try to dequeue" << endl; bool success = MyQueue.TryDequeue(number); if(success) { cout << "value is " << number << endl; } else { cout << " queue is stopped" << endl; break; } } cout << "Que size is : " << MyQueue.Size() << endl; } bool _threadRunning; boost::thread _consumeFromQueue; } 

There is no need to hack your queue class so that it can be used in a thread, give it normal interfaces with timeouts, and then use it appropriately based on usage.

I will talk in more detail about why this is a good template for threads here:



All Articles