Checking for cancellation of a simple observable does not raise an onError

Given the following:

Why handler OnErrorto Subscribenever cause?

var observable = Observable.Create<string>(
    async (o, c) =>
    {
        try
        {
            var strings = new[] { "A", "B", "C" };

            foreach (var s in strings)
            {
                await Task.Delay(100);
                if (c.IsCancellationRequested)
                {
                    // exception thrown here.
                    Console.WriteLine("cancelled");
                    throw new OperationCancelledException();
                }
                o.OnNext(s);
            }
            o.OnCompleted();
        }
        catch (Exception ex)
        {
            // caught here...
            o.OnError(ex);
        }
    });

var tcs = new TaskCompletionSource<bool>();
var token = new CancellationTokenSource();
observable.Subscribe(
    str =>
    {
        Console.WriteLine(str);
        token.Cancel(); // cancel after the first iteration.
    },
    (e) =>
    {
        // why is this never called.
        Console.WriteLine($"on error :: {e.Message}");
        tcs.SetResult(true);
    },
    () =>
    {
        Console.WriteLine("on complete");
        tcs.SetResult(true);
    },
    token.Token);

// code hangs here because the subscription never completes?
await tcs.Task;
Console.WriteLine("done");
+4
source share
3 answers

The basic implementation of the observer OnError(as well as other functions) contains:

if (Interlocked.Exchange(ref this.isStopped, 1) == 0)
{
    this.OnErrorCore(error);
}

The value is isStoppedset to “stopped” when the token is canceled. The observer takes care of the cancellation process and does not need to be manually controlled.

You can easily check it if you change the code OnNextto

if(str == "B")
    token.Cancel(); // cancel after the second iteration.

The result will be:

enter image description here

Even if you delete the statement if. After you cancel the token, the functions will not inherit

var observable = Observable.Create<string>(
    async (o, c) =>
        {
            var strings = new[] { "A", "B", "C" };

            foreach (var s in strings)
            {
                await Task.Delay(100);
                o.OnNext(s);
            }
            o.OnCompleted();
        });

, ( ), - , :

if (c.IsCancellationRequested)
{
    // exception thrown here.
    Console.WriteLine("cancelled");
    tcs.SetResult(true); // instead of throwing exceptions
    // some other clean up code or/and return statement
} 
+3

Cancel , (, , , "" ) : " , OnError()".

Rx a AutoDetachObserver , .

.

OnError() . ( ) . , . OnError() .

+5

, : Rx, () (). , .

, , , . onError , Task .

token, , onError , .

+3
source

Source: https://habr.com/ru/post/1685735/


All Articles