How to handle exceptions from asynchronous methods in a SelectMany expression

I am trying to handle some tasks asynchronously using Rx, for example.

var list = Enumerable.Range(0, 100) .ToObservable() .SelectMany(x => Observable.Start(() => { Console.WriteLine("Processing {0} ...", x); Thread.Sleep(100 * x % 3); if (x > 90) { Console.WriteLine("Procesing exception {0} > 90", x); throw new Exception("Value too large"); } Console.WriteLine("Processing {0} completed.", x); return x; })) .Subscribe( x => { Console.WriteLine("Next [{0}]", x); }, e => { Console.WriteLine("Exception:"); Console.WriteLine(e.Message); }, () => { Console.WriteLine("Complete"); } ); 

The problem with this code is that the exception is not passed to the subscriber. So, after many attempts, I gave up and decided to ask this simple question:

How do you handle exceptions that arise from asynchronous methods in a SelectMany ?

To make this clear, the final implementation is a call to a synchronous function that may or may not throw an exception. The goal is to transfer it to the subscriber so that it can be further processed (in a specific case, a message will be shown to the user).

Edit

I transferred my findings to an answer so that I could mark this question as he answered. Personally, I do not agree with the answer to myself ... but sometimes there is no other way, so I'm sorry.

+6
source share
2 answers

Answer

Actually code works correctly. However, the debugger breaks with exceptions, since async operations are still running in the background - well, at least those that were already running when the first exception occurred. Threw me up! If you run the code without a debugger, the exceptions are swallowed. So I think the problem was really in front of the computer :-)

There are still some explanations for Observable.Start , as I assumed - it’s right - that the implementation should have actually handled error handling ... see "Background".

Background

Observable.Start is a convenience method that uses the Observable.ToAsync method to turn a function / action into an async operation. If you look at the implementation of the method, you will see that it is already processing / forwarding exceptions.

 public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) { if (function != null) { if (scheduler != null) { return () => { AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>(); scheduler.Schedule(() => { TResult result = default(TResult); try { result = function(); } catch (Exception exception1) { Exception exception = exception1; asyncSubject.OnError(exception); return; } asyncSubject.OnNext(result); asyncSubject.OnCompleted(); }); return asyncSubject.AsObservable<TResult>(); }; } else { throw new ArgumentNullException("scheduler"); } } else { throw new ArgumentNullException("function"); } } 
+1
source

Use Materialize to convert OnError / OnCompleted messages to notifications.

For instance,

observable.SelectMany(x => Observable.Start(fn).Materialize())

will give you an error / termination wrapped in a notification that will be processed at your actual subscription point down the line, as opposed to an error terminated inside SelectMany.

This is useful for most Async calling operations because the method either fails or fails.

+3
source

Source: https://habr.com/ru/post/910035/


All Articles