Review request: a simple C++ thread pool implementation

Stack Overflow has helped me a lot, and I wanted to give something back to the community. I have implemented a simple thread pool on top of TinyThread++ (a portable C++ threading library), using what I learned from Stack Overflow. I am new to threaded programming and not yet comfortable with mutexes and the like. I have a question that is best asked after presenting the code (which works well under Linux):

// ThreadPool.h class ThreadPool { public: ThreadPool(); ~ThreadPool(); // Creates a pool of threads and gets them ready to be used void CreateThreads(int numOfThreads); // Assigns a job to a thread in the pool, but doesn't start the job // Each SubmitJob call will use up one thread of the pool. // This operation can only be undone by calling StartJobs and // then waiting for the jobs to complete. On completion, // new jobs may be submitted. void SubmitJob( void (*workFunc)(void *), void *workData ); // Begins execution of all the jobs in the pool. void StartJobs(); // Waits until all jobs have completed. // The wait will block the caller. // On completion, new jobs may be submitted. void WaitForJobsToComplete(); private: enum typeOfWorkEnum { e_work, e_quit }; class ThreadData { public: bool ready; // thread has been created and is ready for work bool haveWorkToDo; typeOfWorkEnum typeOfWork; // Pointer to the work function each thread has to call. void (*workFunc)(void *); // Pointer to work data void *workData; ThreadData() : ready(false), haveWorkToDo(false) { }; }; struct ThreadArgStruct { ThreadPool *threadPoolInstance; int threadId; }; // Data for each thread ThreadData *m_ThreadData; ThreadPool(ThreadPool const&); // copy ctor hidden ThreadPool& operator=(ThreadPool const&); // assign op. hidden // Static function that provides the function pointer that a thread can call // By including the ThreadPool instance in the void * parameter, // we can use it to access other data and methods in the ThreadPool instance. 
static void ThreadFuncWrapper(void *arg) { ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg); threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId); } // The function each thread calls void ThreadFunc( int threadId ); // Called by the thread pool destructor void DestroyThreadPool(); // Total number of threads available // (fixed on creation of thread pool) int m_numOfThreads; int m_NumOfThreadsDoingWork; int m_NumOfThreadsGivenJobs; // List of threads std::vector<tthread::thread *> m_ThreadList; // Condition variable to signal each thread has been created and executing tthread::mutex m_ThreadReady_mutex; tthread::condition_variable m_ThreadReady_condvar; // Condition variable to signal each thread to start work tthread::mutex m_WorkToDo_mutex; tthread::condition_variable m_WorkToDo_condvar; // Condition variable to signal the main thread that // all threads in the pool have completed their work tthread::mutex m_WorkCompleted_mutex; tthread::condition_variable m_WorkCompleted_condvar; }; 

Cpp file:

 // // ThreadPool.cpp // #include "ThreadPool.h" // This is the thread function for each thread. // All threads remain in this function until // they are asked to quit, which only happens // when terminating the thread pool. void ThreadPool::ThreadFunc( int threadId ) { ThreadData *myThreadData = &m_ThreadData[threadId]; std::cout << "Hello world: Thread " << threadId << std::endl; // Signal that this thread is ready m_ThreadReady_mutex.lock(); myThreadData->ready = true; m_ThreadReady_condvar.notify_one(); // notify the main thread m_ThreadReady_mutex.unlock(); while(true) { //tthread::lock_guard<tthread::mutex> guard(m); m_WorkToDo_mutex.lock(); while(!myThreadData->haveWorkToDo) // check for work to do m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex m_WorkToDo_mutex.unlock(); // Do the work switch(myThreadData->typeOfWork) { case e_work: std::cout << "Thread " << threadId << ": Woken with work to do\n"; // Do work myThreadData->workFunc(myThreadData->workData); std::cout << "#Thread " << threadId << ": Work is completed\n"; break; case e_quit: std::cout << "Thread " << threadId << ": Asked to quit\n"; return; // ends the thread } // Now to signal the main thread that my work is completed m_WorkCompleted_mutex.lock(); m_NumOfThreadsDoingWork--; // Unsure if this 'if' would make the program more efficient // if(m_NumOfThreadsDoingWork == 0) m_WorkCompleted_condvar.notify_one(); // notify the main thread m_WorkCompleted_mutex.unlock(); } } ThreadPool::ThreadPool() { m_numOfThreads = 0; m_NumOfThreadsDoingWork = 0; m_NumOfThreadsGivenJobs = 0; } ThreadPool::~ThreadPool() { if(m_numOfThreads) { DestroyThreadPool(); delete [] m_ThreadData; } } void ThreadPool::CreateThreads(int numOfThreads) { // Check if a thread pool has already been created if(m_numOfThreads > 0) return; m_NumOfThreadsGivenJobs = 0; m_NumOfThreadsDoingWork = 0; m_numOfThreads = numOfThreads; 
