Queue lock state?

I am trying to implement a high-performance blocking queue backed by a circular buffer on top of pthreads, semaphore.h and the gcc atomic builtins. The queue needs to handle several simulated readers and writers running in different threads.

I have run into some kind of race condition, and I'm not sure whether it comes from an erroneous assumption about the behavior of the atomic operations and semaphores, or whether my design is fundamentally wrong.

I have extracted and simplified it into a standalone example. I would expect this program never to return. However, it does return after several hundred thousand iterations when it detects corruption in the queue.

In the example below (kept minimal for presentation), the queue does not actually store anything: each cell that would hold real data is simply set to 1 when occupied and back to 0 when vacant. There is a counting semaphore (vacancies) representing the number of vacant cells, and another counting semaphore (occupants) representing the number of occupied cells.

Writers do the following:

  • decrement vacancies
  • atomically get the next head index (mod queue size)
  • write to it
  • increment occupants

Readers do the opposite:

  • decrement occupants
  • atomically get the next tail index (mod queue size)
  • read it
  • increment vacancies

I would expect that, given the above, exactly one thread can be reading or writing any given cell at a time.

Any ideas about why this doesn't work, or debugging strategies, are appreciated. Code and output are below.

  #include <stdlib.h>
  #include <pthread.h>
  #include <semaphore.h>
  #include <iostream>

  using namespace std;

  #define QUEUE_CAPACITY 8 // must be power of 2
  #define NUM_THREADS 2

  struct CountingSemaphore
  {
      sem_t m;
      CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
      void post() { sem_post(&m); }
      void wait() { sem_wait(&m); }
      ~CountingSemaphore() { sem_destroy(&m); }
  };

  struct BlockingQueue
  {
      unsigned int head;            // (head % capacity) is next head position
      unsigned int tail;            // (tail % capacity) is next tail position
      CountingSemaphore vacancies;  // how many cells are vacant
      CountingSemaphore occupants;  // how many cells are occupied
      int cell[QUEUE_CAPACITY];     // (cell[x] == 1) means occupied
                                    // (cell[x] == 0) means vacant

      BlockingQueue() :
          head(0),
          tail(0),
          vacancies(QUEUE_CAPACITY),
          occupants(0)
      {
          for (size_t i = 0; i < QUEUE_CAPACITY; i++)
              cell[i] = 0;
      }

      // put an item in the queue
      void put()
      {
          vacancies.wait();
          // atomic post increment
          set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);
          occupants.post();
      }

      // take an item from the queue
      void take()
      {
          occupants.wait();
          // atomic post increment
          get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);
          vacancies.post();
      }

      // set cell i
      void set(unsigned int i)
      {
          // atomic compare and assign
          if (!__sync_bool_compare_and_swap(&cell[i], 0, 1)) {
              corrupt("set", i);
              exit(-1);
          }
      }

      // get cell i
      void get(unsigned int i)
      {
          // atomic compare and assign
          if (!__sync_bool_compare_and_swap(&cell[i], 1, 0)) {
              corrupt("get", i);
              exit(-1);
          }
      }

      // corruption detected
      void corrupt(const char* action, unsigned int i)
      {
          static CountingSemaphore sem(1);
          sem.wait();

          cerr << "corruption detected" << endl;
          cerr << "action = " << action << endl;
          cerr << "i = " << i << endl;
          cerr << "head = " << head << endl;
          cerr << "tail = " << tail << endl;

          for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
              cerr << "cell[" << j << "] = " << cell[j] << endl;
      }
  };

  BlockingQueue q;

  // keep posting to the queue forever
  void* Source(void*)
  {
      while (true)
          q.put();

      return 0;
  }

  // keep taking from the queue forever
  void* Sink(void*)
  {
      while (true)
          q.take();

      return 0;
  }

  int main()
  {
      pthread_t id;

      // start some pthreads to run Source function
      for (int i = 0; i < NUM_THREADS; i++)
          if (pthread_create(&id, NULL, &Source, 0))
              abort();

      // start some pthreads to run Sink function
      for (int i = 0; i < NUM_THREADS; i++)
          if (pthread_create(&id, NULL, &Sink, 0))
              abort();

      while (true);
  }

Compile the above:

  $ g++ -pthread AboveCode.cpp
  $ ./a.out

The output is different each time, but here is one example:

  corruption detected
  action = get
  i = 6
  head = 122685
  tail = 122685
  cell[0] = 0
  cell[1] = 0
  cell[2] = 1
  cell[3] = 0
  cell[4] = 1
  cell[5] = 0
  cell[6] = 1
  cell[7] = 1

My system is Ubuntu 11.10 on Intel Core 2:

  $ uname -a
  Linux 3.0.0-14-generic #23-Ubuntu SMP Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
  $ cat /proc/cpuinfo | grep Intel
  model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
  $ g++ --version
  g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

Thanks Andrew.

+4
3 answers

Here is one possible situation, traced step by step, for two writer threads (W0, W1) and one reader thread (R0). W0 entered put() before W1, was interrupted by the OS or hardware, and finished later.

        w0 (core 0)              w1 (core 1)             r0
  t0    ----                     ----                    blocked on occupants.wait() in take()
  t1    entered put()            ----                    ----
  t2    vacancies.wait()         entered put()           ----
  t3    got new_head = 1         vacancies.wait()        ----
  t4    <interrupted by OS>      got new_head = 2        ----
  t5                             written 1 at cell[2]    ----
  t6                             occupants.post()        ----
  t7                             exited put()            woke up
  t8                             ----                    got new_tail = 1
  t9    <still interrupted>      ----                    read 0 from cell[1]  !! corruption !!
  t10   written 1 at cell[1]
  t11   occupants.post()
  t12   exited put()
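
Since you asked for debugging strategies: one thing you could try (a sketch of mine, not a fix) is to widen the gap between claiming the head index and marking the cell occupied. If this race is the cause, the corruption then shows up after a handful of iterations instead of hundreds of thousands.

  // Debugging sketch only (my suggestion, not part of the original code):
  // a drop-in replacement for BlockingQueue::put() that deliberately widens
  // the race window between steps t3 and t10 in the trace above.
  // Requires: #include <sched.h>
  void put()
  {
      vacancies.wait();

      // claim the next head slot ...
      unsigned int i = __sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY;

      // ... then give other writers and readers a chance to overtake us,
      // exactly as the OS/hardware did to w0 in the trace
      sched_yield();

      set(i);
      occupants.post();
  }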
+4

From a design point of view, I would treat the entire queue as a single shared resource and protect it with one mutex; a rough sketch follows the two lists below.

Writers do the following:

  • acquire the mutex
  • write to the queue (including the index handling)
  • release the mutex

Readers do the following:

  • acquire the mutex
  • read from the queue (including the index handling)
  • release the mutex
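
Here is what that could look like, reusing CountingSemaphore, QUEUE_CAPACITY and the cell-array idea from the question. This is only an illustration of the pattern (the struct name is mine): the semaphores still provide the blocking on full/empty, while the mutex serializes the index handling together with the cell access.

  // Sketch only: single-mutex variant of the queue from the question.
  // CountingSemaphore and QUEUE_CAPACITY are assumed to be the ones defined above.
  struct LockedQueue
  {
      unsigned int head;
      unsigned int tail;
      CountingSemaphore vacancies;
      CountingSemaphore occupants;
      pthread_mutex_t lock;
      int cell[QUEUE_CAPACITY];

      LockedQueue() : head(0), tail(0), vacancies(QUEUE_CAPACITY), occupants(0)
      {
          pthread_mutex_init(&lock, NULL);
          for (size_t i = 0; i < QUEUE_CAPACITY; i++)
              cell[i] = 0;
      }

      ~LockedQueue() { pthread_mutex_destroy(&lock); }

      void put()
      {
          vacancies.wait();               // block until some cell is vacant
          pthread_mutex_lock(&lock);      // index handling + write as one unit
          cell[head % QUEUE_CAPACITY] = 1;
          head++;
          pthread_mutex_unlock(&lock);
          occupants.post();
      }

      void take()
      {
          occupants.wait();               // block until some cell is occupied
          pthread_mutex_lock(&lock);      // index handling + read as one unit
          cell[tail % QUEUE_CAPACITY] = 0;
          tail++;
          pthread_mutex_unlock(&lock);
          vacancies.post();
      }
  };

With the mutex held around both the index increment and the cell update, a slot can no longer be claimed by one thread and then written or read by another in between, which is exactly the interleaving shown in the accepted trace.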
+1

I have a theory. Since this is a circular queue, a lagging reader thread can be lapped. Say a reader gets index 0. Before it does anything, it loses the CPU. Another reader thread gets index 1, then 2, then 3 ... then 7, then 0. The first reader wakes up, and now both threads believe they have exclusive access to index 0. I'm not sure how to prove it, but I hope this helps.
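
To make the wraparound concrete (my own illustration, not from the post): with a QUEUE_CAPACITY of 8, ticket 0 and ticket 8 map to the same cell, so nothing except timing stops two threads from both "owning" cell[0].

  // Illustration only: the ticket-to-cell mapping used by put()/take().
  #include <iostream>

  #define QUEUE_CAPACITY 8 // same value as in the question

  int main()
  {
      // ticket 0 and ticket 8 (one full lap later) land on the same cell index
      for (unsigned int ticket = 0; ticket <= QUEUE_CAPACITY; ticket++)
          std::cout << "ticket " << ticket << " -> cell["
                    << (ticket % QUEUE_CAPACITY) << "]" << std::endl;
      return 0;
  }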

0

Source: https://habr.com/ru/post/1389402/

