Windows C ++ thread waiting for a data queue

My program is configured as follows:
There is a thread-safe queue class, one thread pushes data on it while sitting in an infinite loop, and the second thread issues data from it, sitting in an infinite loop. I am trying to think of a way to use Windows events or some other mechanism to do thread_1 (below), wait in an infinite while loop and only iterate when the queue depth is greater than or equal to 1.

class thread-safe_Queue { public: push(); pop(); }; DWORD thread_1() { while(1) { // wait for thread-safe queue to have data on it // pop data off // process data } } DWORD thread_2() { while(1) { // when data becomes available, push data onto thread-safe queue } } 
+6
source share
3 answers

I think this can do the trick. Derive the Event class and overload the Process () function.

 #include <process.h> // Along with all the normal windows includes //********************************************* using namespace os; Mutex globalQueueMutex; class QueueReader : public Event { public: virtual void Process() { // Lock the queue Locker l(globalQueueMutex); // pop data off // process data return; // queue will automatically unlock } }; QueueReader myQueueReader; //********************************************* // The queue writer would have functions like : void StartQueueReader() { Thread(QueueReader::StartEventHandler, &myQueueReader); } void WriteToQueue() { Locker l(globalQueueMutex); // write to the queue myQueueReader.SignalProcess(); // tell reader to wake up } // When want to shutdown void Shutdown() { myQueueReader.SignalShutdown(); } 

Here are the classes that do magic.

 namespace os { // ********************************************************************** /// Windows implementation to spawn a thread. static uintptr_t Thread (void (*StartAddress)(void *), void *ArgList) { return _beginthread(StartAddress, 0, ArgList); } // ********************************************************************** /// Windows implementation of a critical section. class Mutex { public: // Initialize section on construction Mutex() { InitializeCriticalSection( &cs_ ); } // Delete section on destruction ~Mutex() { DeleteCriticalSection( &cs_ ); } // Lock it void lock() { EnterCriticalSection( &cs_ ); } // Unlock it void unlock() { LeaveCriticalSection( &cs_ ); } private: CRITICAL_SECTION cs_; }; // class Mutex /// Locks/Unlocks a mutex class Locker { public: // Lock the mutex on construction Locker( Mutex& mutex ): mutex_( mutex ) { mutex_.lock(); } // Unlock on destruction ~Locker() { mutex_.unlock(); } private: Mutex& mutex_; }; // class Locker // ********************************************************************** // Windows implementation of event handler #define ProcessEvent hEvents[0] #define SetTimerEvent hEvents[1] #define ShutdownEvent hEvents[2] /// Windows implementation of events class Event { /// Flag set when shutdown is complete bool Shutdown; /// Max time to wait for events DWORD Timer; /// The three events - process, reset timer, and shutdown HANDLE hEvents[3]; public: /// Timeout is disabled by default and Events assigned Event( DWORD timer = INFINITE) : Timer(timer) { Shutdown = false; ProcessEvent = CreateEvent( NULL,TRUE,FALSE,NULL ); SetTimerEvent = CreateEvent( NULL,TRUE,FALSE,NULL ); ShutdownEvent = CreateEvent( NULL,TRUE,FALSE,NULL ); } /// Close the event handles virtual ~Event() { CloseHandle(ProcessEvent); CloseHandle(SetTimerEvent); CloseHandle(ShutdownEvent); } /// os::Thread calls this to start the Event handler static void StartEventHandler(void *pMyInstance) { ((Event *)pMyInstance)->EventHandler(); } /// Call here to Change/Reset the timeout timer void ResetTimer(DWORD timer) { Timer = timer; SetEvent(SetTimerEvent); } /// Set the signal to shutdown the worker thread processing events void SignalShutdown() { SetEvent(ShutdownEvent); while (!Shutdown) Sleep(30);} /// Set the signal to run the process void SignalProcess() { SetEvent(ProcessEvent); } protected: /// Overload in derived class to process events with worker thread virtual void Process(){} /// Override to process timeout- return true to terminate thread virtual bool Timeout(){ return true;} /// Monitor thread events void EventHandler() { DWORD WaitEvents; while (!Shutdown) { // Wait here, looking to be signaled what to do next WaitEvents = WaitForMultipleObjects(3, hEvents, FALSE, Timer); switch (WaitEvents) { // Process event - process event then reset for the next one case WAIT_OBJECT_0 + 0: Process(); ResetEvent(ProcessEvent); break; // Change timer event - see ResetTimer(DWORD timer) case WAIT_OBJECT_0 + 1: ResetEvent(SetTimerEvent); continue; // Shutdown requested so exit this thread case WAIT_OBJECT_0 + 2: Shutdown = true; break; // Timed out waiting for an event case WAIT_TIMEOUT: Shutdown = Timeout(); break; // Failed - should never happen case WAIT_FAILED: break; default: break; } } } }; } // namespace os 
+2
source

How about this (I suppose you are familiar with the mechanism of events).

1.

 thread_safe_Queue::push(something) { // lock the queue ... // push object // Signal the event SetEvent(notification); // unlock the queue } 

2.

 thread_safe_Queue::pop(something) { WaitForSingleObject(notification); // lock the queue ... // get object // reset the event if (queue is empty) ResetEvent(notification); // unlock the queue } 

3. thread_1 is simply trying to open the object and process it. When something is pressed, the event is on, so pop can be successfully called. Otherwise, it will wait inside the pop . In fact, you can use other synchronization objects, such as mutexes or critical sections, instead of events in this case.

UPDATE External events: thread 1:

 void thread_1() { while(1) { WaitForSingleObject(notification); if (!pop(object)) // pop should return if there are any objects left in queue SetEvent(notification); } } 

thread_2

 void thread_2() { while(1) { // push the object and than signal event ResetEvent(notification) } } 
0
source

You can use named events. Each thread will call CreateEvent passing with the same name. Then use WaitForMultipleObjects to wait for the event associated with the event or event of the final program. The pop stream will wait for the queue_has_data and end_program events. The push thread will wait for the data_available and end_program events and set the queue_has_data event when it puts something in the queue.

0
source

Source: https://habr.com/ru/post/896986/


All Articles