Lock-free queue with boost::atomic: am I doing it right?

Short version:

I am trying to replace the C++11 std::atomic used in a lock-free, single-producer, single-consumer queue implementation from here. How do I replace it with boost::atomic?

Long version:

I am trying to get better performance out of our application, which uses worker threads. Each thread has its own task queue. We currently have to synchronize with a lock before dequeuing or enqueuing each task.

Then I found Herb Sutter's article on a lock-free queue. It seems to be the perfect replacement. But the code uses std::atomic from C++11, which I cannot introduce to the project at this time.

More googling led to some examples, such as this one for Linux (echelon's) and this one for Windows (TINESWARE). Both use platform-specific constructs, such as WinAPI's InterlockedExchangePointer and GCC's __sync_lock_test_and_set.

I only need to support Windows and Linux, so maybe I can get away with some #ifdefs. But I thought it would be nicer to use what boost::atomic provides. Boost.Atomic is not yet part of the official Boost library, so I downloaded the source from http://www.chaoticmind.net/~hcb/projects/boost.atomic/ and used the include files in my project.
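
For comparison, the #ifdef route might look roughly like the sketch below. The helper name exchange_pointer is hypothetical and not taken from either linked example; it only wraps the two platform primitives named above. Note that GCC documents __sync_lock_test_and_set as an acquire barrier only, so this is not a drop-in substitute for a sequentially consistent exchange.

```cpp
#if defined(_WIN32)
#include <windows.h>
#endif

// Hypothetical helper: atomically stores new_value into *target and
// returns the pointer that was there before.
template <typename T>
T* exchange_pointer(T* volatile* target, T* new_value)
{
#if defined(_WIN32)
    // WinAPI primitive; works on void pointers, hence the casts.
    return static_cast<T*>(InterlockedExchangePointer(
        reinterpret_cast<void* volatile*>(target), new_value));
#else
    // GCC builtin; acts as an acquire barrier only, not a full barrier.
    return __sync_lock_test_and_set(target, new_value);
#endif
}
```

Every additional operation (load, store, compare-and-swap) would need its own wrapper of this kind, which is the maintenance cost boost::atomic is meant to remove.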

This is what I have so far:

    #pragma once
    #include <boost/atomic.hpp>

    template <typename T>
    class LockFreeQueue
    {
    private:
        struct Node
        {
            Node(T val) : value(val), next(NULL) { }
            T value;
            Node* next;
        };

        Node* first;                    // for producer only
        boost::atomic<Node*> divider;   // shared
        boost::atomic<Node*> last;      // shared

    public:
        LockFreeQueue()
        {
            first = new Node(T());
            divider = first;
            last = first;
        }

        ~LockFreeQueue()
        {
            while(first != NULL)        // release the list
            {
                Node* tmp = first;
                first = tmp->next;
                delete tmp;
            }
        }

        void Produce(const T& t)
        {
            last.load()->next = new Node(t);    // add the new item
            last = last.load()->next;
            while(first != divider)             // trim unused nodes
            {
                Node* tmp = first;
                first = first->next;
                delete tmp;
            }
        }

        bool Consume(T& result)
        {
            if(divider != last)                         // if queue is nonempty
            {
                result = divider.load()->next->value;   // C: copy it back
                divider = divider.load()->next;
                return true;                            // and report success
            }
            return false;                               // else report empty
        }
    };

Some changes:

    boost::atomic<Node*> divider;   // shared
    boost::atomic<Node*> last;      // shared

and

    last.load()->next = new Node(t);    // add the new item
    last = last.load()->next;

and

    result = divider.load()->next->value;   // C: copy it back
    divider = divider.load()->next;

Am I using load() (and the implicit store()) from boost::atomic correctly here? Can we say that this is equivalent to Sutter's original C++11 lock-free queue?

PS. I studied a lot of threads on SO, but none of them gives an example of boost::atomic used with a lock-free queue.

2 answers

Have you tried Intel Threading Building Blocks' atomic<T>? It is cross-platform and free.

Also...

A single-producer/single-consumer design makes your problem a lot easier, because your linearization point can be a single operation. It becomes easier still if you are willing to accept a bounded queue.
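
To make the "single operation" point concrete, here is a minimal sketch of the linked-list queue from the question with explicit memory orderings. It is written against C++11 std::atomic so that it is self-contained; Boost.Atomic is modeled on the same interface, so the same calls should carry over with the boost:: prefix, but treat that as an assumption to check against the version you downloaded. The class name SpscQueue is made up for this example.

```cpp
#include <atomic>

// Sketch only: exactly one producer thread and one consumer thread.
template <typename T>
class SpscQueue
{
    struct Node
    {
        explicit Node(const T& v) : value(v), next(nullptr) {}
        T value;
        Node* next;
    };

    Node* first_;                   // touched by the producer only
    std::atomic<Node*> divider_;    // written by the consumer
    std::atomic<Node*> last_;       // written by the producer

public:
    SpscQueue() : first_(new Node(T())), divider_(first_), last_(first_) {}

    ~SpscQueue()
    {
        while (first_ != nullptr)   // release the whole list
        {
            Node* tmp = first_;
            first_ = tmp->next;
            delete tmp;
        }
    }

    void produce(const T& v)
    {
        // Only the producer writes last_, so a relaxed load is enough here.
        Node* tail = last_.load(std::memory_order_relaxed);
        tail->next = new Node(v);
        // Linearization point: one release store publishes the new node.
        last_.store(tail->next, std::memory_order_release);

        // Reclaim nodes the consumer has already moved past.
        Node* div = divider_.load(std::memory_order_acquire);
        while (first_ != div)
        {
            Node* tmp = first_;
            first_ = first_->next;
            delete tmp;
        }
    }

    bool consume(T& result)
    {
        // Only the consumer writes divider_, so a relaxed load is enough here.
        Node* div = divider_.load(std::memory_order_relaxed);
        if (div == last_.load(std::memory_order_acquire))
            return false;                       // queue is empty
        result = div->next->value;              // copy the item out
        // Linearization point: one release store hands the node back.
        divider_.store(div->next, std::memory_order_release);
        return true;
    }
};
```

Each side loads its own index once into a local variable and then performs exactly one store to shared state, which avoids the repeated load() calls in the question's version.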

A bounded queue offers cache-performance benefits, since you can reserve a cache-aligned block of memory to maximize your hits, for example:

    #include <cstddef>
    #include <vector>
    #include "tbb/atomic.h"
    #include "tbb/cache_aligned_allocator.h"

    template< typename T >
    class SingleProducerSingleConsumerBoundedQueue {
        typedef std::vector<T, tbb::cache_aligned_allocator<T> > queue_type;

    public:
        // Allocates every slot up front (T must be default-constructible);
        // reserve() alone would leave size() == 0 and make queue[i] invalid.
        explicit SingleProducerSingleConsumerBoundedQueue(size_t capacity)
            : queue(capacity)
        {
            head = 0;
            tail = 0;
        }

        size_t capacity() const { return queue.size(); }

        bool try_pop(T& result) {
            if(tail - head == 0)
                return false;                   // empty
            result = queue[head % queue.size()];
            head.fetch_and_increment();         // linearization point
            return true;
        }

        bool try_push(const T& source) {
            if(tail - head == queue.size())
                return false;                   // full
            queue[tail % queue.size()] = source;
            tail.fetch_and_increment();         // linearization point
            return true;
        }

        ~SingleProducerSingleConsumerBoundedQueue() {}

    private:
        queue_type queue;
        tbb::atomic<size_t> head;   // advanced by the consumer only
        tbb::atomic<size_t> tail;   // advanced by the producer only
    };

Check out the boost.atomic ringbuffer example from the documentation:

    #include <boost/atomic.hpp>

    template <typename T, size_t Size>
    class ringbuffer {
    public:
        ringbuffer() : head_(0), tail_(0) {}

        bool push(const T & value)
        {
            size_t head = head_.load(boost::memory_order_relaxed);
            size_t next_head = next(head);
            if (next_head == tail_.load(boost::memory_order_acquire))
                return false;
            ring_[head] = value;
            head_.store(next_head, boost::memory_order_release);
            return true;
        }

        bool pop(T & value)
        {
            size_t tail = tail_.load(boost::memory_order_relaxed);
            if (tail == head_.load(boost::memory_order_acquire))
                return false;
            value = ring_[tail];
            tail_.store(next(tail), boost::memory_order_release);
            return true;
        }

    private:
        size_t next(size_t current)
        {
            return (current + 1) % Size;
        }

        T ring_[Size];
        boost::atomic<size_t> head_, tail_;
    };

    // How to use
    int main()
    {
        ringbuffer<int, 32> r;

        // try to insert an element
        if (r.push(42)) { /* succeeded */ }
        else { /* buffer full */ }

        // try to retrieve an element
        int value;
        if (r.pop(value)) { /* succeeded */ }
        else { /* buffer empty */ }
    }

The only limitation of the code is that the length of the buffer must be known at compile time (or at construction time, if you replace the array with std::vector<T>). As far as I understand, allowing the buffer to grow and shrink is not trivial.
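
A minimal sketch of the construction-time variant mentioned above, with the array swapped for std::vector<T>. It uses std::atomic so that it compiles without Boost; with Boost.Atomic the same calls would presumably be spelled boost::atomic and boost::memory_order_*. The class name DynamicRingBuffer is invented for this example and does not come from the Boost documentation.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Sketch only: one producer thread calls push, one consumer thread calls pop.
template <typename T>
class DynamicRingBuffer
{
public:
    // One slot always stays empty to tell "full" from "empty",
    // so the buffer holds at most slots - 1 elements.
    explicit DynamicRingBuffer(std::size_t slots)
        : ring_(slots), head_(0), tail_(0)
    {
        assert(slots >= 2);
    }

    bool push(const T& value)
    {
        std::size_t head = head_.load(std::memory_order_relaxed);
        std::size_t next_head = next(head);
        if (next_head == tail_.load(std::memory_order_acquire))
            return false;                               // buffer full
        ring_[head] = value;
        head_.store(next_head, std::memory_order_release);
        return true;
    }

    bool pop(T& value)
    {
        std::size_t tail = tail_.load(std::memory_order_relaxed);
        if (tail == head_.load(std::memory_order_acquire))
            return false;                               // buffer empty
        value = ring_[tail];
        tail_.store(next(tail), std::memory_order_release);
        return true;
    }

private:
    std::size_t next(std::size_t current) const
    {
        return (current + 1) % ring_.size();
    }

    std::vector<T> ring_;       // sized once in the constructor, never resized
    std::atomic<std::size_t> head_, tail_;
};
```

The vector is never resized after construction, which is what keeps the indexing safe without locks; growing it while the other thread is running would invalidate the slots that thread is reading or writing.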


Source: https://habr.com/ru/post/1398441/

