I have two streams of objects, each of which has a Timestamp value. Both streams are in order, so, for example, the timestamps might be Ta = 1, 3, 6, 6, 7 in one stream and Tb = 1, 2, 5, 5, 6, 8 in the other. Objects in both streams are of the same type.
What I would like to do is put each of these events on the bus in timestamp order, that is, put A1, then B1, B2, A3 and so on. Also, since a stream can contain several consecutive elements with the same timestamp, I want those elements grouped so that each new event is an array. So we would put [A3] on the bus, and then [A5(1), A5(2)] (two elements sharing timestamp 5), etc.
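To make the desired output concrete, here is a minimal sketch of the merge-and-group behaviour over plain in-memory sequences. The names `Event`, `Source`, `TimestampMerge` and `MergeGrouped` are placeholders for illustration, not my real types, and I am assuming that when both streams have the same timestamp the first stream goes first (A1 before B1). It is pull-based, so it says nothing about concurrency or buffering.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event type: Source is only a label for the originating stream.
public record Event(string Source, int Timestamp);

public static class TimestampMerge
{
    // Splits one ordered stream into runs of consecutive events with equal timestamps.
    private static IEnumerable<Event[]> Runs(IEnumerable<Event> source)
    {
        var run = new List<Event>();
        foreach (var e in source)
        {
            if (run.Count > 0 && run[0].Timestamp != e.Timestamp)
            {
                yield return run.ToArray();
                run.Clear();
            }
            run.Add(e);
        }
        if (run.Count > 0) yield return run.ToArray();
    }

    // Merges the runs of two ordered streams by timestamp.
    // Assumption: on a tie, the run from `a` is emitted before the run from `b`.
    public static IEnumerable<Event[]> MergeGrouped(IEnumerable<Event> a, IEnumerable<Event> b)
    {
        using var ea = Runs(a).GetEnumerator();
        using var eb = Runs(b).GetEnumerator();
        bool hasA = ea.MoveNext();
        bool hasB = eb.MoveNext();
        while (hasA || hasB)
        {
            if (hasA && (!hasB || ea.Current[0].Timestamp <= eb.Current[0].Timestamp))
            {
                yield return ea.Current;
                hasA = ea.MoveNext();
            }
            else
            {
                yield return eb.Current;
                hasB = eb.MoveNext();
            }
        }
    }
}
```

With the timestamps from the example above, this would yield the groups [A1], [B1], [B2], [A3], [B5, B5], [A6, A6], [B6], [A7], [B8].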
I tried to implement this by creating two ConcurrentQueue structures, placing each event at the back of its queue, then looking at the front of each queue, selecting the earlier event first, and then traversing that queue to collect all events with the same timestamp.
However, I ran into two problems:
- If I leave these queues unbounded, I quickly run out of memory, since the read operation is much faster than the handlers that receive the events (I have several gigabytes of data).
- Sometimes I end up handling one event, say A5(1), before A5(2). I need to guard against this somehow.
I think Rx could help here, but I don't see an obvious combinator (or combinators) that would make this possible. Any advice is much appreciated.