Let's say I have a type T:
/// <summary>
/// Element type carried by the observable in this example.
/// </summary>
class T
{
    /// <summary>Identifier that is looked up in the predefined identifier sequence.</summary>
    public int identifier;

    /// <summary>Character payload of the item.</summary>
    public char character;
}
And I have a predefined ordered sequence of identifiers:
// Order in which identifiers must be emitted; note that identifier 4 occurs twice.
int[] identifierSequence = { 9, 3, 4, 4, 7 };
Now I need to reorder an IObservable<T> that produces the following sequence of objects:
{identifier: 3, character: 'e'},
{identifier: 9, character: 'h'},
{identifier: 4, character: 'l'},
{identifier: 4, character: 'l'},
{identifier: 7, character: 'o'}
So the resulting IObservable should produce "hello". I do not want to use ToArray, because I want to receive objects as soon as they can be emitted rather than waiting until the whole source has been observed. In particular, I would like to get them as follows:
Input: e h l l o
Output: he l l o
What would be the right reactive way to do this? The best I could come up with is this:
// Items that arrived before their turn, grouped by identifier. A queue per identifier keeps
// every duplicate (e.g. the two items with identifier 4) instead of letting a later arrival
// overwrite an earlier one, and hands them back in arrival order.
Dictionary<int, Queue<T>> buffer = new Dictionary<int, Queue<T>>();
// Position in identifierSequence of the next identifier that has to be emitted.
int curIndex = 0;
inputObserable.SelectMany(item =>
{
    // Park the incoming item under its identifier.
    Queue<T> pending;
    if (!buffer.TryGetValue(item.identifier, out pending))
    {
        pending = new Queue<T>();
        buffer[item.identifier] = pending;
    }
    pending.Enqueue(item);

    // Yields every buffered item that is now next in line, advancing curIndex as it goes.
    IEnumerable<T> GetReadyElements()
    {
        // Bounded by the sequence length: once the last identifier has been emitted there is
        // nothing left to look up, and indexing past the end would throw.
        while (curIndex < identifierSequence.Length)
        {
            Queue<T> ready;
            if (!buffer.TryGetValue(identifierSequence[curIndex], out ready) || ready.Count == 0)
            {
                // The next identifier has not arrived yet; wait for a later item.
                yield break;
            }
            T nextItem = ready.Dequeue();
            curIndex++;
            yield return nextItem;
        }
    }
    return GetReadyElements();
});
EDIT:
Here is a version that buffers the items per identifier in immutable state and emits them through Scan:
/// <summary>
/// Re-emits the items of <paramref name="source"/> in the order given by
/// <paramref name="identifierSequence"/>, releasing each item as soon as every earlier
/// position of the sequence has been emitted.
/// </summary>
/// <remarks>
/// Items that arrive ahead of their turn are held in an immutable buffer that is threaded
/// through <c>Scan</c>. Items sharing an identifier are emitted in arrival order. Items whose
/// identifier does not occur in <paramref name="identifierSequence"/> are dropped. The result
/// completes as soon as the last position of the sequence has been emitted; if the source
/// completes earlier, items that are still buffered are not emitted.
/// </remarks>
/// <typeparam name="T">Type of the items.</typeparam>
/// <typeparam name="TId">Type of the identifier extracted from each item.</typeparam>
/// <param name="source">Observable whose items are reordered.</param>
/// <param name="identifierSequence">Identifiers in the order in which items must be emitted; may contain duplicates.</param>
/// <param name="identifierFunc">Extracts the identifier of an item.</param>
/// <returns>An observable producing the buffered items in sequence order.</returns>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (identifierSequence == null)
    {
        throw new ArgumentNullException(nameof(identifierSequence));
    }
    if (identifierFunc == null)
    {
        throw new ArgumentNullException(nameof(identifierFunc));
    }
    if (identifierSequence.Count == 0)
    {
        // Nothing can ever be emitted, so complete without waiting for the source.
        return Observable.Empty<T>();
    }

    // Identifiers that can ever be emitted; anything else would only pile up in the buffer.
    var identifiersInSequence = new HashSet<TId>(identifierSequence);
    var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());

    return source.Scan(initialState, (oldState, item) =>
    {
        TId itemIdentifier = identifierFunc(item);
        if (!identifiersInSequence.Contains(itemIdentifier))
        {
            // Unknown identifier: keep position and buffer, emit nothing for this item.
            return new OrderByIdentifierSequenceState<T, TId>(oldState.Index, oldState.Buffer, Enumerable.Empty<T>());
        }

        // Append the item to the list kept for its identifier.
        ImmutableList<T> valuesList;
        if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
        {
            valuesList = ImmutableList<T>.Empty;
        }
        ImmutableDictionary<TId, ImmutableList<T>> buffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

        // Release everything that is now next in line.
        int index = oldState.Index;
        var output = new List<T>();
        while (index < identifierSequence.Count)
        {
            TId key = identifierSequence[index];
            ImmutableList<T> nextValues;
            if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
            {
                break;
            }
            // Take the oldest item so that items with equal identifiers keep their arrival order.
            output.Add(nextValues[0]);
            ImmutableList<T> remaining = nextValues.RemoveAt(0);
            // Drop drained entries instead of keeping empty lists around.
            buffer = remaining.IsEmpty ? buffer.Remove(key) : buffer.SetItem(key, remaining);
            index++;
        }
        return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
    })
    .SelectMany(output =>
    {
        var notifications = output.Output
            .Select(item => Notification.CreateOnNext(item))
            .ToList();
        if (output.Index == identifierSequence.Count)
        {
            // Every position has been emitted: finish the result even if the source keeps going.
            notifications.Add(Notification.CreateOnCompleted<T>());
        }
        return notifications;
    })
    .Dematerialize();
}
/// <summary>
/// Immutable snapshot of the reordering state: the position reached in the identifier
/// sequence, the items still waiting for their turn, and the items released by the latest step.
/// </summary>
/// <typeparam name="T">Type of the buffered items.</typeparam>
/// <typeparam name="TId">Type of the identifier the buffer is keyed by.</typeparam>
class OrderByIdentifierSequenceState<T, TId>
{
    public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
    {
        Index = index;
        Buffer = buffer;
        Output = output;
    }

    /// <summary>Index value supplied at construction.</summary>
    public int Index { get; }

    /// <summary>Buffered items grouped by identifier, as supplied at construction.</summary>
    public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

    /// <summary>Output items supplied at construction.</summary>
    public IEnumerable<T> Output { get; }
}
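Notes on the version above:
- It keeps all of its state in immutable collections (
ImmutableDictionary), so every incoming item produces a new state object.
The version below keeps mutable state per subscription inside Observable.Create instead. It ignores items whose identifier does not occur in identifierSequence, and, if the source completes before the whole of identifierSequence has been emitted, it flushes whatever is still buffered in identifierSequence order, skipping the identifiers that never arrived.
Here it is: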
/// <summary>
/// Re-emits the items of <paramref name="source"/> in the order given by
/// <paramref name="identifierSequence"/>, releasing each item as soon as every earlier
/// position of the sequence has been emitted.
/// </summary>
/// <remarks>
/// State (current position and buffered items) is created per subscription. Items sharing an
/// identifier are emitted in arrival order. Items whose identifier does not occur in
/// <paramref name="identifierSequence"/> are ignored. The result completes as soon as the last
/// position has been emitted. If the source completes first, the items still buffered are
/// flushed in sequence order, skipping positions for which nothing arrived.
/// </remarks>
/// <typeparam name="T">Type of the items.</typeparam>
/// <typeparam name="TId">Type of the identifier extracted from each item.</typeparam>
/// <param name="source">Observable whose items are reordered.</param>
/// <param name="identifierSequence">Identifiers in the order in which items must be emitted; may contain duplicates.</param>
/// <param name="identifierFunc">Extracts the identifier of an item.</param>
/// <returns>An observable producing the items in sequence order; empty when the sequence is empty.</returns>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (identifierSequence == null)
    {
        throw new ArgumentNullException(nameof(identifierSequence));
    }
    if (identifierFunc == null)
    {
        throw new ArgumentNullException(nameof(identifierFunc));
    }
    if (identifierSequence.Count == 0)
    {
        return Observable.Empty<T>();
    }

    HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence);
    return Observable.Create<T>(observer =>
    {
        // Position in identifierSequence of the next identifier to emit.
        int index = 0;
        // Items waiting for their turn; a queue per identifier preserves arrival order.
        Dictionary<TId, Queue<T>> buffer = new Dictionary<TId, Queue<T>>();

        // Emits buffered items for consecutive positions starting at index.
        // With skipMissing == false it stops at the first position that has nothing buffered;
        // with skipMissing == true it steps over such positions (used for the final flush).
        void Drain(bool skipMissing)
        {
            while (index < identifierSequence.Count)
            {
                Queue<T> pending;
                if (buffer.TryGetValue(identifierSequence[index], out pending) && pending.Count > 0)
                {
                    T next = pending.Dequeue();
                    // Advance before notifying so the state is consistent while the observer runs.
                    index++;
                    observer.OnNext(next);
                }
                else if (skipMissing)
                {
                    index++;
                }
                else
                {
                    return;
                }
            }
        }

        return source
            .Select(item =>
            {
                // Runs inside Select so that an exception from identifierFunc or from the
                // buffer lookup is delivered through the error handler below.
                TId itemIdentifier = identifierFunc(item);
                if (index >= identifierSequence.Count || !identifiersInSequence.Contains(itemIdentifier))
                {
                    // Either everything has been emitted already or the item can never be emitted.
                    return false;
                }
                Queue<T> pending;
                if (!buffer.TryGetValue(itemIdentifier, out pending))
                {
                    pending = new Queue<T>();
                    buffer[itemIdentifier] = pending;
                }
                pending.Enqueue(item);
                return true;
            })
            .Subscribe(buffered =>
            {
                if (buffered)
                {
                    Drain(false);
                }
                if (index == identifierSequence.Count)
                {
                    observer.OnCompleted();
                }
            }, (ex) =>
            {
                observer.OnError(ex);
            }, () =>
            {
                // Source finished early: emit what is buffered, in sequence order.
                Drain(true);
                observer.OnCompleted();
            });
    });
}