Continuous multiple producer / user queue in C ++ 11

I am trying to implement multi-producer locking, multiple consumer queues in C ++ 11. I am doing this as a training exercise, so I know well that I can just use an existing open source implementation, but I would really like to know why my code isn’t works. The data is stored in a ring buffer, apparently this is a "limited MPMC queue."

I pretty well modeled what I read about Disruptor. I noticed that it works absolutely normal with one consumer and one / several manufacturers, these are just a few consumers who seem to violate it.

Here's the queue:

template <typename T> class Queue : public IQueue<T> { public: explicit Queue( int capacity ); ~Queue(); bool try_push( T value ); bool try_pop( T& value ); private: typedef struct { bool readable; T value; } Item; std::atomic<int> m_head; std::atomic<int> m_tail; int m_capacity; Item* m_items; }; template <typename T> Queue<T>::Queue( int capacity ) : m_head( 0 ), m_tail( 0 ), m_capacity(capacity), m_items( new Item[capacity] ) { for( int i = 0; i < capacity; ++i ) { m_items[i].readable = false; } } template <typename T> Queue<T>::~Queue() { delete[] m_items; } template <typename T> bool Queue<T>::try_push( T value ) { while( true ) { // See that there room int tail = m_tail.load(std::memory_order_acquire); int new_tail = ( tail + 1 ); int head = m_head.load(std::memory_order_acquire); if( ( new_tail - head ) >= m_capacity ) { return false; } if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) ) { // In try_pop, m_head is incremented before the reading of the value has completed, // so though we've acquired this slot, a consumer thread may be in the middle of reading tail %= m_capacity; std::atomic_thread_fence( std::memory_order_acquire ); while( m_items[tail].readable ) { } m_items[tail].value = value; std::atomic_thread_fence( std::memory_order_release ); m_items[tail].readable = true; return true; } } } template <typename T> bool Queue<T>::try_pop( T& value ) { while( true ) { int head = m_head.load(std::memory_order_acquire); int tail = m_tail.load(std::memory_order_acquire); if( head == tail ) { return false; } int new_head = ( head + 1 ); if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) ) { head %= m_capacity; std::atomic_thread_fence( std::memory_order_acquire ); while( !m_items[head].readable ) { } value = m_items[head].value; std::atomic_thread_fence( std::memory_order_release ); m_items[head].readable = false; return true; } } } 

And here is the test I am using:

 void Test( std::string name, Queue<int>& queue ) { const int NUM_PRODUCERS = 64; const int NUM_CONSUMERS = 2; const int NUM_ITERATIONS = 512; bool table[NUM_PRODUCERS*NUM_ITERATIONS]; memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool)); std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS); std::chrono::system_clock::time_point start, end; start = std::chrono::system_clock::now(); std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS); std::atomic<int> push_count (0); for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id ) { threads[thread_id] = std::thread([&queue,thread_id,&push_count]() { int base = thread_id * NUM_ITERATIONS; for( int i = 0; i < NUM_ITERATIONS; ++i ) { while( !queue.try_push( base + i ) ){}; push_count.fetch_add(1); } }); } for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id ) { threads[thread_id+NUM_PRODUCERS] = std::thread([&]() { int v; while( pop_count.load() > 0 ) { if( queue.try_pop( v ) ) { if( table[v] ) { std::cout << v << " already set" << std::endl; } table[v] = true; pop_count.fetch_sub(1); } } }); } for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i ) { threads[i].join(); } end = std::chrono::system_clock::now(); std::chrono::duration<double> duration = end - start; std::cout << name << " " << duration.count() << std::endl; std::atomic_thread_fence( std::memory_order_acq_rel ); bool result = true; for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i ) { if( !table[i] ) { std::cout << "failed at " << i << std::endl; result = false; } } std::cout << name << " " << ( result? "success" : "fail" ) << std::endl; } 

Thanks so much for pushing in the right direction. I'm new to memory fences, instead of just using the mutex for everything, so I probably just underestimate something fundamentally.

Cheers J

+5
source share
2 answers

I would look at Moody Camel .

This is a quick, general, unblocked queue for C ++, written entirely in C ++ 11. The documentation looks pretty good, as well as several performance tests.

Among all other interesting things (they are worth reading anyway), all this is contained in one heading and is available under a simplified BSD license. Just drop it into your project and enjoy!

+3
source
 #pragma once #include <atomic> #include <vector> /// <summary>Lock Free MPMC Queue - to solve Producer(1..n)/Consumer(1..n) problem.</summary> template <typename T> class QueueMpmcLockFree { private: // Valid data range. std::atomic<int> m_atomicIndexBack = -1; // Last element. std::atomic<int> m_atomicIndexBackPushing = -1; // Last element being pushed. std::atomic<int> m_atomicIndexFront = 0; // Current element to be popped. May not be ready yet. // Queue in a cyclic buffer. const int m_sizeLimit; std::vector<T> m_queue; public: /// <summary>Ctor. Initialize the queue with specified size.</summary> explicit QueueMpmcLockFree(const int sizeLimit = 10) : m_sizeLimit(sizeLimit) , m_queue(m_sizeLimit) // Fill the queue with default value. { } /// <summary>Returns the number of messages.</summary> int Peek() { return m_atomicIndexBack - m_atomicIndexFront + 1; } /// <summary>Push an item to the back of the queue. Move. /// Attempts to put a message into the queue. Return true if the message is sent successfully. /// If the message could not be sent immediately, typically because /// the queue is full, false is returned. /// </summary> bool TryPush(T && itemToAddByMoving) { if (Peek() == m_sizeLimit) return false; const int indexNext = ++m_atomicIndexBackPushing; // Get next position to add item. m_queue[indexNext % m_sizeLimit] = std::move(itemToAddByMoving); // Moving item takes time. ++m_atomicIndexBack; // Item pushed and ready. return true; } /// <summary>Return true if a message is popped.</summary> bool TryPop(T & poppedMsg) { if (Peek() <= 0) return false; const int indexPop = m_atomicIndexFront++; // Get current position and set next one. poppedMsg = m_queue[indexPop % m_sizeLimit]; // Copy message. return true; } }; 
-3
source

Source: https://habr.com/ru/post/1202006/


All Articles