How to make Intel TBB multifunction_node with dynamic number of ports?

I am new to Intel TBB library . As you can see, my question is related to tbb :: flow :: graph. I need to implement logic, for example:

The user draws a graph using some logical blocks. Each block (node) can have unlimited connections (edges) so that each block (node) can choose where to put the data. Then my program will construct such a graph using the TBB library and perform the calculations.

Therefore, I do not know if it is possible to build a node (I think it should be multifunctional_node) with a dynamic number of output ports. Could you show me a way to do this, please?

+4
source share
1 answer

Unfortunately, there is no way (without dynamic compilation) to change the number of output ports in multifunction_node. You can create the maximum number of ports (which is controlled by a macro switch and depends on the compiler) and simply connect to the ports dynamically. If you use try_put for the port and no successor is connected, try_put fails, and you can respond to this at runtime.

( , ) , _nodes. , node, 0 1, . , .

, 2 (, 10).

: ( :: graph) , , . , :

#include "tbb/tbb.h"
#include <iostream>

using namespace tbb::flow;

tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
struct multioutput_function_body {
    output_port_vector_t &my_ports;
    public:
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = my_ports.size();
        if(in >= current_size) {
            // error condition?  grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};

struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};

int main() {
    graph g;
    output_port_vector_t output_ports;
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }

    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }

    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}

:

  • concurrent_vector broadcast_nodes. function_node broadcast_nodes. function_node .
  • concurrent_vector multioutput_function_body. _node. multioutput_function_body , broadcast_node try_put . , try_puts broadcast_nodes. try_put. , , , node.
  • broadcast_nodes function_nodes. "" broadcast_nodes multioutput_function_body. , function_node , function_body concurrent_vector broadcast_node. g.wait_for_all().

concurrent_vector, , concurrent_vector. , broadcast_node . , , . concurrent_vectors ; , , , .

+4

Source: https://habr.com/ru/post/1613340/


All Articles