You need to overlap your computation with reading lines from the file. One good way to do this is the pipeline algorithm from Threading Building Blocks (TBB). Based on your pseudo-code, you would specify three filters: two serial and one parallel. The serial filters are the input and output stages. The first reads the file line by line and passes each line to the second filter, which is parallel and runs your calculation/processing function on multiple threads. The last filter is also serial and writes the results. Here is the example from the TBB Tutorial, which seems to do exactly what you want. It starts with the TextSlice class that holds the text passed between the stages:
#include <cstddef>   // size_t
#include <cstring>   // memcpy
#include "tbb/tbb_allocator.h"

// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the C++ declaration
    represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate( sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator<char>().deallocate( (char*)this, sizeof(TextSlice)+(physical_end-begin())+1 );
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};
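And the function that builds and runs the pipeline (the MyInputFunc, MyTransformFunc, and MyOutputFunc functors for the three stages are defined in the tutorial and are not reproduced here):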
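// ntoken is the maximum number of slices in flight at once.
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
        ntoken,
        // Serial input stage: reads slices of text from the file, in order.
        tbb::make_filter<void,TextSlice*>(
            tbb::filter::serial_in_order, MyInputFunc(input_file) )
        &
        // Parallel stage: processes several slices concurrently.
        tbb::make_filter<TextSlice*,TextSlice*>(
            tbb::filter::parallel, MyTransformFunc() )
        &
        // Serial output stage: writes the slices in their original order.
        tbb::make_filter<TextSlice*,void>(
            tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}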
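If it helps to see the same three-stage shape without the TBB machinery, here is a minimal sketch that uses only the standard library. It is not TBB code and not the tutorial's implementation: std::async stands in for the parallel filter, and the names process_line, run_line_pipeline, and run_on_text are hypothetical. The max_in_flight parameter plays roughly the role of ntoken above, and upper-casing is just a placeholder for your real calculation.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <deque>
#include <future>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical per-line work; stands in for the real calculation.
std::string process_line(std::string line) {
    std::transform(line.begin(), line.end(), line.begin(),
                   [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
    return line;
}

// Reads lines serially, processes up to max_in_flight of them concurrently,
// and writes the results serially in the original input order.
void run_line_pipeline(std::istream& in, std::ostream& out, std::size_t max_in_flight) {
    if (max_in_flight == 0) max_in_flight = 1;
    std::deque<std::future<std::string>> in_flight;
    std::string line;
    while (std::getline(in, line)) {                       // serial input stage
        in_flight.push_back(                               // parallel stage
            std::async(std::launch::async, process_line, std::move(line)));
        if (in_flight.size() >= max_in_flight) {           // serial, ordered output stage
            out << in_flight.front().get() << '\n';
            in_flight.pop_front();
        }
    }
    while (!in_flight.empty()) {                           // drain the remaining results
        out << in_flight.front().get() << '\n';
        in_flight.pop_front();
    }
}

// Convenience wrapper for in-memory text, handy for trying the sketch out.
std::string run_on_text(const std::string& text, std::size_t max_in_flight) {
    std::istringstream in(text);
    std::ostringstream out;
    run_line_pipeline(in, out, max_in_flight);
    return out.str();
}
```

Note that this sketch starts one asynchronous task per line, which is fine for illustration but costly for short lines. The TBB version avoids that by handing whole TextSlice chunks to a pool of worker threads, which is one reason to prefer it for real workloads.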