I just get packets from the network and inject them into one thread, and then consume these packets (Dequeue) in another thread.
So, I decided to use an additional library to create a shared queue based on https://www.quantnet.com/cplusplus-multithreading-boost/
template <typename T> class SynchronisedQueue { private: std::queue<T> m_queue;
The problem is that, without receiving any data, the Dequeue () method blocks my consumer chain, and when I want to stop using the consumer thread I cannot finish it or stop it sometimes.
What is the suggested way to break the Dequeue () lock so that I can safely terminate the thread that is consuming packets? Any ideas?
PS: website https://www.quantnet.com/cplusplus-multithreading-boost/ use "boost :: this_thread :: interruption_point ();" to stop the consumer flow ... Due to my outdated code structure this is not possible for me ...
Based on the answer, I am updating Shared Queue as follows:
#include <queue> #include <boost/thread.hpp> template <typename T> class SynchronisedQueue { public: SynchronisedQueue() { RequestToEnd = false; EnqueueData = true; } void Enqueue(const T& data) { boost::unique_lock<boost::mutex> lock(m_mutex); if(EnqueueData) { m_queue.push(data); m_cond.notify_one(); } } bool TryDequeue(T& result) { boost::unique_lock<boost::mutex> lock(m_mutex); while (m_queue.empty() && (! RequestToEnd)) { m_cond.wait(lock); } if( RequestToEnd ) { DoEndActions(); return false; } result= m_queue.front(); m_queue.pop(); return true; } void StopQueue() { RequestToEnd = true; Enqueue(NULL); } int Size() { boost::unique_lock<boost::mutex> lock(m_mutex); return m_queue.size(); } private: void DoEndActions() { EnqueueData = false; while (!m_queue.empty()) { m_queue.pop(); } } std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond; // The condition to wait for bool RequestToEnd; bool EnqueueData; };
And here is my test drive:
#include <iostream> #include <string> #include "SynchronisedQueue.h" using namespace std; SynchronisedQueue<int> MyQueue; void InsertToQueue() { int i= 0; while(true) { MyQueue.Enqueue(++i); } } void ConsumeFromQueue() { while(true) { int number; cout << "Now try to dequeue" << endl; bool success = MyQueue.TryDequeue(number); if(success) { cout << "value is " << number << endl; } else { cout << " queue is stopped" << endl; break; } } cout << "Que size is : " << MyQueue.Size() << endl; } int main() { cout << "Test Started" << endl; boost::thread startInsertIntoQueue = boost::thread(InsertToQueue); boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue); boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds MyQueue.StopQueue(); int endMain; cin >> endMain; return 0; }
At the moment it works ... Based on new offers:
i changes the Stop method as:
void StopQueue() { boost::unique_lock<boost::mutex> lock(m_mutex); RequestToEnd = true; m_cond.notify_one(); }