Observed with Futures - onNext from multiple threads

I want to generate Observable in real time from the results of the Futures list.

In the simplest case, suppose I have a list of futures that I start with Future.sequence , and I just track their progress with the Observable , which tells me every time it is completed. I basically do it like this:

  def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = { Observable[String](observer => { val loudFutures: List[Future[Int]] = futs.map(f => { f onComplete { case Success(a) => observer.onNext(s"just did $a more") case Failure(e) => observer.onError(e) } f }) Future.sequence(loudFutures) onComplete { case Success(_) => observer.onCompleted() case Failure(e) => observer.onError(e) } }) } 

This works fine in my test environment. But I just read that onNext should not be called from different threads, at least not fearing that there are no calls. What is the recommended way to fix this? It seems that many uses of Observables in the real world require that onNext called from async code like this, but I cannot find a similar example in the docs.

+5
source share
1 answer

Observed contract

Observers should periodically issue notifications to observers (not in parallel). They can issue these notifications from different streams, but there must be a formal incident - before the relationship between the notifications.

Take a look at Serialize

An observable can invoke the methods of its observers asynchronously, possibly from different threads. This can cause the Observable to violate the Observation contract because it can try to send an OnCompleted or OnError notification before one of its OnNext notifications or it can make an OnNext notification from two different streams at the same time. You can make such an Observable behave well and synchronously by applying the Serialize operator to it.

+1
source

Source: https://habr.com/ru/post/1244940/


All Articles