Avoiding collisions when collapsing an infinite lock-free buffer into a circular buffer

I am solving the two-feed arbitration problem of the FAST protocol. Please don't worry if you are not familiar with it; my question is actually quite general. I am adding a description of the problem for those who are interested (you can skip it).


Data on all UDP channels is distributed over two identical feeds (A and B) on two different IP multicast addresses. It is strongly recommended that the client receive and process both feeds, because UDP packets can be lost. Processing two identical feeds statistically reduces the probability of packet loss. It is not specified in which feed (A or B) a message appears first. To arbitrate between the feeds, use the message sequence number given in the preamble or in tag 34-MsgSeqNum. Using the preamble lets you determine the message sequence number without decoding the FAST message. Messages from feeds A and B should be processed using the following algorithm:

  • Listen to feeds A and B.
  • Process messages according to their sequence numbers.
  • Ignore a message if a message with the same sequence number was already processed.
  • If a gap appears in the sequence numbers, this indicates packet loss in both feeds (A and B), and the client must initiate one of the recovery processes. But first, the client should wait a reasonable time, because the lost packet may arrive a little later due to packet reordering: UDP does not guarantee in-order delivery.

    // the TCP recovery algorithm follows
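As a rough illustration of the rules listed above (not taken from the FAST specification; `FeedArbiter`, `OnMessage`, `HasGap` and `NextExpected` are hypothetical names), a single-threaded arbiter could deduplicate by sequence number and hold out-of-order messages until the gap closes. The "wait a reasonable time" timeout and the TCP recovery itself are assumed to be handled by the caller.

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Minimal single-threaded sketch of A/B feed arbitration (hypothetical names).
class FeedArbiter {
public:
    explicit FeedArbiter(uint32_t firstSeq) : next_(firstSeq) {}

    // Call for every message from either feed. Returns the sequence numbers
    // that are now ready to process, in order; duplicates return nothing.
    std::vector<uint32_t> OnMessage(uint32_t seq) {
        std::vector<uint32_t> ready;
        if (seq < next_ || pending_.count(seq) != 0) {
            return ready;  // already processed or already buffered: ignore
        }
        if (seq > next_) {
            pending_.insert(seq);  // gap: hold it, the missing one may arrive late
            return ready;
        }
        ready.push_back(next_++);  // seq == next_: in order
        // This message may have closed a gap in front of buffered messages.
        while (!pending_.empty() && *pending_.begin() == next_) {
            pending_.erase(pending_.begin());
            ready.push_back(next_++);
        }
        return ready;
    }

    // True while some sequence number is missing on both feeds; the caller
    // would start a timeout here and begin recovery if it expires.
    bool HasGap() const { return !pending_.empty(); }

    uint32_t NextExpected() const { return next_; }

private:
    uint32_t next_;               // lowest sequence number not yet processed
    std::set<uint32_t> pending_;  // out-of-order messages waiting behind a gap
};
```

For example, receiving 1 (A), 1 (B), 3 (A), 2 (B) yields {1}, {}, {} with `HasGap()` true, and then {2, 3}.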


I wrote this very simple class. It preallocates all the required objects, and then the first thread that receives a particular seqNum processes it. The other thread, arriving later, drops it:

    #include <atomic>
    #include <cstdint>

    // Msg is the application's message type (defined elsewhere)
    class MsgQueue {
    public:
        MsgQueue();
        ~MsgQueue();
        bool Lock(uint32_t msgSeqNum);
        Msg& Get(uint32_t msgSeqNum);
        void Commit(uint32_t msgSeqNum);
    private:
        void Process();
        static const int QUEUE_LENGTH = 1000000;
        // 0 - available for use; 1 - processing; 2 - ready
        std::atomic<uint16_t> status[QUEUE_LENGTH];
        Msg updates[QUEUE_LENGTH];
    };

Implementation:

    MsgQueue::MsgQueue() {
        // std::atomic is not trivially copyable, so initialize with store() instead of memset()
        for (auto& s : status) s.store(0);
    }

    MsgQueue::~MsgQueue() {}

    // For a given msgSeqNum, must return true to exactly one thread
    bool MsgQueue::Lock(uint32_t msgSeqNum) {
        uint16_t expected = 0;
        return status[msgSeqNum].compare_exchange_strong(expected, 1);
    }

    void MsgQueue::Commit(uint32_t msgSeqNum) {
        status[msgSeqNum].store(2);
        Process();
    }

    // this method probably should be combined with "Lock", but please ignore that! :)
    Msg& MsgQueue::Get(uint32_t msgSeqNum) {
        return updates[msgSeqNum];
    }

    void MsgQueue::Process() {
        // ready packets must be processed here
    }

Usage:

    if (!msgQueue.Lock(seq)) {
        return;  // another thread already took this seqNum
    }
    Msg& msg = msgQueue.Get(seq);  // a reference, so the writes land in the buffer
    msg.Ticker = "HP";
    msg.Bid = 100;
    msg.Offer = 101;
    msgQueue.Commit(seq);
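To check the Lock/Commit protocol in isolation, here is a scaled-down, self-contained sketch in which two threads standing in for feeds A and B race over the same sequence numbers. `SmallMsgQueue`, `HandlePacket`, `RaceTwoFeeds` and the `Msg` struct (guessed from the three fields used above) are hypothetical. Because `compare_exchange_strong` succeeds for only one caller per slot, each seqNum should be processed exactly once. Like the original class, it has no wrap-around, so it still relies on the buffer being large enough.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <string>
#include <thread>

// Hypothetical message type with the fields used in the usage snippet.
struct Msg {
    std::string Ticker;
    int Bid = 0;
    int Offer = 0;
};

// Scaled-down MsgQueue: fixed size N, indexed directly by seq (no wrap-around).
template <std::size_t N>
class SmallMsgQueue {
public:
    SmallMsgQueue() {
        for (auto& s : status_) s.store(0);  // 0 = available
    }
    // Exactly one caller per seq sees true: CAS 0 -> 1 (processing).
    bool Lock(uint32_t seq) {
        uint16_t expected = 0;
        return status_[seq].compare_exchange_strong(expected, 1);
    }
    Msg& Get(uint32_t seq) { return updates_[seq]; }
    // 1 -> 2 (ready); release publishes the writes made through Get().
    void Commit(uint32_t seq) { status_[seq].store(2, std::memory_order_release); }
    bool IsReady(uint32_t seq) const {
        return status_[seq].load(std::memory_order_acquire) == 2;
    }

private:
    std::atomic<uint16_t> status_[N];
    Msg updates_[N];
};

// What each feed thread does per packet; returns true if this thread won.
template <std::size_t N>
bool HandlePacket(SmallMsgQueue<N>& q, uint32_t seq) {
    if (!q.Lock(seq)) return false;  // the other feed got here first
    Msg& msg = q.Get(seq);
    msg.Ticker = "HP";
    msg.Bid = 100;
    msg.Offer = 101;
    q.Commit(seq);
    return true;
}

// Runs "feed A" and "feed B" over seqNums 0..N-1; returns the total number of wins.
template <std::size_t N>
int RaceTwoFeeds(SmallMsgQueue<N>& q) {
    std::atomic<int> wins{0};
    auto feed = [&q, &wins] {
        for (uint32_t seq = 0; seq < N; ++seq) {
            if (HandlePacket(q, seq)) wins.fetch_add(1);
        }
    };
    std::thread a(feed), b(feed);
    a.join();
    b.join();
    return wins.load();
}
```

With `SmallMsgQueue<1000>`, the total number of wins should be exactly 1000, one per seqNum, regardless of how the two threads interleave.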

This works fine as long as QUEUE_LENGTH is infinite, because then each msgSeqNum maps to exactly one element of updates.

But I have to make the buffer circular, because it is impossible to store the entire history (many millions of packets), and there is no reason to. In fact, I only need to buffer enough packets to recover the session, and once the session is recovered I can delete them.

But a circular buffer greatly complicates the algorithm. For example, suppose we have a circular buffer of length 1000, and at the same time we are trying to process seqNum = 10,000 and seqNum = 11,000 (very unlikely, but still possible). Both packets map to index 0 of the updates array, so a collision occurs. In this case the buffer should "discard" the old packet and process the new one.
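One possible lock-free direction for this collision case, offered as a sketch under stated assumptions rather than a complete solution: store the sequence number together with the state in a single 64-bit atomic per slot, so that one CAS can both detect a collision and evict an older packet. `TaggedRing`, `TryClaim`, `Commit` and the `Claim` values are hypothetical names. Assumption: a slot is only evicted once its older packet is ready; if the older packet is still being written, the caller gets `Busy` and must retry or drop. Protecting the payload while a consumer may still be reading an evicted slot (for example with a seqlock) is not covered here.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

enum class Claim { Claimed, Duplicate, Stale, Busy };

// Hypothetical tagged ring: each slot holds (seq << 2) | phase in one atomic word,
// so the sequence number and the state always change together.
class TaggedRing {
public:
    explicit TaggedRing(std::size_t length) : slots_(length) {
        for (auto& s : slots_) s.store(0);  // 0 = empty
    }

    // Try to take ownership of the slot for seq. Only one thread can get Claimed.
    Claim TryClaim(uint32_t seq) {
        std::atomic<uint64_t>& slot = slots_[seq % slots_.size()];
        const uint64_t mine = (static_cast<uint64_t>(seq) << 2) | kProcessing;
        uint64_t cur = slot.load(std::memory_order_acquire);
        for (;;) {
            const uint64_t phase = cur & 3;
            const uint64_t curSeq = cur >> 2;
            if (phase != kEmpty) {
                if (curSeq == seq) return Claim::Duplicate;    // other feed already has it
                if (curSeq > seq) return Claim::Stale;         // slot reused by a newer packet
                if (phase == kProcessing) return Claim::Busy;  // older packet still being written
            }
            // Empty, or holding an older packet that is already ready: evict it.
            if (slot.compare_exchange_weak(cur, mine, std::memory_order_acq_rel,
                                           std::memory_order_acquire)) {
                return Claim::Claimed;
            }
            // CAS failed: cur now holds the current value, so re-evaluate.
        }
    }

    // Only the owner calls this, and nobody evicts a slot that is processing,
    // so a plain release store is enough to move it to "ready".
    void Commit(uint32_t seq) {
        slots_[seq % slots_.size()].store((static_cast<uint64_t>(seq) << 2) | kReady,
                                          std::memory_order_release);
    }

private:
    static constexpr uint64_t kEmpty = 0;
    static constexpr uint64_t kProcessing = 1;
    static constexpr uint64_t kReady = 2;
    std::vector<std::atomic<uint64_t>> slots_;
};
```

With a length of 1000, seqNums 10000 and 11000 both map to slot 0: 11000 gets `Busy` until 10000 is committed, then evicts it, after which a late copy of 10000 from the other feed is reported as `Stale`.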

It is trivial to implement what I want using locks, but writing lock-free code for a circular buffer that is used from different threads is really complicated. So I welcome any suggestions and tips on how to do this. Thanks!

1 answer

I don't believe you can use a ring buffer. A hashed index can be used in the status[] array, i.e. hash = seq % 1000. The problem is that the sequence number is dictated by the network, and you have no control over its ordering. You want to lock based on this sequence number. Your array does not need to be infinite, only as large as the range of sequence numbers; but that is probably larger than is practical.

I'm not sure what happens when a sequence number is locked. Does it mean that another thread is processing it? If so, you will have to maintain a sub-list for hash collisions in order to resolve a particular sequence number.

You might also consider making the array size a power of 2. For example, 1024 allows hash = seq & 1023, which should be quite efficient.
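The masking trick from the answer, as a small sketch (`SlotIndex` and `IsPowerOfTwo` are illustrative names): when the size is a power of two, `seq & (size - 1)` gives the same index as `seq % size` without an integer division.

```cpp
#include <cassert>
#include <cstdint>

// True if n has exactly one bit set.
constexpr bool IsPowerOfTwo(uint32_t n) { return n != 0 && (n & (n - 1)) == 0; }

// Ring slot for a sequence number; equivalent to seq % size for power-of-two sizes.
inline uint32_t SlotIndex(uint32_t seq, uint32_t size) {
    assert(IsPowerOfTwo(size));  // the mask is only valid for powers of two
    return seq & (size - 1);
}
```

Masking changes which sequence numbers collide, not whether they do: with 1024 slots, 10000 and 11000 land in slots 784 and 760, but 10000 and 11024 share slot 784.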


Source: https://habr.com/ru/post/943602/

