An idiomatic way to parallelize a function line by line in C++

I often write C++ code of the form:

    while (getline(strm, line)) {
        cout << computationally_intensive_function(line) << endl;
    }

I would like to parallelize this code. The best solution I have come up with so far is to read a large batch of lines (10,000-100,000) into a vector and then process that vector in parallel with

 #pragma omp parallel for 

Then I clear the vector and repeat until no lines remain. However, this approach requires a lot of memory, and the other cores sit idle while the main thread is buffering the next batch of lines. Is there a better way? Something like Python's multiprocessing.Pool.map or Hadoop? (I would, however, like to avoid the Hadoop C++ API, because Hadoop is quite heavy and cannot be installed everywhere my code needs to run.)
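
(A minimal sketch of the batching approach described above, added for illustration; the input file name, the batch size, and the declared signature of computationally_intensive_function are assumptions, not part of the original question.)

    #include <fstream>
    #include <iostream>
    #include <string>
    #include <vector>

    // Assumed signature; the actual function is defined elsewhere.
    std::string computationally_intensive_function(const std::string& line);

    int main() {
        std::ifstream strm("input.txt");              // hypothetical input file
        std::vector<std::string> lines;
        std::vector<std::string> results;
        std::string line;
        const std::size_t batch_size = 10000;         // 10,000-100,000 as in the question

        for (;;) {
            // Serial phase: the other cores are idle while one thread buffers a batch.
            lines.clear();
            while (lines.size() < batch_size && std::getline(strm, line))
                lines.push_back(line);
            if (lines.empty())
                break;

            // Parallel phase: process the whole batch at once.
            results.assign(lines.size(), std::string());
            #pragma omp parallel for
            for (long i = 0; i < static_cast<long>(lines.size()); ++i)
                results[i] = computationally_intensive_function(lines[i]);

            // Serial phase again: print the results in order.
            for (const std::string& r : results)
                std::cout << r << '\n';
        }
        return 0;
    }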

+6
2 answers

There is a not very well-known feature of OpenMP 3.0 called tasks, and it is quite unfortunate that it is not better known, since tasks were designed specifically to cover cases like this one. If your compiler supports that version of the standard, you should definitely go with OpenMP tasks. But keep in mind that writing to stdout (or std::cout ) from multiple threads usually interleaves their output badly, and you will most likely want to synchronize it:

    #pragma omp parallel
    {
        #pragma omp master
        while (getline(strm, line))
        #pragma omp task
        {
            result_type result = computationally_intensive_function(line);
            #pragma omp critical
            {
                cout << result << endl;
                cout.flush();
            }
        }

        #pragma omp taskwait
    }

I leave it to you to decide which variables should be shared and which should be private .
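
For instance, one possible way to settle that data-sharing question (my own assumption here, not part of the answer above) is to make line firstprivate, so each task captures its own copy of the line it was created for:

    #include <iostream>
    #include <istream>
    #include <string>

    // Assumed signature; the actual function is defined elsewhere.
    std::string computationally_intensive_function(const std::string& line);

    void process_lines(std::istream& strm) {
        std::string line;
        #pragma omp parallel
        {
            #pragma omp master
            {
                while (std::getline(strm, line)) {
                    // firstprivate gives every task its own copy of line; without it,
                    // all tasks would read the single shared buffer that the master
                    // thread keeps overwriting.
                    #pragma omp task firstprivate(line)
                    {
                        std::string result = computationally_intensive_function(line);
                        #pragma omp critical
                        {
                            std::cout << result << std::endl;
                        }
                    }
                }
                // Wait until all tasks generated above have completed.
                #pragma omp taskwait
            }
        }
    }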

+5

You want to overlap your computation with reading the lines from the file. One good way to do this is the Threading Building Blocks (TBB) parallel pipeline algorithm. What you do is specify three filters (based on what you show in the pseudo-code example): two serial and one parallel. The serial filters are the input and output stages. The first filter reads the data from the file line by line and passes each line to the second filter, which is parallel and runs your computation / processing function across multiple threads. The last stage / filter is serial again and writes the output. I will copy the example from the TBB Tutorial, which seems to do exactly what you want to achieve:

    // Holds a slice of text.
    /** Instances *must* be allocated/freed using methods herein, because the C++
        declaration represents only the header of a much larger object in memory. */
    class TextSlice {
        // Pointer to one past last character in sequence
        char* logical_end;
        // Pointer to one past last available byte in sequence.
        char* physical_end;
    public:
        // Allocate a TextSlice object that can hold up to max_size characters.
        static TextSlice* allocate( size_t max_size ) {
            // +1 leaves room for a terminating null character.
            TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate( sizeof(TextSlice)+max_size+1 );
            t->logical_end = t->begin();
            t->physical_end = t->begin()+max_size;
            return t;
        }
        // Free this TextSlice object
        void free() {
            tbb::tbb_allocator<char>().deallocate((char*)this, sizeof(TextSlice)+(physical_end-begin())+1);
        }
        // Pointer to beginning of sequence
        char* begin() {return (char*)(this+1);}
        // Pointer to one past last character in sequence
        char* end() {return logical_end;}
        // Length of sequence
        size_t size() const {return logical_end-(char*)(this+1);}
        // Maximum number of characters that can be appended to sequence
        size_t avail() const {return physical_end-logical_end;}
        // Append sequence [first,last) to this sequence.
        void append( char* first, char* last ) {
            memcpy( logical_end, first, last-first );
            logical_end += last-first;
        }
        // Set end() to given value.
        void set_end( char* p ) {logical_end=p;}
    };

And to run the pipeline:

    void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
        tbb::parallel_pipeline(
            ntoken,
            tbb::make_filter<void,TextSlice*>(
                tbb::filter::serial_in_order, MyInputFunc(input_file) )
          & tbb::make_filter<TextSlice*,TextSlice*>(
                tbb::filter::parallel, MyTransformFunc() )
          & tbb::make_filter<TextSlice*,void>(
                tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
    }
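
The TBB Tutorial's MyInputFunc, MyTransformFunc, and MyOutputFunc are not reproduced in the answer above. As a rough, line-oriented sketch of what they might look like (the functor bodies are my assumptions, written against the classic tbb::parallel_pipeline interface and the TextSlice class shown above, with computationally_intensive_function again assumed to take and return std::string):

    #include <cstdio>
    #include <cstring>
    #include <string>
    #include "tbb/pipeline.h"

    // Assumed signature; the actual function is defined elsewhere.
    std::string computationally_intensive_function(const std::string& line);

    // Serial input filter: reads one line per invocation and wraps it in a TextSlice.
    class MyInputFunc {
        FILE* input;
    public:
        explicit MyInputFunc(FILE* f) : input(f) {}
        TextSlice* operator()(tbb::flow_control& fc) const {
            char buf[4096];
            if (!std::fgets(buf, sizeof(buf), input)) {
                fc.stop();                       // end of file: stop the pipeline
                return NULL;
            }
            size_t n = std::strlen(buf);
            TextSlice* t = TextSlice::allocate(n);
            t->append(buf, buf + n);
            return t;
        }
    };

    // Parallel transform filter: runs the expensive computation on one line.
    class MyTransformFunc {
    public:
        TextSlice* operator()(TextSlice* in) const {
            std::string line(in->begin(), in->end());
            in->free();
            std::string result = computationally_intensive_function(line) + "\n";
            TextSlice* out = TextSlice::allocate(result.size());
            out->append(const_cast<char*>(result.data()),
                        const_cast<char*>(result.data()) + result.size());
            return out;
        }
    };

    // Serial output filter: writes results in input order and frees the slice.
    class MyOutputFunc {
        FILE* output;
    public:
        explicit MyOutputFunc(FILE* f) : output(f) {}
        void operator()(TextSlice* slice) const {
            std::fwrite(slice->begin(), 1, slice->size(), output);
            slice->free();
        }
    };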
+1

Source: https://habr.com/ru/post/916219/
