Why is Rx Observable.Subscribe blocking my stream?

Hi, I tried one of the 101 Rx examples:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

I don't understand why the line "Press any key to unsubscribe" is never displayed. My understanding was that subscribing is asynchronous: you subscribe, and the call returns immediately. What am I missing that blocks my main thread?

+3
2 answers

The blocking is caused by the combination of your enumerable looping on while (true) and the IEnumerable<T>.ToObservable() extension method defaulting to CurrentThreadScheduler.

If you pass Scheduler.TaskPool (or Scheduler.ThreadPool before .NET 4) to an overload of ToObservable, you should see the behavior you expect (although, FYI, your subscriber will not be called on the main thread).
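A minimal sketch of that suggestion, assuming the current System.Reactive package, where (as far as I know) Scheduler.TaskPool is spelled TaskPoolScheduler.Default. SchedulerDemo, SlowNumbers and SubscribeReturnsEarly are hypothetical names, and the generator is shortened to five items so the run finishes in about a second.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerDemo
{
    // Shortened stand-in for the generator in the question (hypothetical).
    static IEnumerable<int> SlowNumbers()
    {
        for (int i = 0; i < 5; i++)
        {
            yield return i;
            Thread.Sleep(200);
        }
    }

    // Returns true if Subscribe handed control back before the sequence completed.
    public static bool SubscribeReturnsEarly()
    {
        var completed = new ManualResetEventSlim(false);

        // The scheduler argument moves the enumeration, including the
        // Thread.Sleep calls, off the subscribing thread.
        var observable = SlowNumbers().ToObservable(TaskPoolScheduler.Default);

        using (observable.Subscribe(
            x => Console.WriteLine("{0} on thread {1}", x, Thread.CurrentThread.ManagedThreadId),
            () => completed.Set()))
        {
            // The sequence needs about a second, so it is still running here.
            bool returnedEarly = !completed.IsSet;
            completed.Wait(); // keep the subscription alive until the sequence ends
            return returnedEarly;
        }
    }
}
```

In the question's Main, the only change would be the argument to ToObservable; the thread ids printed by the callback should differ from the main thread's id.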

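That said, I think you will find that your combination of Thread.Sleep and Throttle does not behave the way you expect. You are probably better off building an observable that uses a scheduler to produce the delays.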

+6

The implementation of .ToObservable() looks like this:

    public static IObservable<TSource> ToObservable<TSource>(
        this IEnumerable<TSource> source)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        return source.ToObservable<TSource>(Scheduler.CurrentThread);
    }

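It calls the .ToObservable(IScheduler) overload with Scheduler.CurrentThread, and because you use .Sleep(...) to create the delays, the observable must run to completion before execution can get past the .Subscribe(...) call. Think about how this code would behave if it all ran on a single thread (which it does.)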
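The single-threaded behavior can be observed directly with a small sketch. It assumes the current System.Reactive package, where Scheduler.CurrentThread is available as CurrentThreadScheduler.Instance; BlockingDemo and Run are hypothetical names. Every value is logged before the line after Subscribe gets a chance to run.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class BlockingDemo
{
    // Shortened stand-in for the generator in the question (hypothetical).
    static IEnumerable<int> SlowNumbers()
    {
        for (int i = 0; i < 3; i++)
        {
            yield return i;
            Thread.Sleep(100);
        }
    }

    // Records the order in which things happen on the calling thread.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Passing the current-thread scheduler explicitly mirrors what the
        // parameterless ToObservable() overload shown above does.
        var observable = SlowNumbers().ToObservable(CurrentThreadScheduler.Instance);

        using (observable.Subscribe(x => log.Add("value " + x)))
        {
            // Reached only after the whole enumerable has been consumed.
            log.Add("after Subscribe");
        }

        return log;
    }
}
```

The returned log should contain the three values first and "after Subscribe" last, which is the same ordering that keeps "Press any key to unsubscribe" from appearing in the question.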

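You can work around this by using the task pool or thread pool scheduler, as the other answer suggests, but I think the code has a more fundamental problem: it relies on "old school" thread sleeping instead of Rx's own methods.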

Try this to generate your values instead:

    var observable =
        Observable
            .GenerateWithTime(0, i => i <= 1000, i => i + 1,
                i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
            .Timestamp();

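GenerateWithTime(...) does everything your GenerateAlternatingFastAndSlowEvents method did, but it creates the observable directly and uses Scheduler.ThreadPool under the hood, so you do not need to specify a scheduler.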
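As far as I know, later Rx releases no longer ship GenerateWithTime, and the same pattern is written with the Observable.Generate overload that takes a time selector. The sketch below assumes the current System.Reactive package; GenerateDemo, AlternatingEvents and Run are hypothetical names, and the count parameter is added only so a short run is possible.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class GenerateDemo
{
    // Emits 0..count-1 with delays alternating between 500 ms and 1000 ms
    // in runs of five. No Thread.Sleep is involved; Rx schedules the delays.
    public static IObservable<Timestamped<int>> AlternatingEvents(int count)
    {
        return Observable
            .Generate(0, i => i < count, i => i + 1,
                i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
            .Timestamp();
    }

    // Same flow as Main in the question, with the generator swapped out.
    public static void Run()
    {
        var throttled = AlternatingEvents(1001).Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            // Subscribe returns without waiting for the sequence to finish.
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }
    }
}
```

Calling GenerateDemo.Run() from Main should print the prompt straight away, with throttled values following as the slower gaps occur.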

+2

Source: https://habr.com/ru/post/1794661/