How to complete an Rx observable depending on state in an event

I have an event that I do not control, and it provides me with data. The EventArgs look something like this:

    class MyEventArg
    {
        bool IsLastItem { get; }
        Data DataItem { get; }
    }

I use Rx to convert this event to IObservable. But I want to complete the observable if IsLastItem is true.

Any elegant ideas? One way would be to pipe the data through a Subject, which would give me more control over raising OnCompleted when the condition occurs ...

+6
3 answers

If you want the last element to be included, you can merge a stream that contains only the last element with the regular stream cut off by TakeWhile. Here is a simple console application to demonstrate it:

    var subject = new List<string> { "test", "last" }.ToObservable();

    var my = subject
        .Where(x => x == "last").Take(1)
        .Merge(subject.TakeWhile(x => x != "last"));

    my.Subscribe(
        o => Console.WriteLine("On Next: " + o),
        () => Console.WriteLine("Completed"));

    Console.ReadLine();

Output:

    On Next: test
    On Next: last
    Completed

UPDATE: There was a bug that suppressed the OnCompleted notification if the underlying observable did not actually complete. I fixed the code to ensure that OnCompleted is called.

And if you want to avoid subscribing to the underlying sequence more than once for cold observables, you can refactor the code as follows:

    var my = subject.Publish(p => p
        .Where(x => x == "last").Take(1)
        .Merge(p.TakeWhile(x => x != "last")));
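
To check that the Publish variant really subscribes to the underlying sequence only once, here is a minimal sketch. It assumes the System.Reactive package and C# top-level statements; the `subscriptions` counter, the `cold` variable and the `log` list are hypothetical names added purely for the demo, and Observable.Defer is used only so that each subscription to the source can be counted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Counts how often the cold source is subscribed to (demo-only helper).
var subscriptions = 0;
var cold = Observable.Defer(() =>
{
    subscriptions++;
    return new[] { "test", "last" }.ToObservable();
});

var log = new List<string>();

// Both branches share the single subscription that Publish creates.
cold.Publish(p => p
        .Where(x => x == "last").Take(1)
        .Merge(p.TakeWhile(x => x != "last")))
    .Subscribe(
        x => log.Add("On Next: " + x),
        () => log.Add("Completed"));

log.ForEach(Console.WriteLine);
Console.WriteLine("Subscriptions to source: " + subscriptions);
```

With the non-Publish version from the first snippet, the same counter would be incremented twice, once per branch of the Merge.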
+9

Are you looking for something like this?

 IObservable<MyEventArg> result = myEventArgObservable.TakeWhile(arg => !arg.IsLastItem); 
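
Note that TakeWhile completes the sequence when the predicate fails but does not forward the item that failed it, so the last item itself is dropped. A small sketch of that behaviour, assuming the System.Reactive package; the Subject and the `(IsLastItem, DataItem)` tuple are hypothetical stand-ins for the event-based observable and for MyEventArg.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stand-in for the observable created from the event (assumption for the demo).
var source = new Subject<(bool IsLastItem, string DataItem)>();
var log = new List<string>();

source
    .TakeWhile(arg => !arg.IsLastItem)
    .Subscribe(
        arg => log.Add("OnNext: " + arg.DataItem),
        () => log.Add("Completed"));

source.OnNext((false, "first"));
source.OnNext((true, "last"));   // completes the sequence, but is not forwarded
source.OnNext((false, "late"));  // ignored, the subscription has already ended

log.ForEach(Console.WriteLine);
// Prints:
// OnNext: first
// Completed
```

If the last item carries data you need, this approach is not enough on its own.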
+2
    public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        return Observable.Create<TSource>(o => source.Subscribe(
            x =>
            {
                o.OnNext(x);
                if (!predicate(x))
                    o.OnCompleted();
            },
            o.OnError,
            o.OnCompleted));
    }
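
A usage sketch for the operator above, assuming the System.Reactive package. To keep it a single top-level program, the same logic is restated as a local function instead of an extension method; the Subject and the `(IsLastItem, DataItem)` tuple are hypothetical stand-ins for the event-based observable and for MyEventArg.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Same logic as TakeWhileInclusive, written as a local function for the demo.
static IObservable<T> TakeWhileInclusive<T>(IObservable<T> source, Func<T, bool> predicate) =>
    Observable.Create<T>(o => source.Subscribe(
        x =>
        {
            o.OnNext(x);                         // forward the item first
            if (!predicate(x)) o.OnCompleted();  // then complete if it fails the predicate
        },
        o.OnError,
        o.OnCompleted));

// Stand-in for the observable created from the event (assumption for the demo).
var events = new Subject<(bool IsLastItem, string DataItem)>();
var log = new List<string>();

TakeWhileInclusive(events, arg => !arg.IsLastItem)
    .Subscribe(
        arg => log.Add("OnNext: " + arg.DataItem),
        () => log.Add("Completed"));

events.OnNext((false, "first"));
events.OnNext((true, "last"));   // forwarded, then the sequence completes
events.OnNext((false, "late"));  // ignored, the sequence has already completed

log.ForEach(Console.WriteLine);
// Prints:
// OnNext: first
// OnNext: last
// Completed
```

Unlike the Merge-based answer, this needs only one subscription to the source and no Publish.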
+2

Source: https://habr.com/ru/post/901458/

