Shared buffer optimization in a multi-threaded producer / consumer environment

I have a project in which I have one producer stream that writes events to the buffer, and an additional one consumer stream that receives events from the buffer. My goal is to optimize this thing for one dual-core computer to achieve maximum throughput.

Currently, I use a simple ring buffer without blocking (blocking is possible, since I have only one consumer and one producer thread, and therefore pointers are updated with only one thread).

#define BUF_SIZE 32768

struct buf_t { volatile int writepos; volatile void * buffer[BUF_SIZE]; 
    volatile int readpos;) };

void produce (buf_t *b, void * e) {
    int next = (b->writepos+1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[b->writepos] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    while (b->readpos == b->writepos); // nothing to consume. wait
    int next = (b->readpos+1) % BUF_SIZE;
    void * res = b->buffer[b->readpos]; b->readpos = next;
    return res;
}

buf_t *alloc () {
    buf_t *b = (buf_t *)malloc(sizeof(buf_t));
    b->writepos = 0; b->readpos = 0; return b;
}

. BUF_SIZE . , writepos buffer readpos buffer, , .

400%. , , , ..?

+3
3

. (), . , , , .

, , .

300%.

, , - writepos readpos:

void produce (void * e) {
    unsigned int oldpos = buffer.writepos;
    unsigned int next = (oldpos+1) % BUF_SIZE;
    while (next == buffer.rpos) { // rpos is not volatile
        buffer.rpos = buffer.readpos;
        usleep(1);
    }
    buffer.buffer[oldpos] = e; buffer.writepos = next;
}

().

( , , ).

#define STRIDE 16
#define STEPS 524288

struct buf_t {
    volatile unsigned int writepos;
    int offset [STRIDE - 1];
    unsigned int wpos;
    int offset2 [STRIDE - 1];
    volatile void * buffer[BUF_SIZE];
    int offset4 [STRIDE];
    volatile unsigned int readpos;
    int offset3 [STRIDE - 1];
    unsigned int rpos;
}

300% - , , , .

, , : -)

.

0

, : consume() b->readpos, thread consume() , . volatile, , :

void * consume (buf_t *b) {
    int rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    int next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[rp]; b->readpos = next;
    return res;
}

, - ( , b->buffer[n] 15 16 , b->buffer[n+1]). :

#define STRIDE 16
#define STEPS 2048
#define BUF_SIZE (STRIDE * STEPS)

#define TO_INDEX(n) (STRIDE * (((n) + 1) % STEPS) + (((n) + 1) / STEPS))

void produce (buf_t *b, void * e) {
    unsigned wp = b->writepos;
    unsigned next = (wp + 1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[TO_INDEX(wp)] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    unsigned rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    unsigned next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[TO_INDEX(rp)]; b->readpos = next;
    return res;
}

, . ( , , STRIDE STEPS 2, TO_INDEX() , unsigned - ).

+3

, . , . , , , , .

, . - .

, , -, , .

( ), , , , .

, , - ( ) ( , , ) -. . , , . , . , .

, , , , , .

+1
source

Source: https://habr.com/ru/post/1744128/


All Articles