I am working on two-channel arbitration for FAST. Don't worry if you are not familiar with it; my question is actually quite general. I am including a description of the problem for those who are interested (you can skip it).
All UDP data is distributed over two identical channels (A and B) at two different IP multicast addresses. It is strongly recommended that the client receive and process both channels, because UDP packets can be lost. Processing two identical channels statistically reduces the probability of packet loss. It is not specified in which channel (A or B) a message appears first. To arbitrate between the channels, use the message sequence number found in the preamble or in the 34-MsgSeqNum tag. Using the preamble lets you determine the message sequence number without decoding the FAST message. Messages from channels A and B should be processed with the following algorithm:
- Listen to channels A and B
- Process messages according to their sequence numbers.
- Ignore a message if one with the same sequence number was already processed.
If a gap appears in the sequence numbers, this indicates packet loss in both channels (A and B). The client must then initiate one of the recovery processes. But first the client should wait for a reasonable time, because the missing packet may arrive a little later due to packet reordering. UDP does not guarantee in-order delivery.
// the TCP recovery algorithm is described further on
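For illustration, the arbitration steps above could be sketched single-threaded like this. The names `FeedArbiter`, `Verdict` and `OnPacket` are my own and not from the spec, I assume sequence numbers start at 1, and I ignore `uint32_t` wrap-around. It only classifies packets; waiting for reordered packets and the recovery itself are left to the caller.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names: FeedArbiter, Verdict and OnPacket are illustrative only.
enum class Verdict { Process, Duplicate, Gap };

class FeedArbiter {
public:
    explicit FeedArbiter(uint32_t firstSeq) : next_(firstSeq) {
        assert(firstSeq > 0);  // assumption: sequence numbers start at 1
    }

    // Call this for every packet, no matter whether it came from channel A or B.
    Verdict OnPacket(uint32_t seq) {
        if (seq < next_) return Verdict::Duplicate;  // already processed via the other channel
        if (seq > next_) return Verdict::Gap;        // something is missing so far; do not advance
        ++next_;                                     // seq == next_: this is the packet we want
        return Verdict::Process;
    }

    uint32_t NextExpected() const { return next_; }

private:
    uint32_t next_;  // lowest sequence number not yet processed
};
```

On `Verdict::Gap` the caller would hold on to the packet, wait a little in case the missing one is merely late, and only then start recovery.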
I wrote a very simple class for this. It preallocates everything it needs, and then the first thread that receives a particular seqNum gets to process it. The other thread will drop it later:
    class MsgQueue
    {
    public:
        MsgQueue();
        ~MsgQueue(void);
        bool Lock(uint32_t msgSeqNum);
        Msg& Get(uint32_t msgSeqNum);
        void Commit(uint32_t msgSeqNum);
    private:
        void Process();
        static const int QUEUE_LENGTH = 1000000;
        // ... status and updates arrays (declarations not shown)
    };
Implementation:
    MsgQueue::MsgQueue()
    {
        memset(status, 0, sizeof(status));
    }

    MsgQueue::~MsgQueue(void)
    {
    }
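Usage:
    if (!msgQueue.Lock(seq)) {
        return;  // another thread already claimed this sequence number
    }
    Msg& msg = msgQueue.Get(seq);  // reference, not a copy: Get returns Msg&
    msg.Ticker = "HP";
    msg.Bid = 100;
    msg.Offer = 101;
    msgQueue.Commit(seq);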
This works fine if QUEUE_LENGTH is infinite, because in that case each msgSeqNum maps to exactly one element of the updates array.
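To make the "first thread wins" idea concrete for this unbounded case, here is a minimal sketch. It is not the actual `MsgQueue` code: `ClaimTable` is a hypothetical name, and I assume one atomic flag per sequence number and a table large enough that no two sequence numbers ever share a slot.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch: ClaimTable is not the MsgQueue class shown above.
class ClaimTable {
public:
    explicit ClaimTable(std::size_t capacity) : claimed_(capacity) {
        assert(capacity > 0);
        for (auto& flag : claimed_) flag.store(false, std::memory_order_relaxed);
    }

    // Returns true for exactly one caller per sequence number.
    bool Lock(uint32_t seq) {
        assert(seq < claimed_.size());  // no wrap-around: one seq, one slot
        // exchange() returns the previous value, so only the first caller sees false.
        return !claimed_[seq].exchange(true, std::memory_order_acq_rel);
    }

private:
    std::vector<std::atomic<bool>> claimed_;  // one flag per sequence number
};
```

Whichever channel thread calls `Lock(seq)` first gets `true` and processes the message; the other one gets `false` and drops it.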
But I have to make the buffer circular, because it is impossible to store the entire history (many millions of packets), and there is no reason to. In fact, I only need to buffer enough packets to recover the session, and once the session is recovered I can drop them.
But a circular buffer greatly complicates the algorithm. For example, suppose we have a circular buffer of length 1000, and at the same time we are trying to process seqNum = 10,000 and seqNum = 11,000 (very unlikely, but still possible). Both packets map to index 0 of the updates array, so a collision occurs. In this case the buffer should "discard" the old packet and process the new one.
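To show the collision and the "newer wins" rule in code, here is a sketch of only the claiming part. `RingClaims` and `IndexOf` are hypothetical names, and I assume each slot stores the newest sequence number that owns it, with 0 reserved as "empty" and `uint32_t` wrap-around ignored. It is not a complete solution.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch: RingClaims is illustrative, not a finished design.
class RingClaims {
public:
    explicit RingClaims(std::size_t length) : owner_(length) {
        assert(length > 0);
        for (auto& o : owner_) o.store(0, std::memory_order_relaxed);  // 0 = never used
    }

    std::size_t IndexOf(uint32_t seq) const { return seq % owner_.size(); }

    // True if the caller is the first to claim seq and no newer seq owns the slot.
    bool Lock(uint32_t seq) {
        assert(seq != 0);  // assumption: 0 is reserved as the "empty" marker
        std::atomic<uint32_t>& slot = owner_[IndexOf(seq)];
        uint32_t current = slot.load(std::memory_order_acquire);
        while (current < seq) {
            // On failure compare_exchange_weak reloads current, so the loop re-checks it.
            if (slot.compare_exchange_weak(current, seq,
                                           std::memory_order_acq_rel,
                                           std::memory_order_acquire))
                return true;  // slot now belongs to seq; any older owner is discarded
        }
        return false;  // duplicate (current == seq) or stale (current > seq)
    }

private:
    std::vector<std::atomic<uint32_t>> owner_;  // newest sequence number per slot
};
```

With a length of 1000, `IndexOf(10000)` and `IndexOf(11000)` are both 0, and once 11,000 has claimed the slot a late `Lock(10000)` returns `false`. What this does not cover is a thread that is still writing the payload for 10,000 when 11,000 takes the slot over, and that is exactly the part I find hard.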
It is trivial to implement what I want using locks, but writing lock-free code for a circular buffer that is accessed from different threads is really complicated. So I welcome any suggestions and tips on how to do this. Thanks!