I am trying to implement a lock-free multi-producer, multi-consumer queue in C++11. I am doing this as a learning exercise, so I am well aware that I could just use an existing open-source implementation, but I would really like to understand why my code isn't working. The data is stored in a ring buffer, so apparently this is a "bounded MPMC queue."
I modeled it fairly closely on what I have read about the Disruptor. The odd thing is that it works perfectly fine with one consumer and one or several producers; it is only multiple consumers that seem to break it.
Here's the queue:
#include <atomic>

template <typename T>
class Queue : public IQueue<T> {
public:
    explicit Queue( int capacity );
    ~Queue();
    bool try_push( T value );
    bool try_pop( T& value );
private:
    // One slot of the ring buffer: a value plus a flag saying whether it is ready to be read.
    typedef struct {
        bool readable;
        T value;
    } Item;

    std::atomic<int> m_head;
    std::atomic<int> m_tail;
    int m_capacity;
    Item* m_items;
};

template <typename T>
Queue<T>::Queue( int capacity ) :
    m_head( 0 ),
    m_tail( 0 ),
    m_capacity( capacity ),
    m_items( new Item[capacity] )
{
    for( int i = 0; i < capacity; ++i ) {
        m_items[i].readable = false;
    }
}

template <typename T>
Queue<T>::~Queue() {
    delete[] m_items;
}

template <typename T>
bool Queue<T>::try_push( T value ) {
    while( true ) {
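The paste above got cut off inside try_push. A minimal sketch of the approach I'm describing (head/tail indices advanced with compare-and-swap, plus the per-slot readable flag) looks roughly like this; my actual code may differ in the details, so treat this as the shape of the idea rather than an exact copy:

template <typename T>
bool Queue<T>::try_push( T value ) {
    while( true ) {
        int tail = m_tail.load( std::memory_order_acquire );
        Item& item = m_items[tail % m_capacity];
        if( item.readable ) {
            // The slot at the tail still holds an unread value: the queue looks full.
            return false;
        }
        // Try to claim the slot by advancing the tail; another producer may win the race.
        if( m_tail.compare_exchange_weak( tail, tail + 1, std::memory_order_acq_rel ) ) {
            item.value = value;
            item.readable = true;   // plain bool, matching the struct above
            return true;
        }
    }
}

template <typename T>
bool Queue<T>::try_pop( T& value ) {
    while( true ) {
        int head = m_head.load( std::memory_order_acquire );
        Item& item = m_items[head % m_capacity];
        if( !item.readable ) {
            // Nothing published at the head yet: the queue looks empty.
            return false;
        }
        // Try to claim the slot by advancing the head; another consumer may win the race.
        if( m_head.compare_exchange_weak( head, head + 1, std::memory_order_acq_rel ) ) {
            value = item.value;
            item.readable = false;  // hand the slot back to the producers
            return true;
        }
    }
}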
And here is the test I am using:
#include <atomic>
#include <chrono>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

void Test( std::string name, Queue<int>& queue ) {
    const int NUM_PRODUCERS = 64;
    const int NUM_CONSUMERS = 2;
    const int NUM_ITERATIONS = 512;

    // table[v] records whether value v has been popped; every value must appear exactly once.
    bool table[NUM_PRODUCERS * NUM_ITERATIONS];
    memset( table, 0, NUM_PRODUCERS * NUM_ITERATIONS * sizeof(bool) );

    std::vector<std::thread> threads( NUM_PRODUCERS + NUM_CONSUMERS );

    std::chrono::system_clock::time_point start, end;
    start = std::chrono::system_clock::now();

    std::atomic<int> pop_count( NUM_PRODUCERS * NUM_ITERATIONS );
    std::atomic<int> push_count( 0 );

    // Producers: each pushes NUM_ITERATIONS unique values.
    for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id ) {
        threads[thread_id] = std::thread( [&queue, thread_id, &push_count]() {
            int base = thread_id * NUM_ITERATIONS;
            for( int i = 0; i < NUM_ITERATIONS; ++i ) {
                while( !queue.try_push( base + i ) ) {}
                push_count.fetch_add( 1 );
            }
        } );
    }

    // Consumers: pop until every pushed value has been accounted for.
    for( int thread_id = 0; thread_id < NUM_CONSUMERS; ++thread_id ) {
        threads[thread_id + NUM_PRODUCERS] = std::thread( [&]() {
            int v;
            while( pop_count.load() > 0 ) {
                if( queue.try_pop( v ) ) {
                    if( table[v] ) {
                        std::cout << v << " already set" << std::endl;
                    }
                    table[v] = true;
                    pop_count.fetch_sub( 1 );
                }
            }
        } );
    }

    for( int i = 0; i < NUM_PRODUCERS + NUM_CONSUMERS; ++i ) {
        threads[i].join();
    }

    end = std::chrono::system_clock::now();
    std::chrono::duration<double> duration = end - start;
    std::cout << name << " " << duration.count() << std::endl;

    std::atomic_thread_fence( std::memory_order_acq_rel );

    // Verify that every value was popped exactly once.
    bool result = true;
    for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i ) {
        if( !table[i] ) {
            std::cout << "failed at " << i << std::endl;
            result = false;
        }
    }
    std::cout << name << " " << ( result ? "success" : "fail" ) << std::endl;
}
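In case it helps to reproduce, Test can be driven like this (the capacity of 128 is just an example value, not necessarily what I run with):

int main() {
    // 128 is an arbitrary capacity, small enough that try_push actually has to spin.
    Queue<int> queue( 128 );
    Test( "mpmc_queue", queue );
    return 0;
}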
Thanks a lot for any push in the right direction. I'm new to memory fences and lock-free code, as opposed to just using a mutex for everything, so I'm probably misunderstanding something fundamental.
Cheers J