Sort an IObservable by a predefined order in Reactive Extensions (Rx)

Let's say I have a type T:

class T
{
    // Unique per item (a Guid in the real application).
    public int identifier;

    // The payload carried by the item; a char here only to keep the demo short.
    public char character;
}

And I have a predefined ordered sequence of identifiers:

// The desired output order, expressed as identifiers; an identifier may repeat.
int[] identifierSequence = { 9, 3, 4, 4, 7 };

Now I need to order an IObservable<T> that produces the following sequence of objects:

{identifier: 3, character: 'e'},
{identifier: 9, character: 'h'},
{identifier: 4, character: 'l'},
{identifier: 4, character: 'l'},
{identifier: 7, character: 'o'}

The resulting IObservable should produce `hello`. I do not want to use ToArray, because I want to receive the objects as soon as they arrive rather than waiting until everything has been observed. In particular, I would like to receive them as follows:

 Input: e  h  l  l  o
Output:    he l  l  o

What would be the right reactive way to do this? The best I could come up with is this:

// Re-emits the items of inputObserable in the order given by identifierSequence, emitting each item
// as soon as every item before it in the sequence has been emitted.
// Observable.Defer runs the factory once per subscriber, so two subscriptions never share the
// buffer or the current position.
IObservable<T> orderedObservable = Observable.Defer(() =>
{
    // Items that have arrived but are not due yet, queued per identifier in arrival order.
    // A queue (not a single slot) is needed because the sequence can repeat an identifier (4, 4).
    var buffer = new Dictionary<int, Queue<T>>();
    // Position in identifierSequence of the identifier we are waiting for next.
    int curIndex = 0;

    return inputObserable.SelectMany(item =>
    {
        if (!buffer.TryGetValue(item.identifier, out Queue<T> pending))
        {
            pending = new Queue<T>();
            buffer[item.identifier] = pending;
        }
        pending.Enqueue(item);

        // Drain everything that is now due. The bound check stops the loop once the whole
        // sequence has been emitted; without it identifierSequence[curIndex] would throw.
        // The result is materialized, so the state is fully updated before anything is emitted.
        var ready = new List<T>();
        while (curIndex < identifierSequence.Length
               && buffer.TryGetValue(identifierSequence[curIndex], out Queue<T> due)
               && due.Count > 0)
        {
            ready.Add(due.Dequeue());
            curIndex++;
        }

        return ready;
    });
});

EDIT:

_(The explanatory text of this edit was lost when the page was extracted; the updated solution follows.)_

/// <summary>
/// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
/// Each item is emitted as soon as every item before it in the sequence has been emitted.
/// The result completes once the whole sequence has been emitted, or when the source completes.
/// Items that share an identifier are emitted in arrival order.
/// Items whose identifier does not occur in the sequence are dropped.
/// </summary>
/// <param name="source">The out-of-order items.</param>
/// <param name="identifierSequence">Identifiers in the desired output order; copied when this method is called.</param>
/// <param name="identifierFunc">Extracts the identifier of an item.</param>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        // Snapshot the sequence so that later changes to the caller's list cannot desynchronise it from the lookup set.
        TId[] sequence = identifierSequence.ToArray();
        if (sequence.Length == 0)
        {
            return Observable.Empty<T>();
        }

        // Identifiers that can ever be emitted. Buffering anything else would leak, because it is never drained.
        var identifiersInSequence = new HashSet<TId>(sequence);

        var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
        return source.Scan(initialState, (oldState, item) =>
            {
                //Starting at state.Index, move every item that is now due from the buffer into Output.
                //Stops at the first identifier that has no buffered item yet.
                OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state)
                {
                    int index = state.Index;
                    ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer;
                    List<T> output = new List<T>();

                    while (index < sequence.Length)
                    {
                        TId key = sequence[index];
                        ImmutableList<T> nextValues;
                        if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
                        {
                            //No values available yet
                            break;
                        }

                        //Take the oldest item so that items sharing an identifier keep their arrival order
                        output.Add(nextValues[0]);
                        buffer = buffer.SetItem(key, nextValues.RemoveAt(0));
                        index++;
                    }

                    return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
                }

                TId itemIdentifier = identifierFunc(item);
                if (!identifiersInSequence.Contains(itemIdentifier))
                {
                    //Unknown identifier: drop the item; nothing new can have become due
                    return new OrderByIdentifierSequenceState<T, TId>(oldState.Index, oldState.Buffer, Enumerable.Empty<T>());
                }

                //Add the new item to the buffer before looking for due items
                ImmutableList<T> valuesList;
                if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
                {
                    valuesList = ImmutableList<T>.Empty;
                }
                var remodifiedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

                return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, remodifiedBuffer, Enumerable.Empty<T>()));
            })
            // Turn each step's Output into OnNext notifications.
            // Append OnCompleted once the whole sequence has been emitted.
            // Dematerialize converts the notifications back into observer calls.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();

                if (output.Index == sequence.Length)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }

                return notifications;
            })
            .Dematerialize();
    }

    /// <summary>
    /// State carried from one Scan step to the next by OrderByIdentifierSequence.
    /// </summary>
    class OrderByIdentifierSequenceState<T, TId>
    {
        public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
        {
            (Index, Buffer, Output) = (index, buffer, output);
        }

        /// <summary>Position in the identifier sequence of the item we are waiting for.</summary>
        public int Index { get; }

        /// <summary>Items that have arrived but are not due yet, grouped by identifier.</summary>
        public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

        /// <summary>Items that became due on this step and can be emitted.</summary>
        public IEnumerable<T> Output { get; }
    }

_(The explanation between the two versions was lost when the page was extracted; the surviving fragments mention `ImmutableDictionary` and `identifierSequence`. A revised version follows.)_

    /// <summary>
    /// Takes the items from the source observable and returns them in the order specified in identifierSequence.
    /// Each item is emitted as soon as every item before it in the sequence has been emitted.
    /// If an item is missing from the source observable, the returned observable emits items up to the missing item and then waits until the source observable completes.
    /// All remaining buffered items are then emitted in order, skipping the missing ones. While a correct order is guaranteed, there might be missing items in the result observable.
    /// Items in the source observable whose identifier is not in identifierSequence are ignored.
    /// Items that share an identifier are emitted in the order in which they arrived.
    /// </summary>
    /// <typeparam name="T">The type that is produced by the source observable</typeparam>
    /// <typeparam name="TId">The type of the identifiers used to identify a T</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered. It is copied when this method is called.</param>
    /// <param name="identifierFunc">A function that takes a T and outputs its identifier</param>
    /// <returns>An observable with the elements of the source, ordered by the sequence of identifiers in identifierSequence</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        //Snapshot the sequence so the membership set and the ordering used at emit time always agree,
        //even if the caller mutates the list after this call.
        TId[] sequence = identifierSequence.ToArray();
        if (sequence.Length == 0)
        {
            return Observable.Empty<T>();
        }

        HashSet<TId> identifiersInSequence = new HashSet<TId>(sequence);

        return Observable.Create<T>(observer =>
        {
            //Per-subscription state (this factory runs once per subscriber):
            //  index:  position in `sequence` of the identifier we are waiting for
            //  buffer: items received but not due yet, queued per identifier in arrival order
            int index = 0;
            Dictionary<TId, Queue<T>> buffer = new Dictionary<TId, Queue<T>>();

            //Removes and returns the oldest buffered item for `key`, if there is one.
            bool TryTakeBuffered(TId key, out T value)
            {
                Queue<T> queue;
                if (buffer.TryGetValue(key, out queue) && queue.Count > 0)
                {
                    value = queue.Dequeue();
                    return true;
                }

                value = default(T);
                return false;
            }

            return source.Select(
                    item =>
                    {
                        //Get the identifier for this item
                        TId itemIdentifier = identifierFunc(item);

                        //If this item is not in identifiersInSequence, we ignore it.
                        if (!identifiersInSequence.Contains(itemIdentifier))
                        {
                            return new List<T>();
                        }

                        //Add the new item to the buffer
                        Queue<T> queue;
                        if (!buffer.TryGetValue(itemIdentifier, out queue))
                        {
                            queue = new Queue<T>();
                            buffer[itemIdentifier] = queue;
                        }
                        queue.Enqueue(item);

                        //Collect every item that is now due. The list is filled and the state advanced
                        //before anything is handed to the observer.
                        List<T> ready = new List<T>();
                        T next;
                        while (index < sequence.Length && TryTakeBuffered(sequence[index], out next))
                        {
                            ready.Add(next);
                            index++;
                        }

                        return ready;
                    })
                .Subscribe(ready =>
                {
                    foreach (T cur in ready)
                    {
                        observer.OnNext(cur);
                    }

                    if (index == sequence.Length)
                    {
                        observer.OnCompleted();
                    }
                }, (ex) =>
                {
                    observer.OnError(ex);
                }, () =>
                {
                    //The source has completed, so nothing else can arrive.
                    //Emit the remaining buffered items in order, skipping missing ones.
                    while (index < sequence.Length)
                    {
                        T next;
                        if (TryTakeBuffered(sequence[index++], out next))
                        {
                            observer.OnNext(next);
                        }
                    }

                    //Mark observable as completed
                    observer.OnCompleted();
                });
        });
    }
+4
3

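_(Most of the text of this answer was lost when the page was extracted. The surviving fragments mention `OnCompleted` and a repro that subscribes twice to `GetPatternMatchOriginal`, apparently the question's original code wrapped as an extension method:)_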

-

var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 });

// Subscribe twice to the same observable to see whether the two subscriptions share state.
for (var subscription = 0; subscription < 2; subscription++)
{
    stateMachine.Take(3).Dump(); //Linqpad
}

: h e l l o. h e l.

, :

public static class X
{
    /// <summary>
    /// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
    /// Each item is emitted as soon as every item before it in the sequence has been emitted.
    /// The result completes once the whole sequence has been emitted, or when the source completes.
    /// </summary>
    /// <param name="source">The out-of-order items.</param>
    /// <param name="identifierSequence">
    /// Identifiers (matched against <c>Identifier</c>) in the desired output order. An identifier may repeat;
    /// items sharing an identifier are emitted in arrival order. This was previously typed as <c>string</c>, whose
    /// <c>char</c> elements could never match the <c>int</c> identifiers and could not accept the <c>int[]</c> callers pass.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="identifierSequence"/> is null.</exception>
    public static IObservable<T> GetStateMachine(this IObservable<T> source, IList<int> identifierSequence)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (identifierSequence == null) throw new ArgumentNullException(nameof(identifierSequence));

        //State is held in an anonymous type:
        //  Index shows which position in identifierSequence we're waiting on,
        //  Buffer holds items that have arrived that we aren't ready for yet, queued per identifier in arrival order,
        //  Output holds items that became due on this step and can be safely emitted.
        return source
            .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
            (state, item) =>
            {
                //Add the new item to the buffer first
                ImmutableList<T> queued;
                var buffer = state.Buffer.TryGetValue(item.Identifier, out queued)
                    ? state.Buffer.SetItem(item.Identifier, queued.Add(item))
                    : state.Buffer.Add(item.Identifier, ImmutableList.Create(item));

                //Then move every item that is now due into the output, oldest first.
                //A loop rather than recursion avoids deep call stacks and nested Concat chains on long runs.
                var index = state.Index;
                var output = new List<T>();
                ImmutableList<T> due;
                while (index < identifierSequence.Count
                    && buffer.TryGetValue(identifierSequence[index], out due)
                    && !due.IsEmpty)
                {
                    output.Add(due[0]);
                    buffer = buffer.SetItem(identifierSequence[index], due.RemoveAt(0));
                    index++;
                }

                return new { Index = index, Buffer = buffer, Output = (IEnumerable<T>)output };
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                if (output.Index == identifierSequence.Count)
                    notifications.Add(Notification.CreateOnCompleted<T>());
                return notifications;
            })
            .Dematerialize();
    }
}

Usage:

var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 });
stateMachine.Dump(); //LinqPad

// Push the items deliberately out of order; the state machine should re-emit them as "hello".
var outOfOrder = new[] { (4, 'l'), (4, 'l'), (7, 'o'), (3, 'e'), (9, 'h') };
foreach (var (identifier, character) in outOfOrder)
{
    src.OnNext(new T { Identifier = identifier, Character = character });
}
+4

_(The end of the previous answer and the introduction of the next answer were lost when the page was extracted.)_

public static class MyExtensions
{
    /// <summary>
    /// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="keys"/>.
    /// Each position in <paramref name="keys"/> is filled by the first unused source item whose key equals it
    /// (per <paramref name="keyComparer"/>). An item is emitted as soon as every earlier position has been emitted.
    /// Source items whose key has no remaining open position are dropped.
    /// The result completes once every position has been emitted, or as soon as the source completes; in the
    /// latter case, items still waiting behind an unfilled position are discarded.
    /// An empty <paramref name="keys"/> completes immediately.
    /// </summary>
    /// <param name="source">The out-of-order items.</param>
    /// <param name="keys">The keys in the desired output order; a key may repeat.</param>
    /// <param name="keySelector">Extracts the key of a source item.</param>
    /// <param name="keyComparer">Key equality; defaults to <see cref="EqualityComparer{T}.Default"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keys"/> or <paramref name="keySelector"/> is null.</exception>
    public static IObservable<TSource> MatchByKeys<TSource, TKey>(this IObservable<TSource> source, IEnumerable<TKey> keys, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> keyComparer = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (keys == null) throw new ArgumentNullException(nameof(keys));
        if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
        if (keyComparer == null) keyComparer = EqualityComparer<TKey>.Default;

        return Observable.Create<TSource>(observer =>
        {
            // pattern: one slot per key, in output order.
            // matchesByKey: for each key, its still-unfilled slots in output order.
            var pattern = new LinkedList<SingleAssignment<TSource>>();
            var matchesByKey = new Dictionary<TKey, LinkedList<SingleAssignment<TSource>>>(keyComparer);
            foreach (var key in keys)
            {
                var match = new SingleAssignment<TSource>();
                pattern.AddLast(match);
                LinkedList<SingleAssignment<TSource>> matches;
                if (!matchesByKey.TryGetValue(key, out matches))
                {
                    matches = new LinkedList<SingleAssignment<TSource>>();
                    matchesByKey.Add(key, matches);
                }
                matches.AddLast(match);
            }

            if (pattern.First == null)
            {
                observer.OnCompleted();
                return Disposable.Empty;
            }

            var sourceSubscription = new SingleAssignmentDisposable();
            // Teardown: unsubscribe from the source and drop all slots.
            Action dispose = () =>
            {
                sourceSubscription.Dispose();
                pattern.Clear();
                matchesByKey.Clear();
            };

            sourceSubscription.Disposable = source.Subscribe(
                value =>
                {
                    List<TSource> ready;
                    bool patternComplete;
                    try
                    {
                        // Only user code (keySelector, keyComparer) and internal bookkeeping are guarded here.
                        // The observer is called outside the try, so an exception thrown by the observer is never
                        // reported back to that same observer via OnError (in particular not after OnCompleted).
                        var key = keySelector(value);
                        LinkedList<SingleAssignment<TSource>> matches;
                        if (!matchesByKey.TryGetValue(key, out matches)) return;
                        matches.First.Value.Value = value;
                        matches.RemoveFirst();
                        if (matches.First == null) matchesByKey.Remove(key);

                        ready = new List<TSource>();
                        while (pattern.First != null && pattern.First.Value.HasValue)
                        {
                            ready.Add(pattern.First.Value.Value);
                            pattern.RemoveFirst();
                        }
                        patternComplete = pattern.First == null;
                    }
                    catch (Exception ex)
                    {
                        dispose();
                        observer.OnError(ex);
                        return;
                    }

                    foreach (var item in ready) observer.OnNext(item);
                    if (!patternComplete) return;
                    dispose();
                    observer.OnCompleted();
                },
                error =>
                {
                    dispose();
                    observer.OnError(error);
                },
                () =>
                {
                    dispose();
                    observer.OnCompleted();
                });
            return Disposable.Create(dispose);
        });
    }

    /// <summary>
    /// A write-once slot: <see cref="Value"/> can be set exactly once and read only after it has been set.
    /// </summary>
    private sealed class SingleAssignment<T>
    {
        public bool HasValue { get; private set; }

        private T _value;
        public T Value
        {
            get
            {
                if (!HasValue) throw new InvalidOperationException("No value has been set.");
                return _value;
            }
            set
            {
                if (HasValue) throw new InvalidOperationException("Value has alredy been set.");
                HasValue = true;
                _value = value;
            }
        }
    }
}

Usage:

var src = new Subject<T>();
var ordered = src.MatchByKeys(new[] { 9, 3, 4, 4, 7 }, t => t.Identifier);
var result = new List<T>();

// The items in arrival order; MatchByKeys re-emits them in key order.
var inputs = new[]
{
    new T { Identifier = 3, Character = 'e' },
    new T { Identifier = 9, Character = 'h' },
    new T { Identifier = 4, Character = 'l' },
    new T { Identifier = 4, Character = 'l' },
    new T { Identifier = 7, Character = 'o' },
};

using (ordered.Subscribe(result.Add))
{
    foreach (var input in inputs)
    {
        src.OnNext(input);
    }
    src.OnCompleted();
}

var text = string.Concat(result.Select(t => t.Character));
Console.WriteLine(text);
+1

Given that you have this:

// The items in the order they arrive (out of order).
T[] sourceItems =
{
    new T { identifier = 3, character = 'e' },
    new T { identifier = 9, character = 'h' },
    new T { identifier = 4, character = 'l' },
    new T { identifier = 4, character = 'l' },
    new T { identifier = 7, character = 'o' },
};
IObservable<T> source = sourceItems.ToObservable();

// The desired output order, as identifiers.
int[] identifierSequence = { 9, 3, 4, 4, 7 };

... then this works:

// Re-emits the items of `source` in the order given by `identifierSequence`, emitting each item as soon
// as every item before it in the sequence has been emitted.
IObservable<T> query =
    source
        .Scan(new { index = 0, pendings = new List<T>(), outputs = new List<T>() }, (a, t) =>
        {
            // Copy instead of mutating a.pendings in place. The seed object is shared by every subscription to
            // `query`, so in-place mutation would leak buffered items from one subscription into the next.
            var pendings = new List<T>(a.pendings) { t };
            var outputs = new List<T>();
            var i = a.index;
            // The bound check stops at the end of the sequence.
            // Without it, any leftover pending item would make identifierSequence[i] throw.
            while (i < identifierSequence.Length)
            {
                var next = pendings.FirstOrDefault(x => x.identifier == identifierSequence[i]);
                if (next == null)
                {
                    break;
                }
                outputs.Add(next);
                pendings.Remove(next);
                i++;
            }
            return new { index = i, pendings, outputs };
        })
        .SelectMany(x => x.outputs);
0
source

Source: https://habr.com/ru/post/1682873/


All Articles