m_ThreadData = new ThreadData[m_numOfThreads]; ThreadArgStruct threadArg; for(int i=0; i<m_numOfThreads; ++i) { threadArg.threadId = i; threadArg.threadPoolInstance = this; // Creates the thread and saves it in a list so we can destroy it later m_ThreadList.push_back( new tthread::thread( ThreadFuncWrapper, (void *)&threadArg ) ); // It takes a little time for a thread to get established. // Best wait until it gets established before creating the next thread. m_ThreadReady_mutex.lock(); while(!m_ThreadData[i].ready) // Check if thread is ready m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here m_ThreadReady_mutex.unlock(); } } // Assigns a job to a thread, but doesn't start the job void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData) { // Check if the thread pool has been created if(!m_numOfThreads) return; if(m_NumOfThreadsGivenJobs >= m_numOfThreads) return; m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc; m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData; std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl; m_NumOfThreadsGivenJobs++; } void ThreadPool::StartJobs() { // Check that the thread pool has been created // and some jobs have been assigned if(!m_numOfThreads || !m_NumOfThreadsGivenJobs) return; // Set 'haveworkToDo' flag for all threads m_WorkToDo_mutex.lock(); for(int i=0; i<m_NumOfThreadsGivenJobs; ++i) { m_ThreadData[i].typeOfWork = e_work; // forgot to do this ! 
m_ThreadData[i].haveWorkToDo = true; } m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs; // Reset this counter so we can resubmit jobs later m_NumOfThreadsGivenJobs = 0; // Notify all threads they have work to do m_WorkToDo_condvar.notify_all(); m_WorkToDo_mutex.unlock(); } void ThreadPool::WaitForJobsToComplete() { // Check that a thread pool has been created if(!m_numOfThreads) return; m_WorkCompleted_mutex.lock(); while(m_NumOfThreadsDoingWork > 0) // Check if all threads have completed their work m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here m_WorkCompleted_mutex.unlock(); } void ThreadPool::DestroyThreadPool() { std::cout << "Ask threads to quit\n"; m_WorkToDo_mutex.lock(); for(int i=0; i<m_numOfThreads; ++i) { m_ThreadData[i].haveWorkToDo = true; m_ThreadData[i].typeOfWork = e_quit; } m_WorkToDo_condvar.notify_all(); m_WorkToDo_mutex.unlock(); // As each thread terminates, catch them here for(int i=0; i<m_numOfThreads; ++i) { tthread::thread *t = m_ThreadList[i]; // Wait for thread to complete t->join(); } m_numOfThreads = 0; } 

