Subscribe and unsubscribe immediately after the first action

I want to subscribe to a subscription IObservable<T>and unsubscribe (dipos) immediately after receiving the first type element T, i.e. I only want to trigger an action on the very first element that I get after subscribing.

This is the approach I came up with:

public static class Extensions
{
    public static void SubscribeOnce<T>(this IObservable<T> observable, Action<T> action)
    {
        IDisposable subscription = null;
        subscription = observable.Subscribe(t =>
        {
            action(t);
            subscription.Dispose();
        });
    }
}

Usage example:

public class Program
{
    public static void Main()
    {
        var subject = new Subject<int>();

        subject.OnNext(0);
        subject.OnNext(1);
        subject.SubscribeOnce(i => Console.WriteLine(i));
        subject.OnNext(2);
        subject.OnNext(3);

        Console.ReadLine();
    }
}

Works as expected and prints only 2. Is there something wrong with this or something else to consider? Is there a cleaner way to use the extension tags that come with RX out of the box, maybe?

+4
source share
1 answer
var source = new Subject();

source
  .Take(1)
  .Subscribe(Console.WriteLine);

source.OnNext(5);
source.OnNext(6);
source.OnError(new Exception("oops!"));
source.OnNext(7);
source.OnNext(8);

// Output is "5". Subscription is auto-disposed. Error is ignored.

Take nth.:)

, , , OnError OnCompleted , .

, Dispose.

+13

Source: https://habr.com/ru/post/1534446/


All Articles