Avoiding recursive template instantiation overflow in a parallel asynchronous recursive algorithm

This problem is easier to explain with a simplified example (since my real situation is far from being “minimal”): given a ...

template <typename T>
void post_in_thread_pool(T&& f) 

... function template, which posts f to a thread pool for execution, I want to write a parallel asynchronous algorithm with a tree-like recursive structure. I will sketch the structure below, using std::count_if as a placeholder. The strategy is as follows:

  • If the length of the range I am processing is less than 64, I fall back to the sequential std::count_if. (0)

  • If it is greater than or equal to 64, I post a task to the thread pool that recurses on the left half of the range, and compute the right half of the range on the current thread. (1)

    • I use a shared atomic int counter to "wait" for the two halves to complete. (2)

    • I use a shared atomic int to accumulate the partial results. (3)

Simplified code:

auto async_count_if(auto begin, auto end, auto predicate, auto continuation)
{
    // (0) Base case:  
    if(end - begin < 64)
    {
        continuation(std::count_if(begin, end, predicate));
        return;
    }

    // (1) Recursive case:
    auto counter = std::make_shared<std::atomic<int>>(2); // (2)
    auto cleanup = [=, accumulator = std::make_shared<std::atomic<int>>(0) /*(3)*/]
                   (int partial_result)
    {
        *accumulator += partial_result; 

        if(--*counter == 0)
        {
            continuation(*accumulator);
        }
    };

    const auto mid = std::next(begin, (end - begin) / 2);

    post_in_thread_pool([=]
    {
        async_count_if(begin, mid, predicate, cleanup);
    });

    async_count_if(mid, end, predicate, cleanup);
}

Then the code can be used as follows:

std::vector<int> v(512);
std::iota(std::begin(v), std::end(v), 0);

async_count_if(std::begin(v), std::end(v), 
/*    predicate */ [](auto x){ return x < 256; }, 
/* continuation */ [](auto res){ std::cout << res << std::endl; });
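
For reference, here is a minimal stand-in for post_in_thread_pool so that the snippets in this question are self-contained. This is my placeholder, not the real implementation: it spawns a detached thread per task instead of using an actual pool, so the caller has to keep main alive until the work finishes.

#include <thread>
#include <utility>

// Placeholder only: a real thread pool would enqueue f instead of spawning a thread.
template <typename T>
void post_in_thread_pool(T&& f)
{
    std::thread(std::forward<T>(f)).detach();
}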

The problem lies in auto cleanup in the code above. Since auto will be deduced to a unique closure type for every instantiation of the cleanup lambda, and since cleanup captures continuation by value, an infinitely nested lambda type has to be computed at compile time because of the recursion, resulting in the following error:

fatal error: recursive template instantiation exceeded maximum depth of 1024

Example on wandbox.

Metaphorically speaking, the compiler ends up instantiating something like:

cont                                // user-provided continuation
cleanup0<cont>                      // recursive step 0
cleanup1<cleanup0<cont>>            // recursive step 1
cleanup2<cleanup1<cleanup0<cont>>>  // recursive step 2
// ...

(!): the deep nesting is an artifact of how async_count_if builds its continuations, not of the problem itself. Conceptually, count_if over the whole range only needs about sz / 64 sequential base-case invocations.
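
The blow-up can be reproduced in a few lines that have nothing to do with thread pools. A minimal sketch (grow and everything in it are my illustrative names, not part of the real code):

#include <cstddef>

template <typename Cont>
void grow(std::size_t n, Cont cont)
{
    if(n == 0) { cont(0); return; }

    // Each level wraps the previous continuation in a brand-new closure type...
    auto wrapped = [cont](int x){ cont(x + 1); };

    // ...so instantiating grow<Cont> requires grow<decltype(wrapped)>, and so on
    // without a compile-time bound, even though n is only known at run time.
    grow(n - 1, wrapped);
}

int main()
{
    grow(100, [](int){}); // recursive template instantiation exceeded maximum depth
}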


I can think of a couple of ways to work around the problem:

  • Use std::function<void(int)> cleanup to erase the type of the continuation. This stops the compile-time recursion, but adds unnecessary run-time overhead (see the sketch after this list). Example on wandbox.

  • Cut the recursion off at a hardcoded depth: e.g. a std::size_t depth template parameter plus a specialization of async_count_if::operator() (in my real, function-object implementation) that stops recursing past some fixed depth. I don't like this, because the depth limit is arbitrary.
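
For reference, a rough sketch of the first workaround. The name async_count_if_erased is mine and post_in_thread_pool is assumed to be the one from the question; only the type of cleanup changes, the structure of the algorithm stays the same.

#include <algorithm>
#include <atomic>
#include <functional>
#include <iterator>
#include <memory>

template <typename T>
void post_in_thread_pool(T&& f); // as in the question

template <typename It, typename Pred>
void async_count_if_erased(It begin, It end, Pred predicate,
                           std::function<void(int)> continuation)
{
    if(end - begin < 64)
    {
        continuation(std::count_if(begin, end, predicate));
        return;
    }

    auto counter     = std::make_shared<std::atomic<int>>(2);
    auto accumulator = std::make_shared<std::atomic<int>>(0);

    // The closure is erased into std::function<void(int)> here, so every level
    // of the recursion sees the same continuation type and the instantiation
    // chain stops growing (at the cost of type erasure at every node).
    std::function<void(int)> cleanup = [=](int partial_result)
    {
        *accumulator += partial_result;
        if(--*counter == 0)
            continuation(*accumulator);
    };

    const auto mid = std::next(begin, (end - begin) / 2);

    post_in_thread_pool([=]{ async_count_if_erased(begin, mid, predicate, cleanup); });
    async_count_if_erased(mid, end, predicate, cleanup);
}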

The only piece of information available to the "top-level" caller of async_count_if is std::distance(begin, end). From it, the number of required cleanup instantiations could be computed: it is (2^k - 1), where k is the depth of the recursion tree.
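
To make the arithmetic concrete for the 512-element example above (my own sanity check, written against the strict "< 64" base case of the simplified code, under which a 64-element range still splits once more):

// k = number of times a range is halved before reaching the base case.
constexpr int tree_depth(int sz, int threshold)
{
    int k = 0;
    while(sz >= threshold) { sz /= 2; ++k; }
    return k;
}

static_assert(tree_depth(512, 64) == 4);             // 512 -> 256 -> 128 -> 64 -> 32
static_assert((1 << tree_depth(512, 64)) - 1 == 15); // 2^k - 1 = 15 cleanup instances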

, " " async_count_if . " " (2^k - 1) (2^k - 1) /.

, , , , , .

, ?


Do you really need a separate counter/accumulator pair at every level of the recursion? (I don't see why you would.) You can set them up once in a facade function and pass the same cleanup continuation all the way down, a la the following (untested sketch, with std::async standing in for the thread pool):

#include <algorithm>
#include <memory>
#include <vector>
#include <iostream>
#include <numeric>
#include <future>

using namespace std;

template <class T>
auto post_in_thread_pool(T&& work)
{
    // NB: the returned std::future is discarded and its destructor blocks,
    // so this stand-in runs the work before returning; a real pool would not.
    std::async(std::launch::async, work);
}

template <class It, class Pred, class Cont>
auto async_count_if(It begin, It end, Pred predicate, Cont continuation)
{
    // (0) Base case:  
    if(end - begin <= 64)
    {
        continuation(std::count_if(begin, end, predicate));
        return;
    }

    const auto sz = std::distance(begin, end);
    const auto mid = std::next(begin, sz / 2);                

    post_in_thread_pool([=]
    {
         async_count_if(begin, mid, predicate, continuation);
    });

    async_count_if(mid, end, predicate, continuation);
}

template <class It, class Pred, class Cont>
auto async_count_if_facade(It begin, It end, Pred predicate, Cont continuation)
{
    // Set up the shared counter/accumulator once for the whole recursion tree:
    const auto sz = std::distance(begin, end);
    auto counter = make_shared<atomic<int>>(sz / 64); // (fix this for mod 64 != 0 cases)
    auto cleanup = [=, accumulator = make_shared<atomic<int>>(0) /*(3)*/]
                   (int partial_result)
    {
        *accumulator += partial_result; 

        if(--*counter == 0)
        {
            continuation(*accumulator);
        }
    };

    return async_count_if(begin, end, predicate, cleanup);
}

int main ()
{
    std::vector<int> v(1024);
    std::iota(std::begin(v), std::end(v), 0);

    async_count_if_facade(std::begin(v), std::end(v), 
    /*    predicate */ [](auto x){ return x > 1000; }, 
    /* continuation */ [](const auto& res){ std::cout << res << std::endl; });
}
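
Regarding the "(fix this for mod 64 != 0 cases)" comment: one way to get an exact count (my suggestion, not part of the original answer) is to count the leaves the same way async_count_if actually splits the range:

// Counts the base-case invocations produced by halving sz until it is <= 64,
// mirroring the "mid = begin + sz / 2" split above.
constexpr int leaf_count(long long sz)
{
    return sz <= 64 ? 1 : leaf_count(sz / 2) + leaf_count(sz - sz / 2);
}

The counter in async_count_if_facade would then be initialised with make_shared<atomic<int>>(leaf_count(sz)) instead of sz / 64.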


Instead of building a new combining continuation at every level, give each recursive call a concrete destination to sink its result into: a promise for the half that is posted to the thread pool, and a plain int for the half computed on the current thread. Only these few fixed sink types ever reach the recursive calls, so the nesting of continuation types stays bounded.

Something along these lines:

template<class T>
auto sink_into_pointer( T* target ) {
  return [target](T x){*target=x;};
}
template<class T>
auto sink_into_promise( std::promise<T>& p ) {
  return [&p](T x){p.set_value(x);};
}
void async_count_if(auto begin, auto end, auto predicate, auto continuation) {
  // (0) Base case:  
  if(end - begin < 64)
  {
    continuation(std::count_if(begin, end, std::move(predicate)));
    return;
  }

  std::promise< int > sub_count;
  auto sub_count_value = sub_count.get_future();

  auto sub_count_task = sink_into_promise(sub_count);
  // (1) Recursive case:
  const auto sz = end - begin;
  const auto mid = std::next(begin, sz / 2);

  post_in_thread_pool(
    [=]() mutable // capture begin, mid, predicate and the sink by value
    {
      async_count_if(begin, mid, predicate, sub_count_task);
    }
  );

  int second_half = 0;
  auto second_sub_count = sink_into_pointer(&second_half);

  async_count_if(mid, end, predicate, second_sub_count);

  continuation( second_half + sub_count_value.get() );
}

Alternatively, you could wire this up with packaged_tasks instead of a raw promise; the structure stays the same.
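
A rough sketch of what that could look like (my reading of that suggestion, not the answer's code; it reuses sink_into_pointer from above and wraps the task in a shared_ptr so it can be posted through a copyable lambda):

#include <algorithm>
#include <future>
#include <iterator>
#include <memory>

void async_count_if_pt(auto begin, auto end, auto predicate, auto continuation)
{
    if(end - begin < 64)
    {
        continuation(std::count_if(begin, end, predicate));
        return;
    }

    const auto sz  = end - begin;
    const auto mid = std::next(begin, sz / 2);

    // The packaged_task computes the left-half count; its future replaces the
    // promise/sink pair from the code above.
    auto left_task = std::make_shared<std::packaged_task<int()>>([=]
    {
        int left = 0;
        async_count_if_pt(begin, mid, predicate, sink_into_pointer(&left));
        return left;
    });
    auto left_count = left_task->get_future();

    post_in_thread_pool([left_task]{ (*left_task)(); });

    int right = 0;
    async_count_if_pt(mid, end, predicate, sink_into_pointer(&right));

    continuation(right + left_count.get());
}

As in the promise version, only the fixed sink type reaches the recursive calls, so the set of instantiations stays bounded.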

Note that this costs a bit of parallelism: the calling thread blocks on the future for the posted half, i.e. it waits for that sub-result instead of returning immediately and letting a continuation fire later.


Another option is to give the whole tree a single accumulator type and link the levels through run-time parent pointers, so the nesting happens in data rather than in the type system:

#include <algorithm>
#include <future>
#include <iostream>
#include <memory>
#include <numeric>
#include <vector>

using namespace std;

template <class T> auto post_in_thread_pool(T &&work) {
  std::async(std::launch::async, work);
}

template <class Terminal_T> struct Accumulator {
  std::shared_ptr<atomic<int>> counter;
  std::shared_ptr<atomic<int>> accumulator;
  Terminal_T func;
  std::shared_ptr<Accumulator> parent;

  void operator()(int value) {
    *accumulator += value;
    if (--*counter == 0) {
      if (parent)
        (*parent)(*accumulator);
      else
        func(*accumulator);
    }
  }
};

template <class T>
auto make_shared_accumulator(T func, int nb_leaves,
                             std::shared_ptr<Accumulator<T>> parent = nullptr) {
  return make_shared<Accumulator<T>>(
      Accumulator<T>{make_shared<atomic<int>>(nb_leaves),
                     make_shared<atomic<int>>(0), func, parent});
}

template <class Begin_T, class End_T, class Predicate_T, class Continuation_T>
auto async_count_if(Begin_T begin, End_T end, Predicate_T predicate,
                    Continuation_T continuation) {
  auto sz = end - begin;

  // (0) Base case:
  if (sz < 64) {
    (*continuation)(std::count_if(begin, end, predicate));
    return;
  }

  // (1) Recursive case:
  auto cleanup = make_shared_accumulator(continuation->func, 2, continuation);
  const auto mid = std::next(begin, sz / 2);

  post_in_thread_pool([=] { async_count_if(begin, mid, predicate, cleanup); });

  async_count_if(mid, end, predicate, cleanup);
}

int main() {
  std::vector<int> v(512);
  std::iota(std::begin(v), std::end(v), 0);

  auto res_func = [](int res) { std::cout << res << std::endl; };
  async_count_if(std::begin(v), std::end(v),
                 /*    predicate */ [](auto x) { return x < 256; },
                 /* continuation */
                 make_shared_accumulator(res_func, 1));
}

On Coliru. Note that all the synchronization state (counter, accumulator, parent pointer) now lives on the heap behind shared_ptrs, so there is some allocation and reference-counting overhead, but the continuation type no longer depends on the depth of the recursion.

More generally, what is being built here is a small dataflow graph, and hand-rolling it out of nested continuations and shared state gets awkward quickly: ownership, synchronization and scheduling all end up entangled in the continuation machinery, and the type system fights you at every level of the tree.

To get out of this trap, you need more tools. You might consider using TensorFlow to implement your computational model. You could also look at experimental frameworks such as Boson or RaftLib (multithreading is not yet implemented in the latter). Or implement your own, but be careful: that is a lot of work to get right.


Source: https://habr.com/ru/post/1013944/

