How can I merge two ordered streams, then group by timestamp?

I have two streams of objects, each of which has a Timestamp value. Both streams are ordered, so, for example, the timestamps might be 1, 3, 6, 6, 7 in one stream (A) and 1, 2, 5, 5, 6, 8 in the other (B). Objects in both streams are of the same type.

What I would like to do is put each of these events on the bus in timestamp order, that is, put A₁, then B₁, B₂, A₃ and so on. Also, since some streams have multiple (consecutive) elements with the same timestamp, I want those elements grouped so that each new event is an array. So we would put [A₃] on the bus, then [A¹₅, A²₅], and so on.

I tried to implement this by creating two ConcurrentQueue structures, enqueuing each event at the end of its queue, then looking at the front of each queue, picking the earlier event first, and then draining that queue of all events with the same timestamp.
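The merge-and-drain step described above can also be done without concurrent queues, by pulling from two already-sorted sequences. This is a minimal sketch under assumptions not stated in the question: the hypothetical `TimedEvent` record stands in for the real event type, timestamps are `long` values, and on a tie the element from the first stream is taken first. Only the group being built plus one look-ahead element per stream is held in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the question's event type: a source label plus a timestamp.
public sealed record TimedEvent(string Source, long Timestamp);

public static class OrderedMerge
{
    // Merges two inputs that are each sorted by Timestamp. At every step it looks at the
    // front element of both inputs, picks the earlier one (the first input wins ties, an
    // assumption), then drains that same input of every consecutive element with that
    // timestamp and yields them together as one array.
    public static IEnumerable<TimedEvent[]> MergeAndGroup(
        IEnumerable<TimedEvent> first, IEnumerable<TimedEvent> second)
    {
        using var a = first.GetEnumerator();
        using var b = second.GetEnumerator();
        bool hasA = a.MoveNext();
        bool hasB = b.MoveNext();

        while (hasA || hasB)
        {
            bool takeA = hasA && (!hasB || a.Current.Timestamp <= b.Current.Timestamp);
            IEnumerator<TimedEvent> source = takeA ? a : b;
            long timestamp = source.Current.Timestamp;

            // Drain the chosen input while its front element has the same timestamp.
            var group = new List<TimedEvent>();
            bool hasMore;
            do
            {
                group.Add(source.Current);
                hasMore = source.MoveNext();
            } while (hasMore && source.Current.Timestamp == timestamp);

            if (takeA) hasA = hasMore; else hasB = hasMore;
            yield return group.ToArray(); // a fresh array per group
        }
    }
}
```

With the example timestamps (A = 1, 3, 6, 6, 7 and B = 1, 2, 5, 5, 6, 8) this yields [A1], [B1], [B2], [A3], [B5, B5], [A6, A6], [B6], [A7], [B8].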

However, I ran into two problems:

  • If I leave these queues unbounded, I quickly run out of memory, since reading is much faster than the handlers that receive the events (I have several gigabytes of data).
  • Sometimes I end up handling an event, say, A¹₅ before A²₅. Somehow I need to guard against this.
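One common way to address the memory problem in the first bullet is a bounded producer/consumer queue: the reader blocks once a fixed number of items are waiting, instead of racing ahead of the handlers. The sketch below uses `BlockingCollection<T>` from `System.Collections.Concurrent` (not Rx); the `BoundedReader.Pump` name and the single-consumer design are assumptions for illustration. Because the handler runs on one thread, items are also handled strictly in the order they were read, which may help with the second bullet if the out-of-order handling comes from handlers running concurrently (an assumption; the question does not say).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedReader
{
    // Hypothetical helper: reads 'source' on a background task into a queue that holds at
    // most 'capacity' items, and hands each item to 'handler' on the calling thread in order.
    public static void Pump<T>(IEnumerable<T> source, int capacity, Action<T> handler)
    {
        using var queue = new BlockingCollection<T>(capacity);
        using var cts = new CancellationTokenSource();

        var reader = Task.Run(() =>
        {
            try
            {
                foreach (var item in source)
                    queue.Add(item, cts.Token); // blocks while 'capacity' items are waiting
            }
            finally
            {
                queue.CompleteAdding(); // ends GetConsumingEnumerable below
            }
        });

        try
        {
            foreach (var item in queue.GetConsumingEnumerable())
                handler(item); // one handler at a time, in arrival order
        }
        catch
        {
            cts.Cancel(); // unblock the reader if a handler threw part-way through
            try { reader.Wait(); } catch (AggregateException) { } // reader ends cancelled; ignore
            throw;
        }

        reader.Wait(); // surfaces (as AggregateException) anything the reader threw
    }
}
```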

I think Rx can help here, but I don't see the obvious combinator(s) to make this possible, so any advice is much appreciated.

1 answer

IMO, Rx is well suited to this task.

IObservables can't be "OrderBy"-ed, for obvious reasons (you would first have to observe the entire stream to guarantee the correct output order), so my answer below assumes, as you stated, that your two source event streams are already in order.

This turned out to be an interesting problem. The standard Rx operators lack a GroupByUntilChanged, which would solve this easily if it called OnCompleted on the previous group as soon as the first element of the next group was observed. However, looking at the implementation of DistinctUntilChanged, it does not follow this pattern and only calls OnCompleted when the source observable completes (even though, once the first non-matching element arrives, no more elements can belong to the previous key... strange?). Anyway, for these reasons I decided against a GroupByUntilChanged method (so as not to break Rx conventions) and went for ToEnumerableUntilChanged instead.
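For comparison, the "emit the previous group as soon as the first element of the next group arrives" behaviour can be sketched in pull form over `IEnumerable<T>`. `GroupAdjacent` is a hypothetical name, not a BCL or Rx operator, and this only illustrates the idea rather than the Rx operator built below. Each group is emitted as a new array, so a consumer may keep it after the next group is produced.

```csharp
using System;
using System.Collections.Generic;

public static class AdjacentGrouping
{
    // Hypothetical operator: yields runs of consecutive elements that share the same key.
    // A run is emitted when the first element with a different key arrives (or the input
    // ends), so only the current run is buffered.
    public static IEnumerable<TSource[]> GroupAdjacent<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        IEqualityComparer<TKey>? comparer = null)
    {
        comparer ??= EqualityComparer<TKey>.Default;
        var run = new List<TSource>();
        TKey currentKey = default!;

        foreach (var item in source)
        {
            var key = keySelector(item);
            if (run.Count > 0 && !comparer.Equals(currentKey, key))
            {
                yield return run.ToArray(); // snapshot; the buffer is reused below
                run.Clear();
            }
            currentKey = key;
            run.Add(item);
        }

        if (run.Count > 0)
            yield return run.ToArray(); // flush the last run
    }
}
```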

Disclaimer: This is my first Rx extension, so I would appreciate feedback on the choices I made. In particular, one of my main concerns is the anonymous observer holding on to the distinctElements list.

First, your application code is pretty simple:

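  using System;
  using System.Collections.Generic;
  using System.Reactive.Linq;

  public class Event
  {
      public DateTime Timestamp { get; set; }
  }

  // Containing class (the name is illustrative)
  public class EventCombiner
  {
      private IObservable<Event> eventStream1;
      private IObservable<Event> eventStream2;

      public IObservable<IEnumerable<Event>> CombineAndGroup()
      {
          // CombineLatest starts once both streams have produced a value, then fires every time
          // either stream emits, pairing it with the other stream's latest value, so the same
          // event can be selected more than once
          return eventStream1
              .CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
              .ToEnumerableUntilChanged(e => e.Timestamp);
      }
  }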

Now for the implementation of ToEnumerableUntilChanged (warning: wall of code):

  using System;
  using System.Collections.Generic;
  using System.Reactive.Linq;

  public static class GroupingExtensions
  {
      public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource, TKey>(
          this IObservable<TSource> source, Func<TSource, TKey> keySelector)
      {
          // TODO: Follow Rx conventions and add an overload that takes an IEqualityComparer<TKey>
          var comparer = EqualityComparer<TKey>.Default;

          return Observable.Create<IEnumerable<TSource>>(observer =>
          {
              var currentKey = default(TKey);
              var hasCurrentKey = false;
              var distinctElements = new List<TSource>();

              return source.Subscribe(
                  value =>
                  {
                      TKey elementKey;
                      try { elementKey = keySelector(value); }
                      catch (Exception ex) { observer.OnError(ex); return; }

                      if (!hasCurrentKey)
                      {
                          hasCurrentKey = true;
                          currentKey = elementKey;
                          distinctElements.Add(value);
                          return;
                      }

                      bool keysMatch;
                      try { keysMatch = comparer.Equals(currentKey, elementKey); }
                      catch (Exception ex) { observer.OnError(ex); return; }

                      if (keysMatch)
                      {
                          distinctElements.Add(value);
                          return;
                      }

                      // Emit a snapshot: the list is cleared and reused for the next group
                      observer.OnNext(distinctElements.ToArray());
                      distinctElements.Clear();
                      distinctElements.Add(value);
                      currentKey = elementKey;
                  },
                  observer.OnError,
                  () =>
                  {
                      // Flush the last group before completing
                      if (distinctElements.Count > 0)
                          observer.OnNext(distinctElements.ToArray());
                      observer.OnCompleted();
                  });
          });
      }
  }

Source: https://habr.com/ru/post/910939/

