Ring buffer with numpy / ctypes

I am developing a client that will receive [EEG] data on tcp and write it to the ring buffer. I thought it was very convenient to have a buffer as an array of ctypes or numpy, because it can create a โ€œnumpy viewโ€ for any location of such a buffer and read / write / process data without any copy operations. Or is this a bad idea at all?

However, I do not see how to implement a fixed-size circular buffer this way. Suppose I created a buffer object that is contiguous in memory. What is the best way to write data when reaching the end of the buffer?

One possible way is to overwrite the (already old) bytes from the beginning when the write pointer reaches the end of the buffer array. However, near the borders, the numpy view of some fragment (for processing) cannot be created (or maybe?) In this case, because some of them can still be located at the end of the buffer array, and the other at the very beginning. I read that it is impossible to create such circular slices. How to solve this?

UPD: Thanks to everyone for the answers. In case someone faces the same problem, here is the final code that I have.

+6
source share
4 answers

If you need a window of N bytes, make your buffer 2 * N bytes and write all the input data in two locations: i % N and i % N + N , where i is the byte counter. So you always have N consecutive bytes in the buffer.

 data = 'Data to buffer' N = 4 buf = 2*N*['\00'] for i,c in enumerate(data): j = i % N buf[j] = c buf[j+N] = c if i >= N-1: print ''.join(buf[j+1:j+N+1]) 

prints

 Data ata ta t a to to to b o bu buf buff uffe ffer 
+4
source

One possible way is to overwrite the (already old) bytes from the beginning when the write pointer reaches the end of the buffer array.

This is the only option in a fixed-size ring buffer.

I read that it is impossible to create such circular fragments.

That is why I would not do it with a Numpy look. Instead, you can create a class wrapper around ndarray by holding the buffer / array, capacity, and pointer (index) to the insertion point. If you want to get the contents as a Numpy array, you need to make a copy as follows:

 buf = np.array([1,2,3,4]) indices = [3,0,1,2] contents = buf[indices] # copy 

You can still set the values โ€‹โ€‹of the elements in place if you implement __setitem__ and __setslice__ .

+2
source

I think you need to take a step back from the mental style of C. Updating the ringbuffer for each individual insert will never be effective. The ring buffer is fundamentally different from the adjacent memory block interface, which requires numpy arrays; including the fft you are talking about want to do.

The natural solution is to sacrifice some memory for performance. For example, if the number of elements to keep in your buffer is N, select an array of N + 1024 (or some reasonable number). Then you only need to move N elements around every 1024 inserts, and you should always have a continuous view of N elements for direct access.

EDIT: here is a piece of code that implements the above, and should give good performance. Please note, however, that you will be advised to add to pieces, not per item. Otherwise, the benefits of using numpy are quickly nullified, regardless of how you implement your ringbuffer.

 import numpy as np class RingBuffer(object): def __init__(self, size, padding=None): self.size = size self.padding = size if padding is None else padding self.buffer = np.zeros(self.size+self.padding) self.counter = 0 def append(self, data): """this is an O(n) operation""" data = data[-self.padding:] n = len(data) if self.remaining < n: self.compact() self.buffer[self.counter+self.size:][:n] = data self.counter += n @property def remaining(self): return self.padding-self.counter @property def view(self): """this is always an O(1) operation""" return self.buffer[self.counter:][:self.size] def compact(self): """ note: only when this function is called, is an O(size) performance hit incurred, and this cost is amortized over the whole padding space """ print 'compacting' self.buffer[:self.size] = self.view self.counter = 0 rb = RingBuffer(10) for i in range(4): rb.append([1,2,3]) print rb.view rb.append(np.arange(15)) print rb.view #test overflow 
+2
source

The @Janne Karila variant answers, for C, but not numpy:
If the ring buffer is very wide, for example, N x 1G, then instead of doubling all this, double the array of 2 * N pointers to its lines. For instance. for N = 3, initialize

 bufp = { buf[0], buf[1], buf[2], buf[0], buf[1], buf[2] }; 

Then you write data only once, and anyfunc( bufp[j:j+3] ) sees the lines in buf in order of time.

-1
source

Source: https://habr.com/ru/post/906290/


All Articles