Performing an efficient intersection of C++ vectors

I have 200 vectors ranging in size from 1 to 4,000,000 elements, stored in vecOfVec. I need to intersect each of these vectors with a single vector, vecSearched, of 9000+ elements. I tried to do this with the following code, but profiling showed that the intersection is the bottleneck in my program. Is there a way to perform the intersection more efficiently?

    #include <cstdlib>
    #include <iostream>
    #include <vector>

    using namespace std;

    int main(int argc, char** argv)
    {
        vector<vector<unsigned> > vecOfVec; //contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
        vector<unsigned> vecSearched; //vector searched in contains 9000+ elements. Vectors in vecSearched are sorted

        for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
        {
            //first find first 9 values spaced at equi-distant places, use these 9 values for performing comparisons
            vector<unsigned> equiSpacedVec;

            if(((vecSearched[0]))>vecOfVec[kbt][(vecOfVec[kbt].size())-1]) //if beginning of searched vector > last value present in individual vectors of vecOfVec then continue
            {
                continue;
            }

            unsigned elementIndex=0; //used for iterating over equiSpacedVec
            unsigned i=0; //used for iterating over individual buckets vecOfVec[kbt].second

            //search for value in bucket and store it in bucketValPos
            bool firstRun=true;
            for(vector<unsigned>::iterator itValPos=vecSearched.begin();itValPos!=vecSearched.end();++itValPos)
            {
                //construct a summarized vector out of individual vectors of vecOfVec
                if(firstRun)
                {
                    firstRun=false;
                    unsigned elementIndex1=0; //used for iterating over equiSpacedVec
                    while(elementIndex1<(vecOfVec[kbt].size())) //create a small vector for skipping over the remaining vectors
                    {
                        if((elementIndex1+(10000))<(vecOfVec[kbt].size()))
                            elementIndex1+=10000;
                        else
                            break;
                        equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
                    }
                }

                //skip individual vectors of vecOfVec using summarized vector constructed above
                while((!(equiSpacedVec.empty()))&&(equiSpacedVec.size()>(elementIndex+1))&&((*itValPos)>equiSpacedVec[elementIndex+1])){
                    elementIndex+=1;
                    if((i+100)<(vecOfVec[kbt].size()))
                        i+=100;
                }

                unsigned j=i;
                while(((*itValPos)>vecOfVec[kbt][j])&&(j<vecOfVec[kbt].size())){
                    j++;
                }
                if(j>(vecOfVec[kbt].size()-1)) //element not found even at last position.
                {
                    break;
                }
                if((*itValPos)==vecOfVec[kbt][j])
                {
                    //store intersection result
                }
            }
        }
        return 0;
    }
+6
source share
4 answers

Your problem is a very common one. Since you have no data correlating the vectors to intersect, it boils down to speeding up the intersection between two vectors, and there are basically two approaches to it:

1. Without any preprocessing

This is usually addressed by three things:

  • Reducing the number of comparisons. For example, for small vectors (size 1 to 50) you should binary search for each element, to avoid traversing all 9000+ elements of the subject vector.

  • Improving code quality to reduce branch mispredictions. For example, observing that the result set will usually be smaller than the input sets, one can transform code like this:

      while (Apos < Aend && Bpos < Bend) {
          if (A[Apos] == B[Bpos]) {
              C[Cpos++] = A[Apos];
              Apos++; Bpos++;
          } else if (A[Apos] > B[Bpos]) {
              Bpos++;
          } else {
              Apos++;
          }
      }

    into code that "unrolls" such comparisons, creating more branches but ones that are easier to predict (example for block size = 2):

      while (1) {
          Adat0 = A[Apos]; Adat1 = A[Apos + 1];
          Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
          if (Adat0 == Bdat0) {
              C[Cpos++] = Adat0;
          } else if (Adat0 == Bdat1) {
              C[Cpos++] = Adat0;
              goto advanceB;
          } else if (Adat1 == Bdat0) {
              C[Cpos++] = Adat1;
              goto advanceA;
          }
          if (Adat1 == Bdat1) {
              C[Cpos++] = Adat1;
              goto advanceAB;
          } else if (Adat1 > Bdat1) goto advanceB;
          else goto advanceA;
      advanceA:
          Apos += 2;
          if (Apos >= Aend) { break; } else { continue; }
      advanceB:
          Bpos += 2;
          if (Bpos >= Bend) { break; } else { continue; }
      advanceAB:
          Apos += 2; Bpos += 2;
          if (Apos >= Aend || Bpos >= Bend) { break; }
      }
      // fall back to naive algorithm for remaining elements

  • Using SIMD instructions to perform block operations.

These techniques are hard to describe in a Q&A context, but you can read about them (plus related optimizations such as if-conversion) here and here, or find implementation elements here.
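The first bullet (binary searching when one vector is small) can be sketched as follows. This is a minimal illustration, not code from the answer: the function name `intersect_by_search` is hypothetical, and both inputs are assumed to be sorted.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical helper: intersect a small sorted vector with a large sorted
// vector by binary searching each small element in the large one.
// Cost is roughly O(|small| * log |large|) instead of O(|small| + |large|).
std::vector<unsigned> intersect_by_search(const std::vector<unsigned>& small_vec,
                                          const std::vector<unsigned>& large_vec)
{
    std::vector<unsigned> result;
    auto from = large_vec.begin();
    for (unsigned value : small_vec) {
        // Both inputs are sorted, so each search can resume where the
        // previous one stopped.
        from = std::lower_bound(from, large_vec.end(), value);
        if (from == large_vec.end())
            break;                      // nothing larger is left to match
        if (*from == value)
            result.push_back(value);
    }
    return result;
}
```

Whether this beats a linear merge depends on the size ratio, so it is worth timing both on real data.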

2. With preprocessing

This, IMHO, is the better way in your case, because you have a single subject vector of 9000+ elements. You could build an interval tree from it, or simply find a way to index it, for example by creating a structure that speeds up searches against it:

    vector<unsigned> subject(9000+);
    vector<range> index(9000/M);

where range is a structure like

 struct range { unsigned min, max; }; 

thus creating a sequence of ranges

 [0, 100], [101, 857], ... [33221, 33500] 

which allows you to skip many comparisons when performing the intersection (for example, if an element of the other set is greater than the maximum of a subrange, you can skip that subrange completely).
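One way to read the range index described above is sketched below. The names `build_index` and `intersect_indexed`, and the choice to binary search inside a block, are assumptions made for illustration. The answer only specifies the `range` structure and the idea of skipping subranges. Both vectors are assumed sorted and `block` must be greater than zero.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

struct range { unsigned min, max; };

// Summarize every `block` consecutive elements of the sorted subject vector
// by their smallest and largest value.
std::vector<range> build_index(const std::vector<unsigned>& subject, std::size_t block)
{
    std::vector<range> index;
    for (std::size_t start = 0; start < subject.size(); start += block) {
        std::size_t last = std::min(start + block, subject.size()) - 1;
        index.push_back({subject[start], subject[last]});
    }
    return index;
}

// Intersect `other` with `subject`, using the index to skip whole blocks.
std::vector<unsigned> intersect_indexed(const std::vector<unsigned>& subject,
                                        const std::vector<range>& index,
                                        std::size_t block,
                                        const std::vector<unsigned>& other)
{
    std::vector<unsigned> result;
    std::size_t b = 0;                  // current block of subject
    for (unsigned value : other) {
        // Skip every block whose maximum is still below the value.
        while (b < index.size() && index[b].max < value)
            ++b;
        if (b == index.size())
            break;                      // value is beyond the whole subject
        if (value < index[b].min)
            continue;                   // value falls in a gap between blocks
        auto first = subject.begin() + b * block;
        auto last = subject.begin() + std::min((b + 1) * block, subject.size());
        if (std::binary_search(first, last, value))
            result.push_back(value);
    }
    return result;
}
```

The index is built once for the 9000-element subject and reused for all 200 intersections, which is where the preprocessing cost is recovered.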

3. Parallelization

Yes, there is always a third element in a list of two :P. When you have optimized the procedure enough (and only then), break your work into chunks and run it in parallel. The problem fits an embarrassingly parallel pattern, so 200 vectors vs. 1 should definitely run as "50 vs. 1, four times concurrently".

Test, measure, redesign!
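A minimal sketch of that split, assuming `std::async` tasks and `std::set_intersection` for the per-vector work (neither is prescribed by the answer). The function name `intersect_all_parallel` is hypothetical. It uses a strided split (worker w takes vectors w, w + workers, ...) rather than contiguous chunks of 50, which tends to balance better when vector sizes vary widely.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <iterator>
#include <vector>

using uvec = std::vector<unsigned>;

// Hypothetical helper: intersect every (sorted) vector in vecOfVec with the
// sorted vecSearched, spreading the work over `workers` asynchronous tasks.
std::vector<uvec> intersect_all_parallel(const std::vector<uvec>& vecOfVec,
                                         const uvec& vecSearched,
                                         std::size_t workers)
{
    std::vector<uvec> results(vecOfVec.size());
    if (workers == 0)
        workers = 1;

    std::vector<std::future<void>> tasks;
    for (std::size_t w = 0; w < workers; ++w) {
        tasks.push_back(std::async(std::launch::async, [&, w] {
            // Each task writes only to its own results[i], so no locking is needed.
            for (std::size_t i = w; i < vecOfVec.size(); i += workers)
                std::set_intersection(vecOfVec[i].begin(), vecOfVec[i].end(),
                                      vecSearched.begin(), vecSearched.end(),
                                      std::back_inserter(results[i]));
        }));
    }
    for (auto& task : tasks)
        task.get();                     // wait; rethrows a worker's exception, if any
    return results;
}
```

On some toolchains this needs to be built with thread support enabled (for example `-pthread` with GCC or Clang).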

+6
source

If I understand your code correctly, then with N the number of vectors and M the number of elements inside each vector, your algorithm is roughly O(N * M^2). On top of that there is the "bucket" strategy, which improves things a little, but its effect is hard to evaluate at first glance.

I suggest you work with sorted vectors and compute the intersections on sorted vectors. Something like this:

    vector<vector<unsigned> > vecOfVec;
    vector<unsigned> vecSearched;

    for (vector<unsigned> v : vecOfVec) // yes, a copy
    {
        std::sort(v.begin(), v.end());

        if (vecSearched.empty()) // first run detection
            vecSearched = v;
        else
        {
            // compute intersection of v and vecSearched
            auto vit = v.begin();
            auto vend = v.end();
            auto sit = vecSearched.begin();
            auto send = vecSearched.end();
            vector<unsigned> result;

            while (vit != vend && sit != send)
            {
                if (*vit < *sit)
                    ++vit;
                else if (*vit == *sit)
                {
                    result.push_back(*vit);
                    ++vit;
                    ++sit;
                }
                else // *vit > *sit
                    ++sit;
            }
            vecSearched = result;
        }
    }

The code is untested. In any case, the idea is that intersection is easier on sorted vectors, since you can compare the two iterators (vit, sit) and advance the one that points to the smaller value. The intersection is therefore linear in M, and the overall complexity is O(N * M * log(M)), where log(M) comes from the sorting.

+4
source

The easiest way to compare two sorted vectors is to iterate through both of them at the same time and only increment whichever iterator points to the smaller value. This is guaranteed to need the minimum number of comparisons if both vectors contain unique values. In fact, you can use the same code for all kinds of containers, such as linked lists.

@Nikos Athanasiou (answer above) gives many useful tips for speeding up your code, for example using skip lists and SIMD comparisons. However, your data set is so small that even simple, naive code is blindingly fast here...

    template<typename CONT1, typename CONT2, typename OP_MARKIDENTICAL>
    inline
    void set_mark_overlap(
        const CONT1& cont1,
        const CONT2& cont2,
        OP_MARKIDENTICAL op_markidentical)
    {
        auto ii = cont1.cbegin();
        auto end1 = cont1.cend();
        auto jj = cont2.cbegin();
        auto end2 = cont2.cend();

        if (cont1.empty() || cont2.empty())
            return;

        for (;;)
        {
            // increment iterator to container 1 if it is less
            if (*ii < *jj)
            {
                if (++ii == end1)
                    break;
            }
            // increment iterator to container 2 if it is less
            else if (*jj < *ii)
            {
                if (++jj == end2)
                    break;
            }
            // same values
            // increment both iterators
            else
            {
                op_markidentical(*ii);
                ++ii;
                if (ii == end1)
                    break;

                //
                // Comment out the next two lines if container1 can contain duplicates
                //
                ++jj;
                if (jj == end2)
                    break;
            }
        }
    }

Here is how you can use this code:

    template<typename TT>
    struct op_store
    {
        vector<TT>& store;
        op_store(vector<TT>& store): store(store){}
        void operator()(TT val){store.push_back(val);}
    };

    vector<unsigned> first{1,2,3,4,5,6};
    vector<unsigned> second{1,2,5,6, 7,9};
    vector<unsigned> overlap;

    set_mark_overlap(first, second, op_store<unsigned>(overlap));

    for (const auto& ii : overlap)
        std::cout << ii << ",";
    std::cout << "\n";
    // prints 1,2,5,6,

This code assumes that neither vector contains duplicates. If any of your vecOfVec contains duplicates and you want each duplicate to be reported, you need to comment out the lines indicated in the code above. If your vecSearched vector contains duplicates, it is not clear what the appropriate answer would be...

In your case, the code to store matching values is just these three lines:

    // results
    vector<vector<unsigned> > results(120);
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
        set_mark_overlap(vecSearched, vecOfVec[ii], op_store<unsigned>(results[ii]));

In terms of optimization, your problem has two characteristics: 1) one list is always much shorter than the other, and 2) the shorter list is reused, while the longer list is new for each comparison.

Up-front costs (preprocessing, e.g. for skip lists as suggested by @Nikos Athanasiou) are irrelevant for the short list of 9000 (which is used again and again), but not for the longer lists.

I imagine most of the skipping happens in the longer lists, so skip lists may not be a panacea. How about a sort of dynamic skip list, so that you either jump by N (j += 4,000,000 / 9000) or step by one (++j) when the second container is catching up (in the code above)? If you have jumped too far, you can use a mini binary search to find the right amount by which to increase j.
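One possible reading of this "jump, then binary search back" idea is a galloping (exponential) search over the longer list. The sketch below is a variant, not the answer's exact scheme: it doubles the jump instead of using a fixed N, and the name `intersect_galloping` is hypothetical. Both inputs are assumed sorted.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper: for each element of the short list, jump ahead in the
// long list in doubling steps, then binary search inside the last jump.
std::vector<unsigned> intersect_galloping(const std::vector<unsigned>& short_vec,
                                          const std::vector<unsigned>& long_vec)
{
    std::vector<unsigned> result;
    const std::size_t n = long_vec.size();
    std::size_t j = 0;                  // everything before j is already too small

    for (unsigned value : short_vec) {
        std::size_t lo = j;
        std::size_t step = 1;
        // Jump while the probed element is still smaller than the value.
        while (lo + step < n && long_vec[lo + step] < value) {
            lo += step;
            step *= 2;
        }
        // The first element >= value, if there is one, now lies in [lo, hi).
        std::size_t hi = std::min(lo + step + 1, n);
        j = static_cast<std::size_t>(
            std::lower_bound(long_vec.begin() + lo, long_vec.begin() + hi, value)
            - long_vec.begin());
        if (j == n)
            break;                      // the rest of short_vec is larger still
        if (long_vec[j] == value)
            result.push_back(value);
    }
    return result;
}
```

The number of comparisons is then roughly proportional to the short list's size times the logarithm of the gap between matches, rather than to N + M.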

Because of this asymmetry in the lengths of the lists, I don't see recoding with SIMD helping: we need to bring the number of comparisons below (N + M), rather than increase the speed of each comparison. However, it depends on your data. Code it up and time things!

Here is test code that creates some vectors of random numbers and checks that the expected matches are found:

    #include <iostream>
    #include <vector>
    #include <unordered_set>
    #include <exception>
    #include <stdexcept>
    #include <algorithm>
    #include <limits>
    #include <random>

    using namespace std;

    // set_mark_overlap() from above goes here. With this two-argument
    // op_store it has to call op_markidentical(*ii, *jj).

    template<typename TT>
    struct op_store
    {
        std::vector<TT>& store;
        op_store(vector<TT>& store): store(store){}
        void operator()(TT val, TT val2)
        {
            if (val != val2)
                std::cerr << val << " !- " << val2 << "\n";
            store.push_back(val);
        }
    };

    void fill_vec_with_unique_random_values(vector<unsigned>& cont, unordered_set<unsigned>& curr_values)
    {
        static random_device rd;
        static mt19937 e1(rd());
        static uniform_int_distribution<unsigned> uniform_dist(0, std::numeric_limits<unsigned>::max());

        for (auto& jj : cont)
        {
            for (;;)
            {
                unsigned new_value = uniform_dist(e1);
                // make sure all values are unique
                if (curr_values.count(new_value) == 0)
                {
                    curr_values.insert(new_value);
                    jj = new_value;
                    break;
                }
            }
        }
    }

    int main (int argc, char *argv[])
    {
        static random_device rd;
        static mt19937 e1(rd());

        // vector searched in contains 9000+ elements. Vectors in vecSearched are sorted
        vector<unsigned> vecSearched(9000);
        unordered_set<unsigned> unique_values_9000;
        fill_vec_with_unique_random_values(vecSearched, unique_values_9000);

        //
        // Create 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
        //
        vector<vector<unsigned> > vecOfVec(5);
        normal_distribution<> vec_size_normal_dist(1000000U, 500000U);
        for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
        {
            std::cerr << " Create Random data set" << ii << " ...\n";
            auto vec_size = min(2000000U, static_cast<unsigned>(vec_size_normal_dist(e1)));
            vecOfVec[ii].resize(vec_size);

            // Do NOT share values with the 9000. We will manually add these later
            unordered_set<unsigned> unique_values(unique_values_9000);
            fill_vec_with_unique_random_values(vecOfVec[ii], unique_values);
        }

        // insert half of vecSearched in our 120 vectors so that we know what we are going to find
        vector<unsigned> correct_results(begin(vecSearched), begin(vecSearched) + 4500);
        for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
            vecOfVec[ii].insert(vecOfVec[ii].end(), begin(correct_results), end(correct_results));

        // Make sure everything is sorted
        std::cerr << " Sort data ...\n";
        for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
            sort(begin(vecOfVec[ii]), end(vecOfVec[ii]));
        sort(begin(vecSearched), end(vecSearched));
        sort(begin(correct_results), end(correct_results));

        std::cerr << " Match ...\n";
        // results
        vector<vector<unsigned> > results(120);
        for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
        {
            std::cerr << ii << " done\n";
            set_mark_overlap(vecSearched, vecOfVec[ii], op_store<unsigned>(results[ii]));

            // check all is well
            if (results[ii] != correct_results)
                throw runtime_error("Oops");
        }
        return(0);
    }
+2
source

Using std::set_intersection may help, but I don't know whether it improves the overall speed:

    vector<vector<unsigned int> > vecOfVec(200);
    vector<unsigned int> vecSearched;
    set<unsigned int> intersection;

    for(auto it = vecOfVec.begin(); it != vecOfVec.end(); ++it)
    {
        std::set_intersection(it->begin(), it->end(),
                              vecSearched.begin(), vecSearched.end(),
                              std::inserter(intersection, intersection.begin()));
    }
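The snippet above merges every match into a single std::set. If one result per input vector is wanted instead, a variant along the following lines may be closer to the question. The function name `intersect_each` is hypothetical, and writing into plain vectors through `std::back_inserter` is an assumption about the desired output, not part of the answer. All inputs are assumed sorted.

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical helper: one result vector per input vector, each filled by
// std::set_intersection (which requires both input ranges to be sorted).
std::vector<std::vector<unsigned>> intersect_each(
    const std::vector<std::vector<unsigned>>& vecOfVec,
    const std::vector<unsigned>& vecSearched)
{
    std::vector<std::vector<unsigned>> results(vecOfVec.size());
    for (std::size_t i = 0; i < vecOfVec.size(); ++i) {
        // The intersection cannot be larger than the smaller input.
        results[i].reserve(std::min(vecOfVec[i].size(), vecSearched.size()));
        std::set_intersection(vecOfVec[i].begin(), vecOfVec[i].end(),
                              vecSearched.begin(), vecSearched.end(),
                              std::back_inserter(results[i]));
    }
    return results;
}
```

Appending to a vector avoids the per-element node allocations of std::set, which may matter when many matches are found.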
+1
source

Source: https://habr.com/ru/post/989782/

