C++11 Dynamic Threadpool

Recently, I tried to find a library for running concurrent tasks on threads. Ideally, it would have a simple interface that calls a function on a thread. At any time there are n threads, some of which finish faster than others, and they arrive at different times.

At first I tried Rx, which is great in C++. I also looked at Blocks and TBB, but they are platform dependent. For my prototype I need to stay platform independent, because we do not yet know what it will run on, and that may change as decisions are made.

C++11 has a lot of support for threading and concurrency, and I have found examples like this one for thread pools:

https://github.com/bilash/threadpool

Similar projects use the same lambda expressions with std::thread and std::mutex.

This looks perfect for what I need, but there are some problems: pools start with a fixed number of threads, and tasks are queued until a thread becomes free.

How can I add new threads? How do I remove expired threads? (.join()?)

Obviously, this is much simpler for a known number of threads, since they can be created in the ctor and then joined with join() in the dtor.

Any tips or pointers from someone who has experience with C++ concurrency?

1 answer
  • Start with the number of threads the hardware can run concurrently (hardware_concurrency() is only a hint and may return 0 if the value cannot be determined):

    int Num_Threads = thread::hardware_concurrency();
    
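  • For an efficient threadpool, once Num_Threads threads have been created, it is better not to create new ones or destroy old ones (by joining). Creating and destroying threads carries a performance penalty, and it can even make the application slower than a serial version.

    Each C++11 thread should run a function containing an infinite loop, constantly waiting for new tasks to pick up and run.

    Here is how to attach such a function to the thread pool: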

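    // Assumes this runs inside a The_Pool member function (e.g. the constructor),
    // with Pool stored as a member so the threads can be joined later.
    int Num_Threads = thread::hardware_concurrency();
    vector<thread> Pool;
    for(int ii = 0; ii < Num_Threads; ii++)
    {  Pool.emplace_back(&The_Pool::Infinite_loop_function, this);}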
    
  • The Infinite_loop_function

    This is a while (true) loop that waits on the task queue:

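    void The_Pool::Infinite_loop_function()
    {
        while(true)
        {
            function<void()> Job;
            {
                // Hold the lock only while touching the shared queue.
                unique_lock<mutex> lock(Queue_Mutex);

                // Sleep until a job is available; the predicate guards against spurious wakeups.
                condition.wait(lock, [this]{ return !Queue.empty(); });
                Job = Queue.front();
                Queue.pop();
            }
            Job(); // function<void()> type; runs outside the lock
        }
    }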
    
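  • Add a function that pushes a job onto the queue:

    void The_Pool::Add_Job(function<void()> New_Job)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);
            Queue.push(New_Job);
        }
        // Notify after releasing the lock so the woken thread can acquire it immediately.
        condition.notify_one();
    }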
    
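  • Bind an arbitrary function and hand it to the pool (Some_object must outlive the job, since only its address is stored):

    Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));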
    

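Put together, these pieces give you a working thread pool. The threads run for the lifetime of the pool, waiting for jobs to do.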


Source: https://habr.com/ru/post/1528876/