Usage example (this approximates $\pi^2/6 = \sum_{n=1}^{\infty} 1/n^2$ by summing inverse squares). This example performs essentially the same calculation 10 times in parallel, each time with a different number of terms. A more practical use would be for each thread to sum a different range of terms. The final result would then be obtained by adding up the per-thread results once all the pool's jobs have completed.

 struct CalculationDataStruct { int inputVal; double outputVal; }; void LongCalculation( void *theSums ) { CalculationDataStruct *sums = (CalculationDataStruct *)theSums; int terms = sums->inputVal; double sum; for(int i=1; i<terms; i++) sum += 1.0/( double(i)*double(i) ); sums->outputVal = sum; } int main(int argc, char** argv) { int numThreads = 10; // Create pool ThreadPool threadPool; threadPool.CreateThreads(numThreads); // Create thread workspace CalculationDataStruct sums[numThreads]; // Set up jobs for(int i=0; i<numThreads; i++) { sums[i].inputVal = 3000*(i+1); threadPool.SubmitJob(LongCalculation, &sums[i]); } // Run the jobs threadPool.StartJobs(); threadPool.WaitForJobsToComplete(); // Print results for(int i=0; i<numThreads; i++) std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl; return 0; } 

Question: in the ThreadPool::ThreadFunc method, would performance be better if the following if statement

 if(m_NumOfThreadsDoingWork == 0) 

were enabled (uncommented)? I would also be grateful for criticism and suggestions for improving the code. At the same time, I hope the code will be useful to others.

+6
source share
1 answer

Firstly, you might look at C++11's `std::thread` and `std::mutex`. You could also explore Intel's Threading Building Blocks, which provides several patterns for distributing work. For a portable, cross-platform, encapsulated C++ API, I have generally used the OpenThreads library. Finally, you can build scalable, distributed workloads without mutexes by using a messaging library such as ZeroMQ.

Looking at your current code, my biggest concern is that you are not locking the variables used to assign work to threads; I assume that is because you separated SubmitJob and StartJobs.

But ultimately, your ThreadPool is not thread safe.

The API is also somewhat complicated, with job types and so on. You probably want to abstract the concept of a "job" into its own type. Here is an example where I did this; you would probably want to encapsulate the bulk of the code back into the ThreadPool class. Also, the termination mechanism (queueing a NULL job) is artificial, and you might prefer pthread_cancel, but it serves well enough for this demo.

 #include <queue>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>   // time(), used to seed rand()
#include <unistd.h>

// Id handed to each new Job. Jobs are only created by the main thread,
// so this counter needs no locking.
static int jobNo = 0;

// A unit of work: announces itself and then sleeps for half a second.
class Job
{
public:
    Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); }
    int m_i;
    void Execute()
    {
        printf("Job %d executing.\n", m_i);
        usleep(500 * 1000);
    }
};

// FIFO of pending jobs, guarded by 'mutex'; 'cond' is signalled whenever a
// job is pushed. A NULL entry tells the workers to stop.
std::queue<Job*> queue;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// Enqueues a job (or NULL to stop the workers) and wakes one waiting worker.
void AddJob(Job* job)
{
    pthread_mutex_lock(&mutex);
    queue.push(job);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
}

// Worker thread: repeatedly takes the next job from the queue, runs it and
// deletes it, until it dequeues NULL.
void* QueueWorker(void* /*threadInfo*/)
{
    Job* job = NULL;
    for (;;)
    {
        pthread_mutex_lock(&mutex);
        while ( queue.empty() )
        {
            // unlock the mutex until the cond is signal()d or broadcast() to.
            // if this call succeeds, we will have the mutex locked again on the other side.
            pthread_cond_wait(&cond, &mutex);
        }
        // take the first task and then release the lock.
        job = queue.front();
        queue.pop();
        pthread_mutex_unlock(&mutex);

        if ( job == NULL )
        {
            // in this demonstration, NULL ends the run, so forward to any other threads.
            AddJob(NULL);
            break;
        }

        job->Execute();
        delete job;
    }
    return NULL;
}

int main(int argc, const char* argv[])
{
    pthread_t worker1, worker2;
    // pthread_create returns 0 on success; without the workers the jobs
    // would never run and the joins below would be meaningless.
    if ( pthread_create(&worker1, NULL, &QueueWorker, NULL) != 0 ||
         pthread_create(&worker2, NULL, &QueueWorker, NULL) != 0 )
    {
        fprintf(stderr, "Failed to create worker threads\n");
        return 1;
    }

    srand(time(NULL));

    // queue 5 jobs with delays.
    for ( size_t i = 0; i < 5; ++i )
    {
        long delay = (rand() % 800) * 1000;
        printf("Producer sleeping %fs\n", static_cast<float>(delay) / (1000*1000));
        usleep(delay);
        Job* job = new Job();
        AddJob(job);
    }

    // 5 more without delays.
    for ( size_t i = 0; i < 5; ++i )
    {
        AddJob(new Job);
    }

    // null to end the run.
    AddJob(NULL);

    printf("Done with jobs.\n");
    pthread_join(worker1, NULL);
    pthread_join(worker2, NULL);

    return 0;
}
+1
source

Source: https://habr.com/ru/post/919186/


All Articles